From a621509fc57f3d7c6f9463eae228d46b71c2020e Mon Sep 17 00:00:00 2001 From: Gino D Date: Fri, 30 Dec 2022 23:33:59 +0100 Subject: [PATCH] Refactoring in progress... --- picowatch_d/picowatch.py | 1267 +++++++++++--------------------------- 1 file changed, 353 insertions(+), 914 deletions(-) diff --git a/picowatch_d/picowatch.py b/picowatch_d/picowatch.py index 148d275..9303c10 100644 --- a/picowatch_d/picowatch.py +++ b/picowatch_d/picowatch.py @@ -1,976 +1,415 @@ -#!/usr/bin/env python - -""" -pyboard interface - -This module provides the Pyboard class, used to communicate with and -control the pyboard over a serial USB connection. - -Example usage: - - import pyboard - pyb = pyboard.Pyboard('/dev/ttyACM0') - -Or: - - pyb = pyboard.Pyboard('192.168.1.1') - -Then: - - pyb.enter_raw_repl() - pyb.exec('pyb.LED(1).on()') - pyb.exit_raw_repl() - -Note: if using Python2 then pyb.exec must be written as pyb.exec_. -To run a script from the local machine on the board and print out the results: - - import pyboard - pyboard.execfile('test.py', device='/dev/ttyACM0') - -This script can also be run directly. To execute a local script, use: - - ./pyboard.py test.py - -Or: - - python pyboard.py test.py - -""" - -# Adafruit MicroPython Tool - File Operations -# Author: Tony DiCola -# Copyright (c) 2016 Adafruit Industries -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -import sys -import ast -import time -import textwrap -import binascii - import os -import signal +import sys +import time +import json +import serial +import binascii +import textwrap -from watchdog.observers import Observer -from watchdog.events import PatternMatchingEventHandler +from typing import Dict, List, Optional, Tuple -_rawdelay = None - -try: - stdout = sys.stdout.buffer -except AttributeError: - # Python2 doesn't have buffer attr - stdout = sys.stdout - -def stdout_write_bytes(b): - b = b.replace(b"\x04", b"") - stdout.write(b) - stdout.flush() - -class PyboardError(Exception): - pass - -class TelnetToSerial: - def __init__(self, ip, user, password, read_timeout=None): - import telnetlib - self.tn = telnetlib.Telnet(ip, timeout=15) - self.read_timeout = read_timeout - if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): - self.tn.write(bytes(user, 'ascii') + b"\r\n") - - if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): - # needed because of internal implementation details of the telnet server - time.sleep(0.2) - self.tn.write(bytes(password, 'ascii') + b"\r\n") - - if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): - # login succesful - from collections import deque - self.fifo = deque() - return - - raise PyboardError('Failed to establish a telnet connection with the board') - - def __del__(self): - self.close() - - def close(self): - try: - self.tn.close() - except: - # the telnet object might not exist yet, so ignore this one - pass - - def read(self, size=1): - while len(self.fifo) < size: - timeout_count = 0 - data = self.tn.read_eager() - if len(data): - self.fifo.extend(data) - timeout_count = 0 - else: - time.sleep(0.25) - if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: - break - timeout_count += 1 - - data = b'' - while len(data) < size and len(self.fifo) > 0: - data += bytes([self.fifo.popleft()]) - return data - - def write(self, data): - self.tn.write(data) - return len(data) - - def inWaiting(self): - n_waiting = len(self.fifo) - if not n_waiting: - data = self.tn.read_eager() - self.fifo.extend(data) - return len(data) - else: - return n_waiting - class Pyboard: - def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0): - global _rawdelay - _rawdelay = rawdelay - if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: - # device looks like an IP address - self.serial = TelnetToSerial(device, user, password, read_timeout=10) - else: - import serial - delayed = False - for attempt in range(wait + 1): - try: - self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) - break - except (OSError, IOError): # Py2 and Py3 have different errors - if wait == 0: - continue - if attempt == 0: - sys.stdout.write(f'Waiting {wait} seconds for pyboard ') - delayed = True + + PYBOARD_BUFFER_SIZE: int = 256 + serial: serial.Serial + + def __init__(self, device: str, baudrate: int = 115200): + for _ in range(0, 3): + try: + self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) + break + except: time.sleep(1) - sys.stdout.write('.') - sys.stdout.flush() - else: - if delayed: - print('') - raise PyboardError('failed to access ' + device) - if delayed: - print('') + else: + raise Exception(f'Failed to access {device}') def close(self): self.serial.close() - def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): - data = self.serial.read(min_num_bytes) - if data_consumer: - data_consumer(data) - timeout_count = 0 - while True: - if data.endswith(ending): - break - elif self.serial.inWaiting() > 0: - new_data = self.serial.read(1) - data = data + new_data - if data_consumer: - data_consumer(new_data) - timeout_count = 0 - else: - timeout_count += 1 - if timeout is not None and timeout_count >= 100 * timeout: - break - time.sleep(0.01) - return data + def stdout_write_bytes(self, data: str): + sys.stdout.buffer.write(data.replace(b'\x04', b'')) + sys.stdout.buffer.flush() - def send_ctrl_a(self): - self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL + def send_ok(self) -> bytes: + self.serial.write(b'\x04') + return b'OK' + + def send_ctrl_a(self) -> bytes: + # ctrl-A: enter raw REPL + self.serial.write(b'\x01') + return b'raw REPL; CTRL-B to exit\r\n>' def send_ctrl_b(self): - self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL - - def send_ctrl_c(self): - self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program - time.sleep(0.1) - self.serial.write(b'\x03') - time.sleep(0.1) - - def send_ctrl_d(self): - self.serial.write(b'\r\x04') # ctrl-D: soft reset - - def enter_raw_repl(self): - # Brief delay before sending RAW MODE char if requests - if _rawdelay > 0: - time.sleep(_rawdelay) + # ctrl-B: exit raw REPL + self.serial.write(b'\x02') + def send_ctrl_c(self, max_times: int = 2) -> bytes: # ctrl-C twice: interrupt any running program - self.serial.write(b'\r\x03') - time.sleep(0.1) - self.serial.write(b'\x03') - time.sleep(0.1) + for _ in range(0, max_times): + self.serial.write(b'\x03') + time.sleep(0.1) - # flush input (without relying on serial.flushInput()) + return b'raw REPL; CTRL-B to exit\r\n' + + def send_ctrl_d(self) -> bytes: + # ctrl-D: soft reset + self.serial.write(b'\x04') + return b'soft reboot\r\n' + + def read_until(self, delimiter: bytes, stream_output: bool = False) -> Optional[bytes]: + data = self.serial.read(1) + + if stream_output: + self.stdout_write_bytes(data) + + timeout = 0 + max_len = len(delimiter) + + while not timeout >= 1000: + if data.endswith(delimiter): + return data + elif self.serial.inWaiting() > 0: + timeout = 0 + stream_data = self.serial.read(1) + data += stream_data + + if stream_output: + self.stdout_write_bytes(stream_data) + data = data[-max_len:] + else: + timeout += 1 + time.sleep(0.001) + + def __enter__(self): + self.send_ctrl_c() n = self.serial.inWaiting() + while n > 0: self.serial.read(n) n = self.serial.inWaiting() - for retry in range(0, 5): - self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') - if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): + for _ in range(0, 5): + if self.read_until(self.send_ctrl_a()): break - else: - if retry >= 4: - print(data) - raise PyboardError('could not enter raw repl') - time.sleep(0.2) - self.serial.write(b'\x04') # ctrl-D: soft reset - data = self.read_until(1, b'soft reboot\r\n') - if not data.endswith(b'soft reboot\r\n'): - print(data) - raise PyboardError('could not enter raw repl') - # By splitting this into 2 reads, it allows boot.py to print stuff, - # which will show up after the soft reboot and before the raw REPL. - # Modification from original pyboard.py below: - # Add a small delay and send Ctrl-C twice after soft reboot to ensure - # any main program loop in main.py is interrupted. - time.sleep(0.5) - self.serial.write(b'\x03') - time.sleep(0.1) # (slight delay before second interrupt - self.serial.write(b'\x03') - # End modification above. - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') - if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): - print(data) - raise PyboardError('could not enter raw repl') - - def exit_raw_repl(self): - self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL - - def follow(self, timeout, data_consumer=None): - # wait for normal output - data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) - if not data.endswith(b'\x04'): - raise PyboardError('timeout waiting for first EOF reception') - data = data[:-1] - - # wait for error output - data_err = self.read_until(1, b'\x04', timeout=timeout) - if not data_err.endswith(b'\x04'): - raise PyboardError('timeout waiting for second EOF reception') - data_err = data_err[:-1] - - # return normal and error output - return data, data_err - - def exec_raw_no_follow(self, command): - if isinstance(command, bytes): - command_bytes = command - else: - command_bytes = bytes(command, encoding='utf8') - - # check we have a prompt - data = self.read_until(1, b'>') - if not data.endswith(b'>'): - raise PyboardError('could not enter raw repl') - - # write command - for i in range(0, len(command_bytes), 256): - self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) time.sleep(0.01) - self.serial.write(b'\x04') - - # check if we could exec command - data = self.serial.read(2) - if data != b'OK': - raise PyboardError('could not exec command') - - def exec_raw(self, command, timeout=10, data_consumer=None): - self.exec_raw_no_follow(command) - return self.follow(timeout, data_consumer) - - def eval(self, expression): - ret = self.exec_(f'print({expression})') - ret = ret.strip() - return ret - - def exec_(self, command, stream_output=False): - data_consumer = None - if stream_output: - data_consumer = stdout_write_bytes - ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) - if ret_err: - raise PyboardError('exception', ret, ret_err) - return ret - - def execfile(self, filename, stream_output=False): - with open(filename, 'rb') as f: - pyfile = f.read() - return self.exec_(pyfile, stream_output=stream_output) - - def get_time(self): - t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') - return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) - -# in Python2 exec is a keyword so one must use "exec_" -# but for Python3 we want to provide the nicer version "exec" -setattr(Pyboard, "exec", Pyboard.exec_) - -def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): - pyb = Pyboard(device, baudrate, user, password) - pyb.enter_raw_repl() - output = pyb.execfile(filename) - stdout_write_bytes(output) - pyb.exit_raw_repl() - pyb.close() - -# def main(): -# import argparse -# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') -# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') -# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') -# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') -# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') -# cmd_parser.add_argument('-c', '--command', help='program passed in as string') -# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') -# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') -# cmd_parser.add_argument('files', nargs='*', help='input files') -# args = cmd_parser.parse_args() - -# def execbuffer(buf): -# try: -# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) -# pyb.enter_raw_repl() -# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) -# pyb.exit_raw_repl() -# pyb.close() -# except PyboardError as er: -# print(er) -# sys.exit(1) -# except KeyboardInterrupt: -# sys.exit(1) -# if ret_err: -# stdout_write_bytes(ret_err) -# sys.exit(1) - -# if args.command is not None: -# execbuffer(args.command.encode('utf-8')) - -# for filename in args.files: -# with open(filename, 'rb') as f: -# pyfile = f.read() -# execbuffer(pyfile) - -# if args.follow or (args.command is None and len(args.files) == 0): -# try: -# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) -# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) -# pyb.close() -# except PyboardError as er: -# print(er) -# sys.exit(1) -# except KeyboardInterrupt: -# sys.exit(1) -# if ret_err: -# stdout_write_bytes(ret_err) -# sys.exit(1) - - - -BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time. -# This is kept small because small chips and USB to serial -# bridges usually have very small buffers. - - -class DirectoryExistsError(Exception): - pass - - -class Files(object): - """Class to interact with a MicroPython board files over a serial connection. - Provides functions for listing, uploading, and downloading files from the - board's filesystem. - """ - __raw_repl_on: bool = False - - def __init__(self, pyboard: Pyboard): - """Initialize the MicroPython board files class using the provided pyboard - instance. In most cases you should create a Pyboard instance (from - pyboard.py) which connects to a board over a serial connection and pass - it in, but you can pass in other objects for testing, etc. - """ - self._pyboard = pyboard - - def is_raw_repl_on(self): - return self.__raw_repl_on - - def send_ctrl_a(self): - self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL - - def send_ctrl_b(self): - self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL - - def send_ctrl_c(self): - self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program - - def send_ctrl_d(self): - self._pyboard.send_ctrl_d() # ctrl-D: soft reset - - def handle_traceback(self, e: Exception): - message = e.args[2].decode('utf-8') - oserror = message.split('OSError:') - - if len(oserror) == 2: - reason = oserror[1].strip() - - if reason == '-2': - reason = '[Errno -2] EUNKNOWN' - if reason == '39': - reason = '[Errno 39] ENEPTY' else: + raise Exception('REPL: could not enter') + + if not self.read_until(self.send_ctrl_d()): + raise Exception('REPL: could not soft reboot') + + if not self.read_until(self.send_ctrl_c()): + raise Exception('REPL: could not interrupt after soft reboot') + + return self.__terminal + + def __exit__(self, a, b, c): + self.send_ctrl_b() + + def __terminal(self, command: str, stream_output: bool = False, catch_output: bool = True) -> Optional[str]: + command = textwrap.dedent(command) + # send input + if not isinstance(command, bytes): + command = bytes(command, encoding='utf8') + + if not self.read_until(b'>'): + raise Exception('Terminal: prompt has been lost') + + for i in range(0, len(command), Pyboard.PYBOARD_BUFFER_SIZE): + self.serial.write(command[i: min(i + Pyboard.PYBOARD_BUFFER_SIZE, len(command))]) + time.sleep(0.001) + + if not self.read_until(self.send_ok()): + raise Exception('Terminal: could not execute command') + + if not catch_output: + return + + # catch output + data = self.read_until(b'\x04', stream_output=stream_output) + + if not data: + raise Exception('Terminal: timeout waiting for first EOF reception') + + exception = self.read_until(b'\x04') + + if not exception: + raise Exception('Terminal: timeout waiting for second EOF reception') + + data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8')) + + if exception: reason = 'Traceback (most recent call last):' - traceback = message.split(reason) + traceback = exception.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' - raise Exception(reason.strip()) + raise Exception(reason.strip()) + else: + raise Exception(exception) - def ls(self, directory="/", long_format=False, recursive=True): - """List the contents of the specified directory (or root if none is - specified). Returns a list of strings with the names of files in the - specified directory. If long_format is True then a list of 2-tuples - with the name and size (in bytes) of the item is returned. Note that - it appears the size of directories is not supported by MicroPython and - will always return 0 (i.e. no recursive size computation). - """ + return data.strip() - # Disabling for now, see https://github.com/adafruit/ampy/issues/55. - # # Make sure directory ends in a slash. - # if not directory.endswith("/"): - # directory += "/" - # Make sure directory starts with slash, for consistency. - # if not directory.startswith("/"): - # directory = "/" + directory +class Picowatch(object): + _pyboard: Pyboard - command = """\ - try: - import os - except ImportError: - import uos as os\n""" + @property + def pyboard(self) -> Optional[Pyboard]: + return self._pyboard - if recursive: - command += """\ - def listdir(directory): - result = set() + def __init__(self, pyboard: Pyboard): + self._pyboard = pyboard - def _listdir(dir_or_file): - try: - # if its a directory, then it should provide some children. - children = os.listdir(dir_or_file) - except OSError: - # probably a file. run stat() to confirm. - os.stat(dir_or_file) - result.add(dir_or_file) - else: - # probably a directory, add to result if empty. - if children: - # queue the children to be dealt with in next iteration. - for child in children: - # create the full path. - if dir_or_file == '/': - next = dir_or_file + child - else: - next = dir_or_file + '/' + child - - _listdir(next) - else: - result.add(dir_or_file) + def checksum(self, source: str, data: str = '') -> bool: + output = self.terminal(f""" + def checksum(data): + v = 21 + for c in data.decode("utf-8"): + v ^= ord(c) + return v + + with open("{source}", "rb") as fh: + print(checksum(fh.read())) + """) - _listdir(directory) - return sorted(result)\n""" - else: - command += """\ - def listdir(directory): - if directory == '/': - return sorted([directory + f for f in os.listdir(directory)]) - else: - return sorted([directory + '/' + f for f in os.listdir(directory)])\n""" + if isinstance(data, bytes): + data = data.decode('utf-8') - # Execute os.listdir() command on the board. - if long_format: - command += f""" - r = [] - for f in listdir('{directory}'): - size = os.stat(f)[6] - r.append(f'{{f}} - {{size}}b') - print(r) - """ - else: - command += f""" - print(listdir('{directory}')) - """ - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - output = self._pyboard.exec_(textwrap.dedent(command)) - except Exception as e: - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False + v = 21 + for c in data: + v ^= ord(c) - # Parse the result list and return it. - return ast.literal_eval(output.decode("utf-8")) + return str(v) == output - def mkdir(self, directory, exists_okay=True): - """Create the specified directory. Note this cannot create a recursive - hierarchy of directories, instead each one should be created separately. - """ - # Execute os.mkdir command on the board. - command = f""" - try: - import os - except ImportError: - import uos as os - os.mkdir('{directory}') - """ - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(textwrap.dedent(command)) - except Exception as e: - if exists_okay == False: - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False + def get(self, filename: str) -> Tuple[bytes, bool]: + if not filename.startswith('/'): + filename = '/' + filename - def put(self, filename, data): - """Create or update the specified file with the provided data. - """ - # Open the file for writing on the board and write chunks of data. - - if filename.startswith('/'): - filename = filename[1:] - - print(f'+ {filename}', end='', flush=True) - size = len(data) - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(f"f = open('{filename}', 'wb')") - # Loop through and write a buffer size chunk of data at a time. - for i in range(0, size, BUFFER_SIZE): - chunk_size = min(BUFFER_SIZE, size - i) - chunk = repr(data[i : i + chunk_size]) - # Make sure to send explicit byte strings (handles python 2 compatibility). - if not chunk.startswith("b"): - chunk = "b" + chunk - self._pyboard.exec_(f"f.write({chunk})") - self._pyboard.exec_("f.close()") - except Exception as e: - print(' [x] > ', end='') - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - print(f' - {size}b [✓]') - return size - - def get(self, filename): - """Retrieve the contents of the specified file and return its contents - as a byte string. - """ - # Open the file and read it a few bytes at a time and print out the - # raw bytes. Be careful not to overload the UART buffer so only write - # a few bytes at a time, and don't use print since it adds newlines and - # expects string data. - command = f""" + output = self.terminal(f""" import sys import ubinascii with open('{filename}', 'rb') as infile: while True: - result = infile.read({BUFFER_SIZE}) + result = infile.read({Pyboard.PYBOARD_BUFFER_SIZE}) if result == b'': break - len = sys.stdout.write(ubinascii.hexlify(result)) - """ - print(f'↓ {filename}', end='', flush=True) + sys.stdout.write(ubinascii.hexlify(result)) + """) + output = binascii.unhexlify(output) - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - output = self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as e: - print(' [x] > ', end='') - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False + return (output, self.checksum(filename, output)) - print(f' {int(len(output) / 2)}b [✓]') - return binascii.unhexlify(output) + def save(self, source: str, destination: str) -> Tuple[str, bool]: + output, checksum = self.get(source) - def rm(self, filename): - """Remove the specified file or directory.""" - command = f""" - try: - import os - except ImportError: - import uos as os - os.remove('{filename}') - """ - print(f'- {filename}', end='') - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(textwrap.dedent(command)) - except Exception as e: - print(' [x] > ', end='') - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - print(' [✓]') - - def rmdir(self, directory, missing_okay=False): - """Forcefully remove the specified directory and all its children.""" - # Build a script to walk an entire directory structure and delete every - # file and subfolder. This is tricky because MicroPython has no os.walk - # or similar function to walk folders, so this code does it manually - # with recursion and changing directories. For each directory it lists - # the files and deletes everything it can, i.e. all the files. Then - # it lists the files again and assumes they are directories (since they - # couldn't be deleted in the first pass) and recursively clears those - # subdirectories. Finally when finished clearing all the children the - # parent directory is deleted. - command = f""" - try: - import os - except ImportError: - import uos as os - def rmdir(directory): - os.chdir(directory) - for f in os.listdir(): - try: - os.remove(f) - except OSError: - pass - for f in os.listdir(): - rmdir(f) - os.chdir('..') - os.rmdir(directory) - rmdir('{directory}') - """ - print(f'- {directory}', end='') - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(textwrap.dedent(command)) - except Exception as e: - if not missing_okay: - print(' [x] > ', end='') - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - print(' [✓]') - - def run_on_board(self, filename, wait_output=True, stream_output=True): - """Run the provided script and return its output. If wait_output is True - (default) then wait for the script to finish and then return its output, - otherwise just run the script and don't wait for any output. - If stream_output is True(default) then return None and print outputs to - stdout without buffering. - """ - command = textwrap.dedent(f'exec(open("{filename}").read())') - output = None - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - - if stream_output: - self._pyboard.exec_(command, stream_output=True) - elif wait_output: - # Run the file and wait for output to return. - output = self._pyboard.exec_(command) - else: - # Read the file and run it using lower level pyboard functions that - # won't wait for it to finish or return output. - self._pyboard.exec_raw_no_follow(command) - except Exception as e: - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - return output - - def exec(self, command): - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True) - except Exception as e: - self.handle_traceback(e) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - return output - - -print('Welcome to picowatch lib.') -pico = False - -while not pico: - print('-' * 30) - device = input('Port: ').strip() - baudrate = input('Baudrate (115200): ').strip() or 115200 - - try: - pico = Files(Pyboard(device=device, baudrate=baudrate)) - print(f'Connected to device: {device} at a baudrate of: {baudrate}') - print('-' * 30) - except Exception as e: - print(str(e)) - -pico.send_ctrl_c() -WATCHING_DIRECTORY = './' - -def upload(source: str = '', destination: str = ''): - real_source = os.path.join(WATCHING_DIRECTORY, *source.split('/')) - - if not destination: - destination = source - - destination = '/'.join(destination.split('\\')) - - try: - if os.path.isdir(real_source): - for root, dirs, files in os.walk(real_source, followlinks=True): - droot = '/'.join(root.replace(WATCHING_DIRECTORY, '').split(os.sep)) - - for dir in dirs: - pico.mkdir('/'.join([droot, dir]), exists_okay=True) - - for filename in files: - with open(os.path.join(root, filename), 'rb') as fh: - pico.put('/'.join([droot, filename]), fh.read()) - - time.sleep(.5) - elif os.path.exists(real_source): - dirpath = [] - - for d in os.path.dirname(destination).split('/'): - dirpath.append(d) - pico.mkdir('/'.join(dirpath), exists_okay=True) - - with open(real_source, 'rb') as fh: - pico.put(destination, fh.read()) - except Exception as e: - print(str(e)) - - -def download(source: str = '/'): - if not source.startswith('/'): - source = f'/{source}' - - if len(source) > 1 and source.endswith('/'): - source = source[:-1] - - try: - for filename in pico.ls(directory=source, long_format=False, recursive=True): - filename = filename[1:] + with open(destination, 'wb') as fh: + fh.write(output) - if filename.startswith('.'): - continue + return (destination, checksum) - if os.path.dirname(filename): - os.makedirs(os.path.dirname(filename), mode=777, exist_ok=True) + def put(self, filename: str, data: bytes) -> Tuple[str, bool]: + if not filename.startswith('/'): + filename = '/' + filename - with open(os.path.join(WATCHING_DIRECTORY, *filename.split('/')), 'wb') as fh: - fh.write(pico.get(filename)) - except Exception as e: - print(str(e)) + if not isinstance(data, bytes): + data = bytes(data, encoding='utf8') - -def contents(filename: str): - try: - for ln in pico.get(filename).decode('utf-8').split('\n'): - print(ln) - except Exception as e: - print(str(e)) - - -def delete(source: str, is_directory: bool = False): - try: - if is_directory: - if not source.endswith('/'): - source += '/' - - pico.rmdir(source, missing_okay=True) - else: - pico.rm(source) - except Exception as e: - print(str(e)) - - -def ls(source: str = '/'): - try: - if len(source) > 1 and source.endswith('/'): - source = source[:-1] - - for filename in pico.ls(directory=source, long_format=True, recursive=True): - if filename.startswith('/'): - filename = filename[1:] - - if len(filename) > 6: - print('·', filename) - except Exception as e: - print(str(e)) - - -def launch(filename: str = 'main.py'): - try: - pico.run_on_board(filename) - except Exception as e: - print(str(e)) - - -sessions = {'deleted': set(), 'modified': set()} - -def on_modified_callback(event): - if event.is_directory == True: - return - - source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') - sessions['modified'].add(source) - - -def on_deleted_callback(event): - source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') - - if event.is_directory == True and not source.endswith('/'): - source += '/' - elif len(source.split('.')) == 1: - source += '/' - - sessions['deleted'].add(source) - - -def watchdog_callback(): - for source in sessions['deleted']: - delete(source, is_directory=source.endswith('/')) - - for source in sessions['modified']: - upload(source) - - sessions['deleted'] = set() - sessions['modified'] = set() - - -signal.signal(signal.SIGINT, lambda a, b: pico.send_ctrl_c()) -watchdog_event = PatternMatchingEventHandler( - patterns = ['*'], - ignore_patterns = None, - ignore_directories = False, - case_sensitive = True -) -watchdog_event.on_modified = on_modified_callback -watchdog_event.on_deleted = on_deleted_callback - -watchdog = Observer() -watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True) -watchdog.start() - - -try: - while True: try: - print('>>> ', end='') - message = input().strip() + if os.path.dirname(filename): + self.mkdir(os.path.dirname(filename)) - while pico.is_raw_repl_on(): - time.sleep(.1) + with self._pyboard as terminal: + size = len(data) + terminal(f"""fh = open("{filename}", "wb")""") - match message.split(' '): - case ['0' | 'exit']: - sys.exit() - case ['reboot' | 'reset']: - pico.send_ctrl_d() - case ['ls' | 'stat', *source]: - ls(source[0] if source else '/') - case ['cat' | 'open' | 'contents', source]: - contents(source) - case ['del' | 'rm' | 'delete', source]: - delete(source) - case ['del*' | 'rm*' | 'rmdir' | 'delete*', source]: - delete(source, is_directory=True) - case ['format']: - delete('/', is_directory=True) - case ['upl' | 'upload' | 'update', source]: - upload(source) - case ['restore']: - upload('/') - case ['download' | 'transfer', source]: - download(source) - case ['backup']: - download('/') - case ['' | 'save' | 'commit']: - watchdog_callback() - case ['status' | 'staged']: - for filename in sessions['deleted']: - print('-', filename) + for i in range(0, size, Pyboard.PYBOARD_BUFFER_SIZE): + chunk_size = min(Pyboard.PYBOARD_BUFFER_SIZE, size - i) + chunk = repr(data[i : i + chunk_size]) + terminal(f"""fh.write({chunk})""") - for filename in sessions['modified']: - print('+', filename) - case ['cancel' | 'unstaged']: - sessions['deleted'] = set() - sessions['modified'] = set() - case _: - if message.startswith('./'): - launch(message[2:]) - elif message: - print(f'"{message}" is not recognized.') + terminal("""fh.close()""") + except Exception as e: + raise e + + return (filename, self.checksum(filename, data)) + + def copy(self, source: str, destination: str) -> Tuple[str, int, bool]: + with open(source, 'rb') as fh: + return self.put(destination, fh.read()) + + def ls(self, dirname: str = '/') -> List[Dict[str, int]]: + if not dirname.startswith('/'): + dirname = '/' + dirname + + if not dirname.endswith('/'): + dirname += '/' + + output = self.terminal(f""" + try: + import os + import json + except ImportError: + import uos as os + import ujson as json + + def ls(dirname): + d = [] + f = [] + if not dirname.endswith("/"): + dirname += "/" + for t in os.ilistdir(dirname): + if t[1] == 0x4000: + d.append(dirname + t[0] + '/') + zd, zf = ls(dirname + t[0] + '/') + d.extend(zd) + f.extend(zf) + else: + f.append((dirname + t[0], os.stat(dirname + t[0])[6])) + return d, f + + print(json.dumps(ls("{dirname}"))) + """) + + return json.loads(output) + + def rm(self, filename: str) -> List: + if not filename.startswith('/'): + filename = '/' + filename + + output = self.terminal(f""" + try: + import os + import json + except ImportError: + import uos as os + import ujson as json + + def ls(dirname): + d = [] + f = [] + if not dirname.endswith("/"): + dirname += "/" + for t in os.ilistdir(dirname): + if t[1] == 0x4000: + d.append(dirname + t[0] + '/') + zd, zf = ls(dirname + t[0] + '/') + d.extend(zd) + f.extend(zf) + else: + f.append(dirname + t[0]) + return d, f + + def rm(filename): + r = [] + if os.stat(filename)[0] == 0x4000: + d, f = ls(filename) + for zf in f: + try: + os.remove(zf) + r.append((zf, 1)) + except Exception as e: + r.append((zf, 0, str(e))) + for zd in d: + try: + os.rmdir(zd) + r.append((zd, 1)) + except Exception as e: + r.append((zd, 0, str(e))) + try: + os.rmdir(filename) + r.append((filename, 1)) + except Exception as e: + r.append((filename, 0, str(e))) + else: + try: + os.remove(filename) + r.append((filename, 1)) + except Exception as e: + r.append((filename, 0, str(e))) + return r + + try: + print(json.dumps(rm("{filename}"))) + except Exception as e: + print(json.dumps([("{filename}", 0, str(e))])) + """) + + return json.loads(output) + + def mkdir(self, dirname: str) -> List: + if not dirname.startswith('/'): + dirname = '/' + dirname + + if dirname.endswith('/'): + dirname = dirname[:-1] + + output = self.terminal(f""" + try: + import os + import json + except ImportError: + import uos as os + import ujson as json + + r = [] + d = [] + for zd in str("{dirname}").split("/"): + if not zd: + continue + d.append(zd) + zd = "/".join(d) + + try: + os.mkdir(zd) + r.append(("/" + zd, 1)) + except Exception as e: + if str(e).find('EEXIST'): + r.append(("/" + zd, 1)) + else: + r.append(("/" + zd, 0, str(e))) + + print(json.dumps(r)) + """) + + return json.loads(output) + + def launch(self, filename: str): + try: + self.terminal(f""" + with open("{filename}", "r") as fh: + exec(fh.read()) + """, stream_output=True) except Exception as e: print(str(e)) -except KeyboardInterrupt: - watchdog.stop() -watchdog.join() + def devmode(self, filename: str): + with self._pyboard as terminal: + try: + with open(filename, 'r') as fh: + terminal(fh.read(), stream_output=True) + except Exception as e: + print(str(e)) + + def terminal(self, command: str, stream_output: bool = False) -> str: + with self._pyboard as terminal: + try: + return terminal(command, stream_output=stream_output) + except Exception as e: + raise e + +# picowatch = Picowatch(Pyboard('COM5')) + +# with open('./setenv.py', 'rb') as fh: +# print(picowatch.put('setenv.py', fh.read())) + +# print(picowatch.get('setenv.py')) + +# print(picowatch.save('main.py', 'cpmain.py')) +# print(picowatch.copy('cpmain.py', 'main.py')) + +# picowatch.devmode('main.py') \ No newline at end of file