import os import sys import time import json import serial import binascii import textwrap from typing import Dict, List, Optional, Tuple class Pyboard: PYBOARD_BUFFER_SIZE: int = 256 serial: serial.Serial def __init__(self, device: str, baudrate: int = 115200): for _ in range(0, 3): try: self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) break except: time.sleep(1) else: raise Exception(f'Failed to access {device}') def close(self): self.serial.close() def stdout_write_bytes(self, data: str): sys.stdout.buffer.write(data.replace(b'\x04', b'')) sys.stdout.buffer.flush() def send_ok(self) -> bytes: self.serial.write(b'\x04') return b'OK' def send_ctrl_a(self) -> bytes: # ctrl-A: enter raw REPL self.serial.write(b'\x01') return b'raw REPL; CTRL-B to exit\r\n>' def send_ctrl_b(self): # ctrl-B: exit raw REPL self.serial.write(b'\x02') def send_ctrl_c(self, max_times: int = 2) -> bytes: # ctrl-C twice: interrupt any running program for _ in range(0, max_times): self.serial.write(b'\x03') time.sleep(0.1) return b'raw REPL; CTRL-B to exit\r\n' def send_ctrl_d(self) -> bytes: # ctrl-D: soft reset self.serial.write(b'\x04') return b'soft reboot\r\n' def read_until(self, delimiter: bytes, stream_output: bool = False) -> Optional[bytes]: data = self.serial.read(1) if stream_output: self.stdout_write_bytes(data) timeout = 0 max_len = len(delimiter) while not timeout >= 1000: if data.endswith(delimiter): return data elif self.serial.inWaiting() > 0: timeout = 0 stream_data = self.serial.read(1) data += stream_data if stream_output: self.stdout_write_bytes(stream_data) data = data[-max_len:] else: timeout += 1 time.sleep(0.001) def __enter__(self): self.send_ctrl_c() n = self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() for _ in range(0, 5): if self.read_until(self.send_ctrl_a()): break time.sleep(0.01) else: raise Exception('REPL: could not enter') if not self.read_until(self.send_ctrl_d()): raise Exception('REPL: could not soft reboot') if not self.read_until(self.send_ctrl_c()): raise Exception('REPL: could not interrupt after soft reboot') return self.__terminal def __exit__(self, a, b, c): self.send_ctrl_b() def __terminal(self, command: str, stream_output: bool = False, catch_output: bool = True) -> Optional[str]: command = textwrap.dedent(command) # send input if not isinstance(command, bytes): command = bytes(command, encoding='utf8') if not self.read_until(b'>'): raise Exception('Terminal: prompt has been lost') for i in range(0, len(command), Pyboard.PYBOARD_BUFFER_SIZE): self.serial.write(command[i: min(i + Pyboard.PYBOARD_BUFFER_SIZE, len(command))]) time.sleep(0.001) if not self.read_until(self.send_ok()): raise Exception('Terminal: could not execute command') if not catch_output: return # catch output data = self.read_until(b'\x04', stream_output=stream_output) if not data: raise Exception('Terminal: timeout waiting for first EOF reception') exception = self.read_until(b'\x04') if not exception: raise Exception('Terminal: timeout waiting for second EOF reception') data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8')) if exception: reason = 'Traceback (most recent call last):' traceback = exception.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' raise Exception(reason.strip()) else: raise Exception(exception) return data.strip() class Picowatch(object): _pyboard: Pyboard @property def pyboard(self) -> Optional[Pyboard]: return self._pyboard def __init__(self, pyboard: Pyboard): self._pyboard = pyboard def checksum(self, source: str, data: str = '') -> bool: output = self.terminal(f""" def checksum(data): v = 21 for c in data.decode("utf-8"): v ^= ord(c) return v with open("{source}", "rb") as fh: print(checksum(fh.read())) """) if isinstance(data, bytes): data = data.decode('utf-8') v = 21 for c in data: v ^= ord(c) return str(v) == output def get(self, filename: str) -> Tuple[bytes, bool]: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" import sys import ubinascii with open('{filename}', 'rb') as infile: while True: result = infile.read({Pyboard.PYBOARD_BUFFER_SIZE}) if result == b'': break sys.stdout.write(ubinascii.hexlify(result)) """) output = binascii.unhexlify(output) return (output, self.checksum(filename, output)) def save(self, source: str, destination: str) -> Tuple[str, bool]: output, checksum = self.get(source) with open(destination, 'wb') as fh: fh.write(output) return (destination, checksum) def put(self, filename: str, data: bytes) -> Tuple[str, bool]: if not filename.startswith('/'): filename = '/' + filename if not isinstance(data, bytes): data = bytes(data, encoding='utf8') try: if os.path.dirname(filename): self.mkdir(os.path.dirname(filename)) with self._pyboard as terminal: size = len(data) terminal(f"""fh = open("{filename}", "wb")""") for i in range(0, size, Pyboard.PYBOARD_BUFFER_SIZE): chunk_size = min(Pyboard.PYBOARD_BUFFER_SIZE, size - i) chunk = repr(data[i : i + chunk_size]) terminal(f"""fh.write({chunk})""") terminal("""fh.close()""") except Exception as e: raise e return (filename, self.checksum(filename, data)) def copy(self, source: str, destination: str) -> Tuple[str, int, bool]: with open(source, 'rb') as fh: return self.put(destination, fh.read()) def ls(self, dirname: str = '/') -> List[Dict[str, int]]: if not dirname.startswith('/'): dirname = '/' + dirname if not dirname.endswith('/'): dirname += '/' output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): d = [] f = [] if not dirname.endswith("/"): dirname += "/" for t in os.ilistdir(dirname): if t[1] == 0x4000: d.append(dirname + t[0] + '/') zd, zf = ls(dirname + t[0] + '/') d.extend(zd) f.extend(zf) else: f.append((dirname + t[0], os.stat(dirname + t[0])[6])) return d, f print(json.dumps(ls("{dirname}"))) """) return json.loads(output) def rm(self, filename: str) -> List: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): d = [] f = [] if not dirname.endswith("/"): dirname += "/" for t in os.ilistdir(dirname): if t[1] == 0x4000: d.append(dirname + t[0] + '/') zd, zf = ls(dirname + t[0] + '/') d.extend(zd) f.extend(zf) else: f.append(dirname + t[0]) return d, f def rm(filename): r = [] if os.stat(filename)[0] == 0x4000: d, f = ls(filename) for zf in f: try: os.remove(zf) r.append((zf, 1)) except Exception as e: r.append((zf, 0, str(e))) for zd in d: try: os.rmdir(zd) r.append((zd, 1)) except Exception as e: r.append((zd, 0, str(e))) try: os.rmdir(filename) r.append((filename, 1)) except Exception as e: r.append((filename, 0, str(e))) else: try: os.remove(filename) r.append((filename, 1)) except Exception as e: r.append((filename, 0, str(e))) return r try: print(json.dumps(rm("{filename}"))) except Exception as e: print(json.dumps([("{filename}", 0, str(e))])) """) return json.loads(output) def mkdir(self, dirname: str) -> List: if not dirname.startswith('/'): dirname = '/' + dirname if dirname.endswith('/'): dirname = dirname[:-1] output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json r = [] d = [] for zd in str("{dirname}").split("/"): if not zd: continue d.append(zd) zd = "/".join(d) try: os.mkdir(zd) r.append(("/" + zd, 1)) except Exception as e: if str(e).find('EEXIST'): r.append(("/" + zd, 1)) else: r.append(("/" + zd, 0, str(e))) print(json.dumps(r)) """) return json.loads(output) def launch(self, filename: str): try: self.terminal(f""" with open("{filename}", "r") as fh: exec(fh.read()) """, stream_output=True) except Exception as e: print(str(e)) def devmode(self, filename: str): with self._pyboard as terminal: try: with open(filename, 'r') as fh: terminal(fh.read(), stream_output=True) except Exception as e: print(str(e)) def terminal(self, command: str, stream_output: bool = False) -> str: with self._pyboard as terminal: try: return terminal(command, stream_output=stream_output) except Exception as e: raise e # picowatch = Picowatch(Pyboard('COM5')) # with open('./setenv.py', 'rb') as fh: # print(picowatch.put('setenv.py', fh.read())) # print(picowatch.get('setenv.py')) # print(picowatch.save('main.py', 'cpmain.py')) # print(picowatch.copy('cpmain.py', 'main.py')) # picowatch.devmode('main.py')