From 074bcaa77639873040effa72badbed13f09eae41 Mon Sep 17 00:00:00 2001 From: Gino D Date: Sat, 7 Jan 2023 18:45:57 +0100 Subject: [PATCH] Boot mode implemented, optimized execution time and fixed bugs --- src/picowatch.py | 112 +++++++++++++++++++++++++---------------------- 1 file changed, 60 insertions(+), 52 deletions(-) diff --git a/src/picowatch.py b/src/picowatch.py index 95346d0..f3dfb2c 100644 --- a/src/picowatch.py +++ b/src/picowatch.py @@ -192,7 +192,7 @@ class Pyboard(object): i: int = 0 serial: Union[Serial, Telnet] - def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''): + def __init__(self, device: str, baudrate: int = 115200, login: str = 'micro', password: str = 'python'): is_telnet = device and device.count('.') == 3 for _ in range(0, 3): @@ -214,84 +214,91 @@ class Pyboard(object): arrows = ['◜', '◝', '◞', '◟'] sys.stdout.write(f'[∘] Transfering... {arrows[self.i]}\r') sys.stdout.flush() - self.i = (self.i + 1) % 4 - - def stdout_write_bytes(self, data: str): - sys.stdout.buffer.write(data.replace(b'\x04', b'')) - sys.stdout.buffer.flush() + self.i = (self.i + 1) % 4 def send_ok(self) -> bytes: self.serial.write(b'\x04') return b'OK' def send_ctrl_a(self) -> bytes: - # ctrl-A: enter raw REPL + # Ctrl-A on a blank line will enter raw REPL mode. This is like a permanent paste mode, except that characters are not echoed back. self.serial.write(b'\x01') return b'raw REPL; CTRL-B to exit\r\n>' def send_ctrl_b(self): - # ctrl-B: exit raw REPL + # Ctrl-B on a blank like goes to normal REPL mode. self.serial.write(b'\x02') def send_ctrl_c(self) -> bytes: - # ctrl-C twice: interrupt any running program + # Ctrl-C cancels any input, or interrupts the currently running code. for _ in range(0, 2): self.serial.write(b'\x03') return b'raw REPL; CTRL-B to exit\r\n' def send_ctrl_d(self) -> bytes: - # ctrl-D: soft reset + # Ctrl-D on a blank line will do a soft reset. self.serial.write(b'\x04') return b'soft reboot\r\n' - def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]: - data = self.serial.read(1) - - if stream_output: - self.stdout_write_bytes(data) - - timeout = 0 - max_len = len(delimiter) - - while not timeout >= 1000: - if data.endswith(delimiter): - return data - elif self.serial.inWaiting() > 0: - timeout = 0 - stream_data = self.serial.read(1) - data += stream_data - - if stream_output: - self.stdout_write_bytes(stream_data) - data = data[-max_len:] - elif show_status: - self.transfer_status() - else: - timeout += 1 - time.sleep(0.0001) - - def __enter__(self): - self.send_ctrl_c() + def until_nothing_in_waiting(self): n = self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() + def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]: + data = b'' + timeout = 0 + max_len = len(delimiter) + + while not timeout >= 100: + if data.endswith(delimiter): + return data + elif self.serial.inWaiting() > 0: + timeout = 0 + data += self.serial.read(1) + + if stream_output: + sys.stdout.buffer.write(data[:-1]) + sys.stdout.buffer.flush() + data = data[-max_len:] + elif show_status: + self.transfer_status() + else: + timeout += 1 + time.sleep(0.001) + + def boot(self): + self.send_ctrl_c() + self.until_nothing_in_waiting() + + if not self.read_until(self.send_ctrl_d()): + raise Exception('REPL: could not soft reboot') + + data = b'' + while not data.startswith(b'.\r\n>>>'): + if self.serial.inWaiting(): + output = self.serial.read(1) + data = data[-5:] + output + sys.stdout.buffer.write(output) + sys.stdout.buffer.flush() + + print(' ' + ('=' * 50)) + print('REPL: was interrupted') + + def __enter__(self): + self.send_ctrl_c() + self.until_nothing_in_waiting() + for _ in range(0, 5): if self.read_until(self.send_ctrl_a()): break time.sleep(0.01) else: - raise Exception('REPL: could not enter') - - if not self.read_until(self.send_ctrl_d()): - raise Exception('REPL: could not soft reboot') - - if not self.read_until(self.send_ctrl_c()): - raise Exception('REPL: could not interrupt after soft reboot') + raise Exception('Terminal: could not enter raw REPL mode') return self.__terminal @@ -300,24 +307,19 @@ class Pyboard(object): def __terminal(self, command: str, stream_output: bool = False) -> Optional[str]: command = textwrap.dedent(command) - # send input - if not isinstance(command, bytes): - command = bytes(command, encoding='utf8') - if not self.read_until(b'>'): - raise Exception('Terminal: prompt has been lost') + if not isinstance(command, bytes): + command = bytes(command, encoding='utf-8') for i in range(0, len(command), BUFFER_SIZE): if not stream_output: self.transfer_status() self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))]) - time.sleep(0.0001) if not self.read_until(self.send_ok()): raise Exception('Terminal: could not execute command') - # catch output data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output) if not data: @@ -609,8 +611,12 @@ class Picowatch(object): self.filesystem = FileSystem(pyboard) signal.signal(signal.SIGINT, lambda a, b: self.interupt()) + def boot(self): + self.filesystem.pyboard.boot() + def interupt(self): self.filesystem.pyboard.send_ctrl_c() + self.filesystem.pyboard.until_nothing_in_waiting() def terminal(self, command: str): self.filesystem.terminal(command, stream_output=True) @@ -854,6 +860,8 @@ while True: sys.exit('Picowatch Terminal disconnected!') case ['help']: print('TODO') + case ['boot']: + picowatch.boot() case ['whois']: print('TODO') case ['reboot']: