Boot mode implemented, optimized execution time and fixed bugs
This commit is contained in:
110
src/picowatch.py
110
src/picowatch.py
@@ -192,7 +192,7 @@ class Pyboard(object):
|
||||
i: int = 0
|
||||
serial: Union[Serial, Telnet]
|
||||
|
||||
def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''):
|
||||
def __init__(self, device: str, baudrate: int = 115200, login: str = 'micro', password: str = 'python'):
|
||||
is_telnet = device and device.count('.') == 3
|
||||
|
||||
for _ in range(0, 3):
|
||||
@@ -216,82 +216,89 @@ class Pyboard(object):
|
||||
sys.stdout.flush()
|
||||
self.i = (self.i + 1) % 4
|
||||
|
||||
def stdout_write_bytes(self, data: str):
|
||||
sys.stdout.buffer.write(data.replace(b'\x04', b''))
|
||||
sys.stdout.buffer.flush()
|
||||
|
||||
def send_ok(self) -> bytes:
|
||||
self.serial.write(b'\x04')
|
||||
return b'OK'
|
||||
|
||||
def send_ctrl_a(self) -> bytes:
|
||||
# ctrl-A: enter raw REPL
|
||||
# Ctrl-A on a blank line will enter raw REPL mode. This is like a permanent paste mode, except that characters are not echoed back.
|
||||
self.serial.write(b'\x01')
|
||||
return b'raw REPL; CTRL-B to exit\r\n>'
|
||||
|
||||
def send_ctrl_b(self):
|
||||
# ctrl-B: exit raw REPL
|
||||
# Ctrl-B on a blank like goes to normal REPL mode.
|
||||
self.serial.write(b'\x02')
|
||||
|
||||
def send_ctrl_c(self) -> bytes:
|
||||
# ctrl-C twice: interrupt any running program
|
||||
# Ctrl-C cancels any input, or interrupts the currently running code.
|
||||
for _ in range(0, 2):
|
||||
self.serial.write(b'\x03')
|
||||
|
||||
return b'raw REPL; CTRL-B to exit\r\n'
|
||||
|
||||
def send_ctrl_d(self) -> bytes:
|
||||
# ctrl-D: soft reset
|
||||
# Ctrl-D on a blank line will do a soft reset.
|
||||
self.serial.write(b'\x04')
|
||||
return b'soft reboot\r\n'
|
||||
|
||||
def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]:
|
||||
data = self.serial.read(1)
|
||||
|
||||
if stream_output:
|
||||
self.stdout_write_bytes(data)
|
||||
|
||||
timeout = 0
|
||||
max_len = len(delimiter)
|
||||
|
||||
while not timeout >= 1000:
|
||||
if data.endswith(delimiter):
|
||||
return data
|
||||
elif self.serial.inWaiting() > 0:
|
||||
timeout = 0
|
||||
stream_data = self.serial.read(1)
|
||||
data += stream_data
|
||||
|
||||
if stream_output:
|
||||
self.stdout_write_bytes(stream_data)
|
||||
data = data[-max_len:]
|
||||
elif show_status:
|
||||
self.transfer_status()
|
||||
else:
|
||||
timeout += 1
|
||||
time.sleep(0.0001)
|
||||
|
||||
def __enter__(self):
|
||||
self.send_ctrl_c()
|
||||
def until_nothing_in_waiting(self):
|
||||
n = self.serial.inWaiting()
|
||||
|
||||
while n > 0:
|
||||
self.serial.read(n)
|
||||
n = self.serial.inWaiting()
|
||||
|
||||
def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]:
|
||||
data = b''
|
||||
timeout = 0
|
||||
max_len = len(delimiter)
|
||||
|
||||
while not timeout >= 100:
|
||||
if data.endswith(delimiter):
|
||||
return data
|
||||
elif self.serial.inWaiting() > 0:
|
||||
timeout = 0
|
||||
data += self.serial.read(1)
|
||||
|
||||
if stream_output:
|
||||
sys.stdout.buffer.write(data[:-1])
|
||||
sys.stdout.buffer.flush()
|
||||
data = data[-max_len:]
|
||||
elif show_status:
|
||||
self.transfer_status()
|
||||
else:
|
||||
timeout += 1
|
||||
time.sleep(0.001)
|
||||
|
||||
def boot(self):
|
||||
self.send_ctrl_c()
|
||||
self.until_nothing_in_waiting()
|
||||
|
||||
if not self.read_until(self.send_ctrl_d()):
|
||||
raise Exception('REPL: could not soft reboot')
|
||||
|
||||
data = b''
|
||||
while not data.startswith(b'.\r\n>>>'):
|
||||
if self.serial.inWaiting():
|
||||
output = self.serial.read(1)
|
||||
data = data[-5:] + output
|
||||
sys.stdout.buffer.write(output)
|
||||
sys.stdout.buffer.flush()
|
||||
|
||||
print(' ' + ('=' * 50))
|
||||
print('REPL: was interrupted')
|
||||
|
||||
def __enter__(self):
|
||||
self.send_ctrl_c()
|
||||
self.until_nothing_in_waiting()
|
||||
|
||||
for _ in range(0, 5):
|
||||
if self.read_until(self.send_ctrl_a()):
|
||||
break
|
||||
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise Exception('REPL: could not enter')
|
||||
|
||||
if not self.read_until(self.send_ctrl_d()):
|
||||
raise Exception('REPL: could not soft reboot')
|
||||
|
||||
if not self.read_until(self.send_ctrl_c()):
|
||||
raise Exception('REPL: could not interrupt after soft reboot')
|
||||
raise Exception('Terminal: could not enter raw REPL mode')
|
||||
|
||||
return self.__terminal
|
||||
|
||||
@@ -300,24 +307,19 @@ class Pyboard(object):
|
||||
|
||||
def __terminal(self, command: str, stream_output: bool = False) -> Optional[str]:
|
||||
command = textwrap.dedent(command)
|
||||
# send input
|
||||
if not isinstance(command, bytes):
|
||||
command = bytes(command, encoding='utf8')
|
||||
|
||||
if not self.read_until(b'>'):
|
||||
raise Exception('Terminal: prompt has been lost')
|
||||
if not isinstance(command, bytes):
|
||||
command = bytes(command, encoding='utf-8')
|
||||
|
||||
for i in range(0, len(command), BUFFER_SIZE):
|
||||
if not stream_output:
|
||||
self.transfer_status()
|
||||
|
||||
self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))])
|
||||
time.sleep(0.0001)
|
||||
|
||||
if not self.read_until(self.send_ok()):
|
||||
raise Exception('Terminal: could not execute command')
|
||||
|
||||
# catch output
|
||||
data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output)
|
||||
|
||||
if not data:
|
||||
@@ -609,8 +611,12 @@ class Picowatch(object):
|
||||
self.filesystem = FileSystem(pyboard)
|
||||
signal.signal(signal.SIGINT, lambda a, b: self.interupt())
|
||||
|
||||
def boot(self):
|
||||
self.filesystem.pyboard.boot()
|
||||
|
||||
def interupt(self):
|
||||
self.filesystem.pyboard.send_ctrl_c()
|
||||
self.filesystem.pyboard.until_nothing_in_waiting()
|
||||
|
||||
def terminal(self, command: str):
|
||||
self.filesystem.terminal(command, stream_output=True)
|
||||
@@ -854,6 +860,8 @@ while True:
|
||||
sys.exit('Picowatch Terminal disconnected!')
|
||||
case ['help']:
|
||||
print('TODO')
|
||||
case ['boot']:
|
||||
picowatch.boot()
|
||||
case ['whois']:
|
||||
print('TODO')
|
||||
case ['reboot']:
|
||||
|
||||
Reference in New Issue
Block a user