#!/usr/bin/env python # Primadiag SAS - Paris # Author: Gino D. # Copyright (c) 2023 Primadiag # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE import os import sys import time import json import signal import tempfile import binascii import textwrap import mpy_cross import subprocess from serial import Serial from dotenv import dotenv_values from typing import List, Optional, Tuple, Union BUFFER_SIZE: int = 256 class Tab(): bordered: bool = False colsize: List = [] text_to_right: List = [] def __init__(self, *colsizes: int, nb_columns: int = 0, bordered: bool = False): self.colsize = list(colsizes) self.bordered = bordered auto_size = nb_columns - len(self.colsize) if nb_columns > 0 and auto_size > 0: terminal_size = os.get_terminal_size() column_size = int((terminal_size.columns - sum(self.colsize)) / auto_size) for i in range(len(self.colsize), nb_columns): self.colsize.append(column_size) def head(self, *texts: str): self.border('=') self.line(*texts, bordered=False) self.border('=') def border(self, text: str = '-'): sys.stdout.write(text * sum(self.colsize) + '\n') def align_to_right(self, *column_num: int): self.text_to_right = [i - 1 for i in column_num] def line(self, *texts: str, bordered: bool = None): lines = {} max_lines = 0 for column_num, text in enumerate(texts[:len(self.colsize)]): max_length = self.colsize[column_num] lineno = -1 lines[column_num] = (max_length, []) for paragraph in str(text).split('\n'): lineno += 1 lines[column_num][1].append([]) for word in paragraph.split(' '): word = word.strip() next_sentence = ' '.join(lines[column_num][1][lineno] + [word]) if len(next_sentence) >= max_length: lineno += 1 lines[column_num][1].append([]) lines[column_num][1][lineno].append(word) max_lines = max(max_lines, lineno) for i in range(0, max_lines + 1): output = '' for column_num, line in lines.items(): width, bag_of_words = line if len(bag_of_words) > i: sentence = ' '.join(bag_of_words[i]) if column_num in self.text_to_right: output += ' ' * (width - len(sentence) - 1) + sentence + ' ' else: output += sentence + ' ' * (width - len(sentence)) else: output += ' ' * width sys.stdout.write(output + '\n') if bordered == False: return if bordered or self.bordered: self.border('-') class Telnet: def __init__(self, IP: str, login: str, password: str): import telnetlib from collections import deque self.tn = telnetlib.Telnet(IP, timeout=15) self.fifo = deque() if b'Login as:' in self.tn.read_until(b'Login as:'): self.tn.write(bytes(login, 'ascii') + b'\r\n') if b'Password:' in self.tn.read_until(b'Password:'): time.sleep(0.2) self.tn.write(bytes(password, 'ascii') + b'\r\n') if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.'): return raise Exception('Failed to establish a Telnet connection with the board') def __del__(self): self.close() def close(self): try: self.tn.close() except: pass def read(self, size: int = 1) -> bytes: timeout = 0 while len(self.fifo) < size and not timeout >= 8: timeout = 0 data = self.tn.read_eager() if len(data): self.fifo.extend(data) timeout = 0 else: timeout += 1 time.sleep(0.25) data = b'' while len(data) < size and len(self.fifo) > 0: data += bytes([self.fifo.popleft()]) return data def write(self, data: bytes): self.tn.write(data) return len(data) def inWaiting(self) -> int: n_waiting = len(self.fifo) if not n_waiting: data = self.tn.read_eager() self.fifo.extend(data) n_waiting = len(data) return n_waiting class Pyboard(object): i: int = 0 serial: Union[Serial, Telnet] def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''): is_telnet = device and device.count('.') == 3 for _ in range(0, 3): try: if is_telnet: self.serial = Telnet(device, login, password) else: self.serial = Serial(device, baudrate=baudrate, interCharTimeout=1) break except: time.sleep(1) else: raise Exception(f'Failed to access device: {device}') def close(self): self.serial.close() def transfer_status(self) -> int: arrows = ['◜', '◝', '◞', '◟'] sys.stdout.write(f'[∘] Transfering... {arrows[self.i]}\r') sys.stdout.flush() self.i = (self.i + 1) % 4 def stdout_write_bytes(self, data: str): sys.stdout.buffer.write(data.replace(b'\x04', b'')) sys.stdout.buffer.flush() def send_ok(self) -> bytes: self.serial.write(b'\x04') return b'OK' def send_ctrl_a(self) -> bytes: # ctrl-A: enter raw REPL self.serial.write(b'\x01') return b'raw REPL; CTRL-B to exit\r\n>' def send_ctrl_b(self): # ctrl-B: exit raw REPL self.serial.write(b'\x02') def send_ctrl_c(self) -> bytes: # ctrl-C twice: interrupt any running program for _ in range(0, 2): self.serial.write(b'\x03') return b'raw REPL; CTRL-B to exit\r\n' def send_ctrl_d(self) -> bytes: # ctrl-D: soft reset self.serial.write(b'\x04') return b'soft reboot\r\n' def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]: data = self.serial.read(1) if stream_output: self.stdout_write_bytes(data) timeout = 0 max_len = len(delimiter) while not timeout >= 1000: if data.endswith(delimiter): return data elif self.serial.inWaiting() > 0: timeout = 0 stream_data = self.serial.read(1) data += stream_data if stream_output: self.stdout_write_bytes(stream_data) data = data[-max_len:] elif show_status: self.transfer_status() else: timeout += 1 time.sleep(0.0001) def __enter__(self): self.send_ctrl_c() n = self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() for _ in range(0, 5): if self.read_until(self.send_ctrl_a()): break time.sleep(0.01) else: raise Exception('REPL: could not enter') if not self.read_until(self.send_ctrl_d()): raise Exception('REPL: could not soft reboot') if not self.read_until(self.send_ctrl_c()): raise Exception('REPL: could not interrupt after soft reboot') return self.__terminal def __exit__(self, a, b, c): self.send_ctrl_b() def __terminal(self, command: str, stream_output: bool = False) -> Optional[str]: command = textwrap.dedent(command) # send input if not isinstance(command, bytes): command = bytes(command, encoding='utf8') if not self.read_until(b'>'): raise Exception('Terminal: prompt has been lost') for i in range(0, len(command), BUFFER_SIZE): if not stream_output: self.transfer_status() self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))]) time.sleep(0.0001) if not self.read_until(self.send_ok()): raise Exception('Terminal: could not execute command') # catch output data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output) if not data: raise Exception('Terminal: timeout waiting for first EOF reception') exception = self.read_until(b'\x04') if not exception: raise Exception('Terminal: timeout waiting for second EOF reception') data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8')) if exception: reason = 'Traceback (most recent call last):' traceback = exception.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' raise Exception('' + reason.strip()) else: raise Exception(exception) return data.strip() class FileSystem(object): pyboard: Pyboard def __init__(self, pyboard: Pyboard): self.pyboard = pyboard def checksum(self, source: str, data: str) -> str: output = self.terminal(f""" def checksum(data): v = 21 for c in data.decode('utf-8'): v ^= ord(c) return v with open('{source}', 'rb') as fh: print(checksum(fh.read())) """) if isinstance(data, bytes): data = data.decode('utf-8') v = 21 for c in data: v ^= ord(c) if int(v) == int(output): return 'OK' return f'{v} != {output}' def get(self, filename: str) -> Tuple[bytes, bool]: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" import sys import ubinascii with open('{filename}', 'rb') as infile: while True: result = infile.read({BUFFER_SIZE}) if result == b'': break sys.stdout.write(ubinascii.hexlify(result)) """) output = binascii.unhexlify(output) if filename.endswith('.mpy'): return (output, '???') return (output, self.checksum(filename, output)) def download(self, source: str, destination: str) -> str: output, checksum = self.get(source) with open(destination, 'wb') as fh: fh.write(output) return checksum def put(self, filename: str, data: bytes) -> Tuple[str, bool]: if not filename.startswith('/'): filename = '/' + filename if not isinstance(data, bytes): data = bytes(data, encoding='utf8') try: if os.path.dirname(filename): self.mkdir(os.path.dirname(filename)) with self.pyboard as terminal: size = len(data) terminal(f"""fh = open('{filename}', 'wb')""") for i in range(0, size, BUFFER_SIZE): chunk_size = min(BUFFER_SIZE, size - i) chunk = repr(data[i : i + chunk_size]) terminal(f"""fh.write({chunk})""") terminal("""fh.close()""") except Exception as e: raise e if filename.endswith('.mpy'): return (filename, '???') return (filename, self.checksum(filename, data)) def upload(self, source: str, destination: str) -> str: with open(source, 'rb') as fh: _, checksum = self.put(destination, fh.read()) return checksum def ls(self, dirname: str = '/') -> Tuple[int, List, str]: dirname = dirname.strip('./').strip('/') output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): e = [] s = os.stat(dirname) if s[0] == 0x4000: if not dirname.endswith('/'): dirname += '/' e.append((dirname, -1)) for t in os.ilistdir(dirname): if dirname.startswith('/'): dirname = dirname[1:] if t[1] == 0x4000: e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) else: e.append((dirname, s[6])) return sorted(e) try: s = 1 r = ls('{dirname}') x = '' except Exception as e: s = 0 r = [('{dirname}', -2)] x = str(e) print(json.dumps([s, r, x])) """) return json.loads(output) def rm(self, filename: str) -> Tuple[int, List, str]: if not filename.startswith('/'): filename = '/' + filename output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json def ls(dirname): e = [] if not dirname.endswith('/'): dirname += '/' for t in os.ilistdir(dirname): if dirname.startswith('/'): dirname = dirname[1:] if t[1] == 0x4000: e.append((dirname + t[0] + '/', -1)) e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) return sorted(e, reverse=True) def rm(filename): r = [] if os.stat(filename)[0] == 0x4000: e = ls(filename) if filename != '/': e.append((filename.strip('/'), -1)) for f, s in e: if not s == -1: try: os.remove(f) r.append((f, 1, '')) except Exception as e: r.append((f, 0, str(e))) for f, s in e: if s == -1: try: os.rmdir(f) r.append((f, 1, '')) except Exception as e: r.append((f, 0, str(e))) else: try: os.remove(filename) r.append((filename, 1, '')) except Exception as e: r.append((filename, 0, str(e))) return r try: s = 1 r = rm('{filename}') x = '' except Exception as e: s = 0 r = [('{filename}', 0, str(e))] x = str(e) print(json.dumps([s, r, x])) """) return json.loads(output) def mkdir(self, dirname: str) -> List: if not dirname.startswith('/'): dirname = '/' + dirname if dirname.endswith('/'): dirname = dirname[:-1] output = self.terminal(f""" try: import os import json except ImportError: import uos as os import ujson as json r = [] d = [] for zd in str('{dirname}').split('/'): if not zd: continue d.append(zd) zd = '/'.join(d) try: os.mkdir(zd) r.append(('/' + zd, 1)) except Exception as e: if str(e).find('EEXIST'): r.append(('/' + zd, 1)) else: r.append(('/' + zd, 0, str(e))) print(json.dumps(r)) """) return json.loads(output) def launch(self, filename: str): try: self.terminal(f""" with open('{filename}', 'r') as fh: exec(fh.read()) """, stream_output=True) except Exception as e: print(str(e)) def terminal(self, command: str, stream_output: bool = False) -> str: with self.pyboard as terminal: try: return terminal(command, stream_output=stream_output) except Exception as e: raise e class Picowatch(object): def __init__(self, pyboard: Pyboard): self.filesystem = FileSystem(pyboard) signal.signal(signal.SIGINT, lambda a, b: self.interupt()) def interupt(self): self.filesystem.pyboard.send_ctrl_c() def terminal(self, command: str): self.filesystem.terminal(command, stream_output=True) def internal_ls(self, filepath: str): queue = [] if filepath == '/': filepath = os.getcwd().replace(os.sep, '/') else: filepath = os.path.join(os.getcwd(), filepath.strip('./').strip('/')).replace(os.sep, '/') if os.path.isfile(filepath): queue = [(filepath, os.stat(filepath)[6])] elif os.path.isdir(filepath): def ls(dirname: str): e = [] if os.path.isdir(dirname): if not dirname.endswith('/'): dirname += '/' for filename in os.listdir(dirname): if filename.startswith('.'): continue filename = dirname + filename if os.path.isdir(filename): e.extend(ls(filename)) else: e.append((filename, os.stat(filename)[6])) return e queue = ls(filepath) return queue def listing(self, filepath: str = '/'): filepath = filepath.strip('./') tab = Tab(4, 30, 15, 100) tab.head('[ ]', 'Filename', 'Size (kb)', 'Exception') status, output, exception = self.filesystem.ls(filepath) if status: for filename, size in output: if size == -1: tab.line('[*]', filename, '-') else: tab.line('[*]', filename, f'{round(size / 1024, 2)} kb') else: tab.line('[?]', filepath, '', str(exception)) def contents(self, filename: str): filename = filename.strip('./').strip('/') content, _ = self.filesystem.get(filename) print('-' * 50) for ln in content.decode('utf-8').split('\n'): print(ln) def upload(self, filepath: str): tab = Tab(4, 30, 15, 15, 100) tab.head('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception') for source, size in self.internal_ls(filepath): destination = source.replace(os.getcwd().replace(os.sep, '/'), '').strip('/') try: tab.line('[↑]', destination, f'{round(size / 1024, 2)} kb', self.filesystem.upload(source, destination)) except Exception as e: tab.line('[?]', destination, '', '', str(e)) def download(self, filepath: str): tab = Tab(4, 30, 15, 100) tab.head('[ ]', 'Filename', 'Checksum', 'Exception') status, output, exception = self.filesystem.ls(filepath.strip('./').strip('/')) if status: for remote, size in output: if size == -1: os.makedirs(os.path.join(os.getcwd(), remote), 777, exist_ok=True) for remote, size in output: destination = os.path.join(os.getcwd(), remote).replace(os.sep, '/') if not size == -1: try: tab.line('[↓]', remote, self.filesystem.download(remote, destination)) except Exception as e: tab.line('[?]', remote, '', str(e)) else: tab.line('[?]', filepath, '', exception) def delete(self, filepath: str): tab = Tab(4, 30, 100) tab.head('[ ]', 'Filename', 'Exception') status, output, exception = self.filesystem.rm(filepath.strip('./')) if status: for filename, checked, exception in output: if checked: tab.line('[-]', filename) else: tab.line('[?]', filename, exception) else: tab.line('[?]', filepath, exception) def compare(self, filepath: str): content, _ = self.filesystem.get(filepath.strip('./').strip('/')) fh, tempname = tempfile.mkstemp() try: with os.fdopen(fh, 'wb') as tmp: tmp.write(content) subprocess.Popen(f'code --diff "{tempname}" "{os.path.join(os.getcwd(), filepath)}"', stdout=subprocess.PIPE, shell=True).communicate() finally: input('Press Enter to delete temp file.') os.remove(tempname) def status(self, return_output: bool = False): changes = [] try: output = subprocess.check_output(['git', 'status', '-s'], stderr=subprocess.STDOUT) for filename in [f.strip() for f in output.decode('utf-8').split('\n')]: if not filename: continue status, filename = filename.split(' ') if status in ['A', 'M', '??']: changes.append((1, filename)) elif status == 'D': changes.append((-1, filename)) except Exception as e: try: print(e.output.decode('utf-8').strip()) except: print(e.decode('utf-8').strip()) finally: if return_output: return changes tab = Tab(4, 40) tab.head('[ ]', 'Filename') for status, filename in changes: tab.line('[+]' if status == 1 else '[-]', filename) def push(self): tab = Tab(4, 30, 15, 100) tab.head('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception') changes = self.status(return_output=True) for filepath in [filename for status, filename in changes if status == -1]: filepath = filepath.strip('/') status, output, exception = self.filesystem.rm(filepath) if status: for filename, checked, exception in output: if checked: tab.line('[-]', filename, '', 'DELETED') else: tab.line('[?]', filename, '', '', exception) else: tab.line('[?]', filepath, '', exception) queue = [] for filepath in [filename for status, filename in changes if status == 1]: queue.extend(self.internal_ls(filepath)) for source, size in queue: destination = source.replace(os.getcwd().replace(os.sep, '/'), '').strip('/') try: tab.line('[↑]', destination, f'{round(size / 1024, 2)} kb', self.filesystem.upload(source, destination)) except Exception as e: tab.line('[?]', destination, '', '', str(e)) print('Pico board up to date.') def compile(self, filename: str): _, error = mpy_cross.run(filename, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True).communicate() if error: print(error.decode('utf-8')) else: print(f'MicroPython File from "{filename}" created!') def test(self, filename: str): with open(os.path.join(os.getcwd(), filename), 'r') as fh: self.filesystem.terminal(fh.read(), stream_output=True) def launch(self, filename: str): self.filesystem.launch(filename) print('Welcome to Picowatch Terminal') print(f'Listening to project: {os.getcwd().replace(os.sep, "/")}/') picowatch = False try: env = dotenv_values('.picowatch') picowatch = Picowatch(Pyboard(env["DEVICE"], env["BAUDRATE"])) print(f'Connected automatically to device: {env["DEVICE"]} at a baudrate of: {env["BAUDRATE"]}') except: while not picowatch: print('-' * 50) device = input('Port: ').strip() baudrate = input('Baudrate (115200): ').strip() or 115200 try: picowatch = Picowatch(Pyboard(device, baudrate)) print('-' * 50) print(f'Connected to device: {device} and baudrate: {baudrate}') with open(os.path.join(os.getcwd(), '.picowatch'), 'w+') as fh: fh.write(f'DEVICE = "{device}"\n') fh.write(f'BAUDRATE = {baudrate}\n') except Exception as e: print(str(e)) print('-' * 50) picowatch.interupt() while True: try: unmessage = input('>>> ').strip() for message in unmessage.split('&'): match message.strip().split(' '): case ['exit']: sys.exit() case ['help']: print('TODO') case ['reboot']: picowatch.terminal('help()') case ['ls' | 'list', *source]: picowatch.listing(source[0] if source else '/') case ['cat' | 'code', source]: picowatch.contents(source) case ['rm' | 'delete', source]: picowatch.delete(source) case ['put' | 'upload', source]: picowatch.upload(source) case ['get' | 'download', source]: picowatch.download(source) case ['diff' | 'compare', filename]: picowatch.compare(filename) case ['status']: picowatch.status(return_output=False) case ['push']: picowatch.push() case ['mpy' | 'compile', filename]: picowatch.compile(filename) case ['install', package_name]: pass case ['test', filename]: picowatch.test(filename) case ['!']: picowatch.test('main.py') case ['!!']: picowatch.launch('main.py') case _: if message.startswith('./'): picowatch.launch(message[2:]) elif message: print(f'"{message}" is not recognized.') except Exception as e: print(str(e))