#!/usr/bin/env python # Primadiag SAS - Paris # Author: Gino D. # Copyright (c) 2023 Primadiag # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE import os import sys import time import json import signal import tempfile import binascii import textwrap import subprocess from serial import Serial from dotenv import dotenv_values from typing import List, Optional, Tuple, Union BUFFER_SIZE: int = 256 class Tab(): colsize: Tuple = () def __init__(self, *length: int): self.colsize = length def blank(self, text: str = '-'): print(text * sum(self.colsize)) def print(self, *texts: str): def coltext(text: str, length: int = 20) -> str: text = str(text) if len(text) >= length: text = text[:(length - 4) if length > 5 else length] + '...' return text + ' ' * (length - len(text)) line = '' for i, text in enumerate(texts[:len(self.colsize)]): line += coltext(text, self.colsize[i]) for text in texts[len(self.colsize):]: line += coltext(text) print(line) def labels(self, *texts: str, blank_text: str = '-'): self.blank(blank_text) self.print(*texts) self.blank(blank_text) class Telnet: def __init__(self, IP: str, login: str, password: str): import telnetlib from collections import deque self.tn = telnetlib.Telnet(IP, timeout=15) self.fifo = deque() if b'Login as:' in self.tn.read_until(b'Login as:'): self.tn.write(bytes(login, 'ascii') + b'\r\n') if b'Password:' in self.tn.read_until(b'Password:'): time.sleep(0.2) self.tn.write(bytes(password, 'ascii') + b'\r\n') if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.'): return raise Exception('Failed to establish a Telnet connection with the board') def __del__(self): self.close() def close(self): try: self.tn.close() except: pass def read(self, size: int = 1) -> bytes: timeout = 0 while len(self.fifo) < size and not timeout >= 8: timeout = 0 data = self.tn.read_eager() if len(data): self.fifo.extend(data) timeout = 0 else: timeout += 1 time.sleep(0.25) data = b'' while len(data) < size and len(self.fifo) > 0: data += bytes([self.fifo.popleft()]) return data def write(self, data: bytes): self.tn.write(data) return len(data) def inWaiting(self) -> int: n_waiting = len(self.fifo) if not n_waiting: data = self.tn.read_eager() self.fifo.extend(data) n_waiting = len(data) return n_waiting class Pyboard(object): i: int = 0 serial: Union[Serial, Telnet] def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''): is_telnet = device and device.count('.') == 3 for _ in range(0, 3): try: if is_telnet: self.serial = Telnet(device, login, password) else: self.serial = Serial(device, baudrate=baudrate, interCharTimeout=1) break except: time.sleep(1) else: raise Exception(f'Failed to access device: {device}') def close(self): self.serial.close() def transfer_status(self) -> int: arrows = ['◜', '◝', '◞', '◟'] sys.stdout.write(f'[∘] Transfering... {arrows[self.i]}\r') sys.stdout.flush() self.i = (self.i + 1) % 4 def stdout_write_bytes(self, data: str): sys.stdout.buffer.write(data.replace(b'\x04', b'')) sys.stdout.buffer.flush() def send_ok(self) -> bytes: self.serial.write(b'\x04') return b'OK' def send_ctrl_a(self) -> bytes: # ctrl-A: enter raw REPL self.serial.write(b'\x01') return b'raw REPL; CTRL-B to exit\r\n>' def send_ctrl_b(self): # ctrl-B: exit raw REPL self.serial.write(b'\x02') def send_ctrl_c(self) -> bytes: # ctrl-C twice: interrupt any running program for _ in range(0, 2): self.serial.write(b'\x03') return b'raw REPL; CTRL-B to exit\r\n' def send_ctrl_d(self) -> bytes: # ctrl-D: soft reset self.serial.write(b'\x04') return b'soft reboot\r\n' def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]: data = self.serial.read(1) if stream_output: self.stdout_write_bytes(data) timeout = 0 max_len = len(delimiter) while not timeout >= 1000: if data.endswith(delimiter): return data elif self.serial.inWaiting() > 0: timeout = 0 stream_data = self.serial.read(1) data += stream_data if stream_output: self.stdout_write_bytes(stream_data) data = data[-max_len:] elif show_status: self.transfer_status() else: timeout += 1 time.sleep(0.0001) def __enter__(self): self.send_ctrl_c() n = self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() for _ in range(0, 5): if self.read_until(self.send_ctrl_a()): break time.sleep(0.01) else: raise Exception('REPL: could not enter') if not self.read_until(self.send_ctrl_d()): raise Exception('REPL: could not soft reboot') if not self.read_until(self.send_ctrl_c()): raise Exception('REPL: could not interrupt after soft reboot') return self.__terminal def __exit__(self, a, b, c): self.send_ctrl_b() def __terminal(self, command: str, stream_output: bool = False) -> Optional[str]: command = textwrap.dedent(command) # send input if not isinstance(command, bytes): command = bytes(command, encoding='utf8') if not self.read_until(b'>'): raise Exception('Terminal: prompt has been lost') for i in range(0, len(command), BUFFER_SIZE): if not stream_output: self.transfer_status() self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))]) time.sleep(0.0001) if not self.read_until(self.send_ok()): raise Exception('Terminal: could not execute command') # catch output data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output) if not data: raise Exception('Terminal: timeout waiting for first EOF reception') exception = self.read_until(b'\x04') if not exception: raise Exception('Terminal: timeout waiting for second EOF reception') data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8')) if exception: reason = 'Traceback (most recent call last):' traceback = exception.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' raise Exception(f'{"-"* 45}\n{reason.strip()}') else: raise Exception(f'{"-" * 45}\n{exception}') return data.strip() class FileSystem(object): pyboard: Pyboard def __init__(self, pyboard: Pyboard): self.pyboard = pyboard def checksum(self, source: str, data: str) -> str: output = self.terminal(f""" def checksum(data): v = 21 for c in data.decode("utf-8"): v ^= ord(c) return v with open("{source}", "rb") as fh: print(checksum(fh.read())) """) if isinstance(data, bytes): data = data.decode('utf-8') v = 21 for c in data: v ^= ord(c) if int(v) == int(output): return 'OK' return f'{v} != {output}' def get(self, filename: str) -> Tuple[bytes, bool]: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" import sys import ubinascii with open('{filename}', 'rb') as infile: while True: result = infile.read({BUFFER_SIZE}) if result == b'': break sys.stdout.write(ubinascii.hexlify(result)) """) output = binascii.unhexlify(output) return (output, self.checksum(filename, output)) def download(self, source: str, destination: str) -> str: output, checksum = self.get(source) with open(destination, 'wb') as fh: fh.write(output) return checksum def put(self, filename: str, data: bytes) -> Tuple[str, bool]: if not filename.startswith('/'): filename = '/' + filename if not isinstance(data, bytes): data = bytes(data, encoding='utf8') try: if os.path.dirname(filename): self.mkdir(os.path.dirname(filename)) with self.pyboard as terminal: size = len(data) terminal(f"""fh = open("{filename}", "wb")""") for i in range(0, size, BUFFER_SIZE): chunk_size = min(BUFFER_SIZE, size - i) chunk = repr(data[i : i + chunk_size]) terminal(f"""fh.write({chunk})""") terminal("""fh.close()""") except Exception as e: raise e return (filename, self.checksum(filename, data)) def upload(self, source: str, destination: str) -> str: with open(source, 'rb') as fh: _, checksum = self.put(destination, fh.read()) return checksum def ls(self, dirname: str = '/') -> Tuple[int, List, str]: dirname = dirname.strip('./').strip('/') output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): e = [] s = os.stat(dirname) if s[0] == 0x4000: if not dirname.endswith("/"): dirname += "/" for t in os.ilistdir(dirname): if dirname.startswith('/'): dirname = dirname[1:] if t[1] == 0x4000: e.append((dirname + t[0] + '/', -1)) e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) else: e.append((dirname, s[6])) return e try: s = 1 r = ls("{dirname}") x = '' except Exception as e: s = 0 r = [("{dirname}", -2)] x = str(e) print(json.dumps([s, r, x])) """) return json.loads(output) def rm(self, filename: str) -> Tuple[int, List, str]: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): e = [] if not dirname.endswith("/"): dirname += "/" for t in os.ilistdir(dirname): if dirname.startswith('/'): dirname = dirname[1:] if t[1] == 0x4000: e.append((dirname + t[0] + '/', -1)) e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) return e def rm(filename): r = [] if os.stat(filename)[0] == 0x4000: e = ls(filename) if filename != '/': e.append((filename, -1)) for f, s in e: if not s == -1: try: os.remove(f) r.append((f, 1, '')) except Exception as e: r.append((f, 0, str(e))) for f, s in e: if s == -1: try: os.rmdir(f) r.append((f, 1, '')) except Exception as e: r.append((f, 0, str(e))) else: try: os.remove(filename) r.append((filename, 1, '')) except Exception as e: r.append((filename, 0, str(e))) return r try: s = 1 r = rm("{filename}") x = '' except Exception as e: s = 0 r = [("{filename}", 0, str(e))] x = str(e) print(json.dumps([s, r, x])) """) return json.loads(output) def mkdir(self, dirname: str) -> List: if not dirname.startswith('/'): dirname = '/' + dirname if dirname.endswith('/'): dirname = dirname[:-1] output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json r = [] d = [] for zd in str("{dirname}").split("/"): if not zd: continue d.append(zd) zd = "/".join(d) try: os.mkdir(zd) r.append(("/" + zd, 1)) except Exception as e: if str(e).find('EEXIST'): r.append(("/" + zd, 1)) else: r.append(("/" + zd, 0, str(e))) print(json.dumps(r)) """) return json.loads(output) def launch(self, filename: str): try: self.terminal(f""" with open("{filename}", "r") as fh: exec(fh.read()) """, stream_output=True) except Exception as e: print(str(e)) def terminal(self, command: str, stream_output: bool = False) -> str: with self.pyboard as terminal: try: return terminal(command, stream_output=stream_output) except Exception as e: raise e class Picowatch(object): def __init__(self, pyboard: Pyboard, project_name: str = 'src'): self.filesystem = FileSystem(pyboard) project_name = project_name.strip('./').strip('/') if not project_name or project_name == '/': raise Exception('Project name is incorrect, can not be empty or only a forward slash "/"!') self.project_name = project_name.replace(os.sep, '/') self.project_dirname = os.path.join(os.getcwd(), self.project_name).replace(os.sep, '/') if not os.path.isdir(self.project_dirname): raise Exception('Project is not a directory!') signal.signal(signal.SIGINT, lambda a, b: self.interupt()) def interupt(self): self.filesystem.pyboard.send_ctrl_c() def terminal(self, command: str): self.filesystem.terminal(command, stream_output=True) def listing(self, filepath: str = '/'): filepath = filepath.strip('./') tab = Tab(4, 30, 15, 100) tab.labels('[ ]', 'Filename', 'Size (kb)', 'Exception') status, output, exception = self.filesystem.ls(filepath) if status: for filename, size in output: if size == -1: tab.print('[*]', filename, '-') else: tab.print('[*]', filename, f'{round(size / 1024, 2)} kb') else: tab.print('[?]', filepath, '', str(exception)) def contents(self, filename: str): filename = filename.strip('./').strip('/') content, _ = self.filesystem.get(filename) print('-' * 50) for ln in content.decode('utf-8').split('\n'): print(ln) def upload(self, filepath: str): filepath = filepath.strip('./').strip('/') queue = [] source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') if os.path.isdir(source): for root, _, files in os.walk(source, followlinks=True): for filename in files: filename = os.path.join(root, filename).replace(os.sep, '/') queue.append((filename, filename.replace(self.project_dirname, ''))) elif os.path.exists(source): queue.append((source, filepath)) tab = Tab(4, 30, 15, 100) tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') for source, destination in queue: destination = destination.strip('/') try: tab.print('[↑]', destination, self.filesystem.upload(source, destination)) except Exception as e: tab.print('[?]', destination, '', str(e)) def download(self, filepath: str): filepath = filepath.strip('./').strip('/') tab = Tab(4, 30, 15, 100) tab.labels('', 'Filename', 'Checksum', 'Exception') status, output, exception = self.filesystem.ls(filepath) if status: for remote, size in output: if size == -1: os.makedirs(os.path.join(self.project_dirname, remote), 777, exist_ok=True) for remote, size in output: destination = os.path.join(self.project_dirname, remote).replace(os.sep, '/') if not size == -1: try: tab.print('[↓]', remote, self.filesystem.download(remote, destination)) except Exception as e: tab.print('[?]', remote, '', str(e)) else: tab.print('[?]', filepath, '', exception) def delete(self, filepath: str): filepath = filepath.strip('./') tab = Tab(4, 30, 100) tab.labels('[ ]', 'Filename', 'Exception') status, output, exception = self.filesystem.rm(filepath) if status: for filename, checked, exception in output: if checked: tab.print('[-]', filename) else: tab.print('[?]', filename, exception) else: tab.print('[?]', filepath, exception) def compare(self, filepath: str): filepath = filepath.strip('./').strip('/') content, _ = self.filesystem.get(filepath) fh, tempname = tempfile.mkstemp() try: with os.fdopen(fh, 'wb') as tmp: tmp.write(content) subprocess.Popen(f'code --diff "{tempname}" "{os.path.join(self.project_dirname, filepath)}"', stdout=subprocess.PIPE, shell=True).communicate() finally: input('Press Enter to delete temp file.') os.remove(tempname) def status(self, return_output: bool = False): changes = [] try: output = subprocess.check_output(['git', 'status', '-s'], stderr=subprocess.STDOUT) for filename in [f.strip() for f in output.decode('utf-8').split('\n')]: if not filename: continue status, filename = filename.split(' ') if filename.startswith(self.project_name): if status in ['A', 'M', '??']: changes.append((1, filename.replace(self.project_name + '/', ''))) elif status == 'D': changes.append((-1, filename.replace(self.project_name + '/', ''))) except Exception as e: print(e.output.decode('utf-8').strip()) finally: if return_output: return changes tab = Tab(4, 40) tab.labels('[ ]', 'Filename') for status, filename in changes: tab.print('[+]' if status == 1 else '[-]', filename) def push(self): tab = Tab(4, 30, 15, 100) tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') changes = self.status(return_output=True) for filepath in [filename for status, filename in changes if status == -1]: filepath = filepath.strip('/') status, output, exception = self.filesystem.rm(filepath) if status: for filename, checked, exception in output: if checked: tab.print('[-]', filename) else: tab.print('[?]', filename, '', exception) else: tab.print('[?]', filepath, '', exception) queue = [] for filepath in [filename for status, filename in changes if status == 1]: filepath = filepath.strip('/') source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') if os.path.isdir(source): for root, _, files in os.walk(source, followlinks=True): for file in files: file = os.path.join(root, file).replace(os.sep, '/') queue.append((file, file.replace(self.project_dirname, ''))) elif os.path.exists(source): queue.append((source, filepath)) for source, destination in queue: destination = destination.strip('/') try: tab.print('[↑]', destination, self.filesystem.upload(source, destination)) except Exception as e: tab.print('[?]', destination, '', str(e)) print('Pico board up to date.') def test(self, filename: str): with open(os.path.join(self.project_dirname, filename), 'r') as fh: self.filesystem.terminal(fh.read(), stream_output=True) def launch(self, filepath: str): self.filesystem.launch(filepath) print('Welcome to Picowatch Terminal') picowatch = False try: env = dotenv_values('.picowatch') picowatch = Picowatch(Pyboard(env["DEVICE"], env["BAUDRATE"]), env["PROJECT_NAME"]) print(f'Connected automatically to device: {env["DEVICE"]} at a baudrate of: {env["BAUDRATE"]}. Listening to project: ./{env["PROJECT_NAME"]}') except: while not picowatch: print('-' * 50) device = input('Port: ').strip() baudrate = input('Baudrate (115200): ').strip() or 115200 project_name = input('Project name (src): ').strip() or 'src' try: picowatch = Picowatch(Pyboard(device, baudrate), project_name) print('-' * 50) print(f'Connected to device: {picowatch} at a baudrate of: {baudrate}. Listening to project: ./{project_name}') except Exception as e: print(str(e)) picowatch.interupt() while True: try: unmessage = input('>>> ').strip() for message in unmessage.split('&'): match message.strip().split(' '): case ['exit']: sys.exit() case ['help']: print('TODO') case ['reboot']: picowatch.terminal('help()') case ['ls' | 'list', *source]: picowatch.listing(source[0] if source else '/') case ['cat' | 'code', source]: picowatch.contents(source) case ['rm' | 'delete', source]: picowatch.delete(source) case ['put' | 'upload', source]: picowatch.upload(source) case ['get' | 'download', source]: picowatch.download(source) case ['diff' | 'compare', filename]: picowatch.compare(filename) case ['status']: picowatch.status(return_output=False) case ['push']: picowatch.push() case ['compile', filename]: # https://pypi.org/project/mpy-cross/ pass case ['install', package_name]: pass case ['test', filename]: picowatch.test(filename) case ['!']: picowatch.test('main.py') case ['!!']: picowatch.launch('main.py') case _: if message.startswith('./'): picowatch.launch(message[2:]) elif message: print(f'"{message}" is not recognized.') except Exception as e: print(str(e))