#!/usr/bin/env python # Primadiag SAS - Paris # Author: Gino D. # Copyright (c) 2023 Primadiag # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE import os import sys import time import json import signal import tempfile import binascii import textwrap import subprocess from serial import Serial from typing import List, Optional, Tuple, Union BUFFER_SIZE: int = 126 class Tab(): colsize: Tuple = () def __init__(self, *length: int): self.colsize = length def blank(self, text: str = '-'): print(text * sum(self.colsize)) def print(self, *texts: str): def coltext(text: str, length: int = 20) -> str: text = str(text) if len(text) >= length: text = text[:(length - 4) if length > 5 else length] + '...' return text + ' ' * (length - len(text)) line = '' for i, text in enumerate(texts[:len(self.colsize)]): line += coltext(text, self.colsize[i]) for text in texts[len(self.colsize):]: line += coltext(text) print(line) def labels(self, *texts: str, blank_text: str = '-'): self.blank(blank_text) self.print(*texts) self.blank(blank_text) class Telnet: def __init__(self, IP: str, login: str, password: str): import telnetlib from collections import deque self.tn = telnetlib.Telnet(IP, timeout=15) self.fifo = deque() if b'Login as:' in self.tn.read_until(b'Login as:'): self.tn.write(bytes(login, 'ascii') + b'\r\n') if b'Password:' in self.tn.read_until(b'Password:'): time.sleep(0.2) self.tn.write(bytes(password, 'ascii') + b'\r\n') if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.'): return raise Exception('Failed to establish a Telnet connection with the board') def __del__(self): self.close() def close(self): try: self.tn.close() except: pass def read(self, size: int = 1) -> bytes: timeout = 0 while len(self.fifo) < size and not timeout >= 8: timeout = 0 data = self.tn.read_eager() if len(data): self.fifo.extend(data) timeout = 0 else: timeout += 1 time.sleep(0.25) data = b'' while len(data) < size and len(self.fifo) > 0: data += bytes([self.fifo.popleft()]) return data def write(self, data: bytes): self.tn.write(data) return len(data) def inWaiting(self) -> int: n_waiting = len(self.fifo) if not n_waiting: data = self.tn.read_eager() self.fifo.extend(data) n_waiting = len(data) return n_waiting class Pyboard(object): i: int = 0 serial: Union[Serial, Telnet] def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''): is_telnet = device and device.count('.') == 3 for _ in range(0, 3): try: if is_telnet: self.serial = Telnet(device, login, password) else: self.serial = Serial(device, baudrate=baudrate, interCharTimeout=1) break except: time.sleep(1) else: raise Exception(f'Failed to access {device}') def close(self): self.serial.close() def transfer_status(self) -> int: arrows = ['◜', '◝', '◞', '◟'] sys.stdout.write(f'[∘] Transfering... {arrows[self.i]}\r') sys.stdout.flush() self.i = (self.i + 1) % 4 def stdout_write_bytes(self, data: str): sys.stdout.buffer.write(data.replace(b'\x04', b'')) sys.stdout.buffer.flush() def send_ok(self) -> bytes: self.serial.write(b'\x04') return b'OK' def send_ctrl_a(self) -> bytes: # ctrl-A: enter raw REPL self.serial.write(b'\x01') return b'raw REPL; CTRL-B to exit\r\n>' def send_ctrl_b(self): # ctrl-B: exit raw REPL self.serial.write(b'\x02') def send_ctrl_c(self) -> bytes: # ctrl-C twice: interrupt any running program for _ in range(0, 2): self.serial.write(b'\x03') return b'raw REPL; CTRL-B to exit\r\n' def send_ctrl_d(self) -> bytes: # ctrl-D: soft reset self.serial.write(b'\x04') return b'soft reboot\r\n' def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]: data = self.serial.read(1) if stream_output: self.stdout_write_bytes(data) timeout = 0 max_len = len(delimiter) while not timeout >= 1000: if data.endswith(delimiter): return data elif self.serial.inWaiting() > 0: timeout = 0 stream_data = self.serial.read(1) data += stream_data if stream_output: self.stdout_write_bytes(stream_data) data = data[-max_len:] elif show_status: self.transfer_status() else: timeout += 1 time.sleep(0.0001) def __enter__(self): self.send_ctrl_c() n = self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() for _ in range(0, 5): if self.read_until(self.send_ctrl_a()): break time.sleep(0.01) else: raise Exception('REPL: could not enter') if not self.read_until(self.send_ctrl_d()): raise Exception('REPL: could not soft reboot') if not self.read_until(self.send_ctrl_c()): raise Exception('REPL: could not interrupt after soft reboot') return self.__terminal def __exit__(self, a, b, c): self.send_ctrl_b() def __terminal(self, command: str, stream_output: bool = False) -> Optional[str]: command = textwrap.dedent(command) # send input if not isinstance(command, bytes): command = bytes(command, encoding='utf8') if not self.read_until(b'>'): raise Exception('Terminal: prompt has been lost') for i in range(0, len(command), BUFFER_SIZE): if not stream_output: self.transfer_status() self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))]) time.sleep(0.0001) if not self.read_until(self.send_ok()): raise Exception('Terminal: could not execute command') # catch output data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output) if not data: raise Exception('Terminal: timeout waiting for first EOF reception') exception = self.read_until(b'\x04') if not exception: raise Exception('Terminal: timeout waiting for second EOF reception') data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8')) if exception: reason = 'Traceback (most recent call last):' traceback = exception.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' raise Exception(f'{"-"* 45}\n{reason.strip()}') else: raise Exception(f'{"-" * 45}\n{exception}') return data.strip() class FileSystem(object): pyboard: Pyboard def __init__(self, pyboard: Pyboard): self.pyboard = pyboard def checksum(self, source: str, data: str) -> str: output = self.terminal(f""" def checksum(data): v = 21 for c in data.decode("utf-8"): v ^= ord(c) return v with open("{source}", "rb") as fh: print(checksum(fh.read())) """) if isinstance(data, bytes): data = data.decode('utf-8') v = 21 for c in data: v ^= ord(c) if int(v) == int(output): return 'OK' return f'{v} != {output}' def get(self, filename: str) -> Tuple[bytes, bool]: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" import sys import ubinascii with open('{filename}', 'rb') as infile: while True: result = infile.read({BUFFER_SIZE}) if result == b'': break sys.stdout.write(ubinascii.hexlify(result)) """) output = binascii.unhexlify(output) return (output, self.checksum(filename, output)) def download(self, source: str, destination: str) -> str: output, checksum = self.get(source) with open(destination, 'wb') as fh: fh.write(output) return checksum def put(self, filename: str, data: bytes) -> Tuple[str, bool]: if not filename.startswith('/'): filename = '/' + filename if not isinstance(data, bytes): data = bytes(data, encoding='utf8') try: if os.path.dirname(filename): self.mkdir(os.path.dirname(filename)) with self.pyboard as terminal: size = len(data) terminal(f"""fh = open("{filename}", "wb")""") for i in range(0, size, BUFFER_SIZE): chunk_size = min(BUFFER_SIZE, size - i) chunk = repr(data[i : i + chunk_size]) terminal(f"""fh.write({chunk})""") terminal("""fh.close()""") except Exception as e: raise e return (filename, self.checksum(filename, data)) def upload(self, source: str, destination: str) -> str: with open(source, 'rb') as fh: _, checksum = self.put(destination, fh.read()) return checksum def ls(self, dirname: str = '/') -> Tuple[int, List, str]: if not dirname.startswith('/'): dirname = '/' + dirname output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): e = [] s = os.stat(dirname) if s[0] == 0x4000: if not dirname.endswith("/"): dirname += "/" for t in os.ilistdir(dirname): if t[1] == 0x4000: e.append((dirname + t[0] + '/', -1)) e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) else: e.append((dirname, s[6])) return e try: s = 1 r = ls("{dirname}") x = '' except Exception as e: s = 0 r = [("{dirname}", -2)] x = str(e) print(json.dumps([s, r, x])) """) return json.loads(output) def rm(self, filename: str) -> Tuple[int, List, str]: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): e = [] if not dirname.endswith("/"): dirname += "/" for t in os.ilistdir(dirname): if t[1] == 0x4000: e.append((dirname + t[0] + '/', -1)) e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) return e def rm(filename): r = [] if os.stat(filename)[0] == 0x4000: e = ls(filename) if filename != '/': e.append((filename, -1)) for f, s in e: if not s == -1: try: os.remove(f) r.append((f, 1, '')) except Exception as e: r.append((f, 0, str(e))) for f, s in e: if s == -1: try: os.rmdir(f) r.append((f, 1, '')) except Exception as e: r.append((f, 0, str(e))) else: try: os.remove(filename) r.append((filename, 1, '')) except Exception as e: r.append((filename, 0, str(e))) return r try: s = 1 r = rm("{filename}") x = '' except Exception as e: s = 0 r = [("{filename}", 0, str(e))] x = str(e) print(json.dumps([s, r, x])) """) return json.loads(output) def mkdir(self, dirname: str) -> List: if not dirname.startswith('/'): dirname = '/' + dirname if dirname.endswith('/'): dirname = dirname[:-1] output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json r = [] d = [] for zd in str("{dirname}").split("/"): if not zd: continue d.append(zd) zd = "/".join(d) try: os.mkdir(zd) r.append(("/" + zd, 1)) except Exception as e: if str(e).find('EEXIST'): r.append(("/" + zd, 1)) else: r.append(("/" + zd, 0, str(e))) print(json.dumps(r)) """) return json.loads(output) def launch(self, filename: str): try: self.terminal(f""" with open("{filename}", "r") as fh: exec(fh.read()) """, stream_output=True) except Exception as e: print(str(e)) def terminal(self, command: str, stream_output: bool = False) -> str: with self.pyboard as terminal: try: return terminal(command, stream_output=stream_output) except Exception as e: raise e class Picowatch(object): def __init__(self, pyboard: Pyboard, project_name: str = 'src'): self.filesystem = FileSystem(pyboard) project_name = project_name.strip('./').strip('/') if project_name == '/': raise Exception('Project name is incorrect!') self.project_name = project_name.replace(os.sep, '/') self.project_dirname = os.path.join(os.getcwd(), self.project_name).replace(os.sep, '/') signal.signal(signal.SIGINT, lambda a, b: self.interupt()) def interupt(self): self.filesystem.pyboard.send_ctrl_c() def terminal(self, command: str): self.filesystem.terminal(command, stream_output=True) def listing(self, filepath: str = '/'): filepath = filepath.strip('./') tab = Tab(4, 30, 15, 30) tab.labels('[ ]', 'Filename', 'Size (kb)', 'Exception') status, output, exception = self.filesystem.ls(filepath) if status: for filename, size in output: if size == -1: tab.print('[*]', filename, '-') else: tab.print('[*]', filename, f'{size}b') else: tab.print('[?]', filepath, '', str(exception)) def contents(self, filename: str): filename = filename.strip('./').strip('/') content, _ = self.filesystem.get(filename) print('-' * 50) for ln in content.decode('utf-8').split('\n'): print(ln) def upload(self, filepath: str): filepath = filepath.strip('./').strip('/') queue = [] source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') if os.path.isdir(source): for root, _, files in os.walk(source, followlinks=True): for filename in files: filename = os.path.join(root, filename).replace(os.sep, '/') queue.append((filename, filename.replace(self.project_dirname, ''))) elif os.path.exists(source): queue.append((source, filepath)) tab = Tab(4, 30, 15, 30) tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') for source, destination in queue: destination = destination.strip('/') try: tab.print('[↑]', destination, self.filesystem.upload(source, destination)) except Exception as e: tab.print('[?]', destination, '', str(e)) def download(self, filepath: str): filepath = filepath.strip('./').strip('/') tab = Tab(4, 30, 15, 30) tab.labels('', 'Filename', 'Checksum', 'Exception') status, output, exception = self.filesystem.ls(filepath) if status: for remote, size in output: if size == -1: os.makedirs(os.path.join(self.project_dirname, remote), 777, exist_ok=True) for remote, size in output: if not size == -1: try: tab.print('[↓]', remote, self.filesystem.download(remote, os.path.join(self.project_dirname, remote))) except Exception as e: tab.print('[?]', remote, '', str(e)) else: tab.print('[?]', filepath, '', exception) def delete(self, filepath: str): filepath = filepath.strip('./') tab = Tab(4, 30, 30) tab.labels('[ ]', 'Filename', 'Exception') status, output, exception = self.filesystem.rm(filepath) if status: for filename, checked, exception in output: if checked: tab.print('[-]', filename) else: tab.print('[?]', filename, exception) else: tab.print('[?]', filepath, exception) def compare(self, filepath: str): filepath = filepath.strip('./').strip('/') content, _ = self.filesystem.get(filepath) fh, tempname = tempfile.mkstemp() try: with os.fdopen(fh, 'wb') as tmp: tmp.write(content) subprocess.Popen(f'code --diff "{tempname}" "{os.path.join(self.project_dirname, filepath)}"', stdout=subprocess.PIPE, shell=True).communicate() finally: input('Press Enter to delete temp file.') os.remove(tempname) def status(self, return_output: bool = False): changes = [] try: output = subprocess.check_output(['git', 'status', '-s'], stderr=subprocess.STDOUT) for filename in [f.strip() for f in output.decode('utf-8').split('\n')]: if not filename: continue status, filename = filename.split(' ') if filename.startswith(self.project_name): if status in ['A', 'M', '??']: changes.append((1, filename.replace(self.project_name + '/', ''))) elif status == 'D': changes.append((-1, filename.replace(self.project_name + '/', ''))) except Exception as e: print(e.output.decode('utf-8').strip()) finally: if return_output: return changes tab = Tab(4, 40) tab.labels('[ ]', 'Filename') for status, filename in changes: tab.print('[+]' if status == 1 else '[-]', filename) def push(self): tab = Tab(4, 30, 15, 30) tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') changes = self.status(return_output=True) for filepath in [filename for status, filename in changes if status == -1]: filepath = filepath.strip('/') status, output, exception = self.filesystem.rm(filepath) if status: for filename, checked, exception in output: if checked: tab.print('[-]', filename) else: tab.print('[?]', filename, '', exception) else: tab.print('[?]', filepath, '', exception) queue = [] for filepath in [filename for status, filename in changes if status == 1]: filepath = filepath.strip('/') source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') if os.path.isdir(source): for root, _, files in os.walk(source, followlinks=True): for file in files: file = os.path.join(root, file).replace(os.sep, '/') queue.append((file, file.replace(self.project_dirname, ''))) elif os.path.exists(source): queue.append((source, filepath)) for source, destination in queue: destination = destination.strip('/') try: tab.print('[↑]', destination, self.filesystem.upload(source, destination)) except Exception as e: tab.print('[?]', destination, '', str(e)) print('Pico board up to date.') def test(self, filename: str): with open(os.path.join(self.project_dirname, filename), 'r') as fh: self.filesystem.terminal(fh.read(), stream_output=True) def launch(self, filepath: str): self.filesystem.launch(filepath) print('Welcome to Picowatch Terminal') picowatch = False while not picowatch: print('-' * 30) device = input('Port: ').strip() baudrate = input('Baudrate (115200): ').strip() or 115200 project_name = input('Project name (src): ').strip() or 'src' try: picowatch = Picowatch(Pyboard(device, baudrate), project_name) print(f'Connected to device: {device} at a baudrate of: {baudrate}. Listening to project: {project_name}') print('-' * 30) except Exception as e: print(str(e)) picowatch.interupt() while True: try: unmessage = input('>>> ').strip() for message in unmessage.split('&'): match message.strip().split(' '): case ['exit']: sys.exit() case ['reboot']: picowatch.terminal('help()') case ['ls' | 'list', *source]: picowatch.listing(source[0] if source else '/') case ['cat' | 'code', source]: picowatch.contents(source) case ['rm' | 'delete', source]: picowatch.delete(source) case ['put' | 'upload', source]: picowatch.upload(source) case ['get' | 'download', source]: picowatch.download(source) case ['diff' | 'compare', filename]: picowatch.compare(filename) case ['status']: picowatch.status(return_output=False) case ['push']: picowatch.push() case ['compile', filename]: pass case ['install', package_name]: pass case ['test', filename]: picowatch.test(filename) case ['!']: picowatch.test('main.py') case ['!!']: picowatch.launch('main.py') case _: if message.startswith('./'): picowatch.launch(message[2:]) elif message: print(f'"{message}" is not recognized.') except Exception as e: print(str(e))