#!/usr/bin/env python

"""
pyboard interface

This module provides the Pyboard class, used to communicate with and
control the pyboard over a serial USB connection.

Example usage:

    import pyboard
    pyb = pyboard.Pyboard('/dev/ttyACM0')

Or:

    pyb = pyboard.Pyboard('192.168.1.1')

Then:

    pyb.enter_raw_repl()
    pyb.exec('pyb.LED(1).on()')
    pyb.exit_raw_repl()

Note: if using Python2 then pyb.exec must be written as pyb.exec_.
To run a script from the local machine on the board and print out the results:

    import pyboard
    pyboard.execfile('test.py', device='/dev/ttyACM0')

This script can also be run directly.  To execute a local script, use:

    ./pyboard.py test.py

Or:

    python pyboard.py test.py

"""

# Adafruit MicroPython Tool - File Operations
# Author: Tony DiCola
# Copyright (c) 2016 Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import sys import ast import time import textwrap import binascii import os import signal from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler _rawdelay = None try: stdout = sys.stdout.buffer except AttributeError: # Python2 doesn't have buffer attr stdout = sys.stdout def stdout_write_bytes(b): b = b.replace(b"\x04", b"") stdout.write(b) stdout.flush() class PyboardError(Exception): pass class TelnetToSerial: def __init__(self, ip, user, password, read_timeout=None): import telnetlib self.tn = telnetlib.Telnet(ip, timeout=15) self.read_timeout = read_timeout if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): self.tn.write(bytes(user, 'ascii') + b"\r\n") if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): # needed because of internal implementation details of the telnet server time.sleep(0.2) self.tn.write(bytes(password, 'ascii') + b"\r\n") if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): # login succesful from collections import deque self.fifo = deque() return raise PyboardError('Failed to establish a telnet connection with the board') def __del__(self): self.close() def close(self): try: self.tn.close() except: # the telnet object might not exist yet, so ignore this one pass def read(self, size=1): while len(self.fifo) < size: timeout_count = 0 data = self.tn.read_eager() if len(data): self.fifo.extend(data) timeout_count = 0 else: time.sleep(0.25) if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: break timeout_count += 1 data = b'' while len(data) < size and len(self.fifo) > 0: data += bytes([self.fifo.popleft()]) return data def write(self, data): self.tn.write(data) return len(data) def inWaiting(self): n_waiting = len(self.fifo) if not n_waiting: data = self.tn.read_eager() self.fifo.extend(data) return len(data) else: return n_waiting class Pyboard: def __init__(self, device, 
baudrate=115200, user='micro', password='python', wait=0, rawdelay=0): global _rawdelay _rawdelay = rawdelay if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: # device looks like an IP address self.serial = TelnetToSerial(device, user, password, read_timeout=10) else: import serial delayed = False for attempt in range(wait + 1): try: self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) break except (OSError, IOError): # Py2 and Py3 have different errors if wait == 0: continue if attempt == 0: sys.stdout.write(f'Waiting {wait} seconds for pyboard ') delayed = True time.sleep(1) sys.stdout.write('.') sys.stdout.flush() else: if delayed: print('') raise PyboardError('failed to access ' + device) if delayed: print('') def close(self): self.serial.close() def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): data = self.serial.read(min_num_bytes) if data_consumer: data_consumer(data) timeout_count = 0 while True: if data.endswith(ending): break elif self.serial.inWaiting() > 0: new_data = self.serial.read(1) data = data + new_data if data_consumer: data_consumer(new_data) timeout_count = 0 else: timeout_count += 1 if timeout is not None and timeout_count >= 100 * timeout: break time.sleep(0.01) return data def send_ctrl_a(self): self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL def send_ctrl_b(self): self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL def send_ctrl_c(self): self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program time.sleep(0.1) self.serial.write(b'\x03') time.sleep(0.1) def send_ctrl_d(self): self.serial.write(b'\r\x04') # ctrl-D: soft reset def enter_raw_repl(self): # Brief delay before sending RAW MODE char if requests if _rawdelay > 0: time.sleep(_rawdelay) # ctrl-C twice: interrupt any running program self.serial.write(b'\r\x03') time.sleep(0.1) self.serial.write(b'\x03') time.sleep(0.1) # flush input (without relying on serial.flushInput()) n = 
self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() for retry in range(0, 5): self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): break else: if retry >= 4: print(data) raise PyboardError('could not enter raw repl') time.sleep(0.2) self.serial.write(b'\x04') # ctrl-D: soft reset data = self.read_until(1, b'soft reboot\r\n') if not data.endswith(b'soft reboot\r\n'): print(data) raise PyboardError('could not enter raw repl') # By splitting this into 2 reads, it allows boot.py to print stuff, # which will show up after the soft reboot and before the raw REPL. # Modification from original pyboard.py below: # Add a small delay and send Ctrl-C twice after soft reboot to ensure # any main program loop in main.py is interrupted. time.sleep(0.5) self.serial.write(b'\x03') time.sleep(0.1) # (slight delay before second interrupt self.serial.write(b'\x03') # End modification above. 
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): print(data) raise PyboardError('could not enter raw repl') def exit_raw_repl(self): self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL def follow(self, timeout, data_consumer=None): # wait for normal output data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) if not data.endswith(b'\x04'): raise PyboardError('timeout waiting for first EOF reception') data = data[:-1] # wait for error output data_err = self.read_until(1, b'\x04', timeout=timeout) if not data_err.endswith(b'\x04'): raise PyboardError('timeout waiting for second EOF reception') data_err = data_err[:-1] # return normal and error output return data, data_err def exec_raw_no_follow(self, command): if isinstance(command, bytes): command_bytes = command else: command_bytes = bytes(command, encoding='utf8') # check we have a prompt data = self.read_until(1, b'>') if not data.endswith(b'>'): raise PyboardError('could not enter raw repl') # write command for i in range(0, len(command_bytes), 256): self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) time.sleep(0.01) self.serial.write(b'\x04') # check if we could exec command data = self.serial.read(2) if data != b'OK': raise PyboardError('could not exec command') def exec_raw(self, command, timeout=10, data_consumer=None): self.exec_raw_no_follow(command) return self.follow(timeout, data_consumer) def eval(self, expression): ret = self.exec_(f'print({expression})') ret = ret.strip() return ret def exec_(self, command, stream_output=False): data_consumer = None if stream_output: data_consumer = stdout_write_bytes ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) if ret_err: raise PyboardError('exception', ret, ret_err) return ret def execfile(self, filename, stream_output=False): with open(filename, 'rb') as f: pyfile = f.read() return self.exec_(pyfile, 
stream_output=stream_output) def get_time(self): t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) # in Python2 exec is a keyword so one must use "exec_" # but for Python3 we want to provide the nicer version "exec" setattr(Pyboard, "exec", Pyboard.exec_) def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): pyb = Pyboard(device, baudrate, user, password) pyb.enter_raw_repl() output = pyb.execfile(filename) stdout_write_bytes(output) pyb.exit_raw_repl() pyb.close() # def main(): # import argparse # cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') # cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') # cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') # cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') # cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') # cmd_parser.add_argument('-c', '--command', help='program passed in as string') # cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') # cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') # cmd_parser.add_argument('files', nargs='*', help='input files') # args = cmd_parser.parse_args() # def execbuffer(buf): # try: # pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) # pyb.enter_raw_repl() # ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) # pyb.exit_raw_repl() # pyb.close() # except PyboardError as er: # print(er) # sys.exit(1) # except KeyboardInterrupt: # sys.exit(1) # if ret_err: # stdout_write_bytes(ret_err) # sys.exit(1) # if 
args.command is not None: # execbuffer(args.command.encode('utf-8')) # for filename in args.files: # with open(filename, 'rb') as f: # pyfile = f.read() # execbuffer(pyfile) # if args.follow or (args.command is None and len(args.files) == 0): # try: # pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) # ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) # pyb.close() # except PyboardError as er: # print(er) # sys.exit(1) # except KeyboardInterrupt: # sys.exit(1) # if ret_err: # stdout_write_bytes(ret_err) # sys.exit(1) BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time. # This is kept small because small chips and USB to serial # bridges usually have very small buffers. class DirectoryExistsError(Exception): pass class Files(object): """Class to interact with a MicroPython board files over a serial connection. Provides functions for listing, uploading, and downloading files from the board's filesystem. """ __raw_repl_on: bool = False def __init__(self, pyboard: Pyboard): """Initialize the MicroPython board files class using the provided pyboard instance. In most cases you should create a Pyboard instance (from pyboard.py) which connects to a board over a serial connection and pass it in, but you can pass in other objects for testing, etc. """ self._pyboard = pyboard def is_raw_repl_on(self): return self.__raw_repl_on def send_ctrl_a(self): self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL def send_ctrl_b(self): self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL def send_ctrl_c(self): self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program def send_ctrl_d(self): self._pyboard.send_ctrl_d() # ctrl-D: soft reset def ls(self, directory="/", long_format=False, recursive=True): """List the contents of the specified directory (or root if none is specified). Returns a list of strings with the names of files in the specified directory. 
If long_format is True then a list of 2-tuples with the name and size (in bytes) of the item is returned. Note that it appears the size of directories is not supported by MicroPython and will always return 0 (i.e. no recursive size computation). """ # Disabling for now, see https://github.com/adafruit/ampy/issues/55. # # Make sure directory ends in a slash. # if not directory.endswith("/"): # directory += "/" # Make sure directory starts with slash, for consistency. # if not directory.startswith("/"): # directory = "/" + directory command = """\ try: import os except ImportError: import uos as os\n""" if recursive: command += """\ def listdir(directory): result = set() def _listdir(dir_or_file): try: # if its a directory, then it should provide some children. children = os.listdir(dir_or_file) except OSError: # probably a file. run stat() to confirm. os.stat(dir_or_file) result.add(dir_or_file) else: # probably a directory, add to result if empty. if children: # queue the children to be dealt with in next iteration. for child in children: # create the full path. if dir_or_file == '/': next = dir_or_file + child else: next = dir_or_file + '/' + child _listdir(next) else: result.add(dir_or_file) _listdir(directory) return sorted(result)\n""" else: command += """\ def listdir(directory): if directory == '/': return sorted([directory + f for f in os.listdir(directory)]) else: return sorted([directory + '/' + f for f in os.listdir(directory)])\n""" # Execute os.listdir() command on the board. if long_format: command += f""" r = [] for f in listdir('{directory}'): size = os.stat(f)[6] r.append(f'{{f}} ({{size}}b)') print(r) """ else: command += f""" print(listdir('{directory}')) """ try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command)) except PyboardError as ex: # Check if this is an OSError #2, i.e. directory doesn't exist and # rethrow it as something more descriptive. 
if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: raise RuntimeError(f"No such directory: {directory}") else: raise ex finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False # Parse the result list and return it. return ast.literal_eval(output.decode("utf-8")) def mkdir(self, directory, exists_okay=False): """Create the specified directory. Note this cannot create a recursive hierarchy of directories, instead each one should be created separately. """ # Execute os.mkdir command on the board. command = f""" try: import os except ImportError: import uos as os os.mkdir('{directory}') """ try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) except PyboardError as ex: # Check if this is an OSError #17, i.e. directory already exists. if ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST") != -1: if not exists_okay: raise DirectoryExistsError(f"Directory already exists: {directory}") finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False def put(self, filename, data): """Create or update the specified file with the provided data. """ # Open the file for writing on the board and write chunks of data. if filename.startswith('/'): filename = filename[1:] print(f'↑ {filename}', end='', flush=True) size = len(data) try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(f"f = open('{filename}', 'wb')") # Loop through and write a buffer size chunk of data at a time. for i in range(0, size, BUFFER_SIZE): chunk_size = min(BUFFER_SIZE, size - i) chunk = repr(data[i : i + chunk_size]) # Make sure to send explicit byte strings (handles python 2 compatibility). 
if not chunk.startswith("b"): chunk = "b" + chunk self._pyboard.exec_(f"f.write({chunk})") self._pyboard.exec_("f.close()") except Exception as ex: print(' [x]') raise ex finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(f' - {size}b [✓]') return size def get(self, filename): """Retrieve the contents of the specified file and return its contents as a byte string. """ # Open the file and read it a few bytes at a time and print out the # raw bytes. Be careful not to overload the UART buffer so only write # a few bytes at a time, and don't use print since it adds newlines and # expects string data. command = f""" import sys import ubinascii with open('{filename}', 'rb') as infile: while True: result = infile.read({BUFFER_SIZE}) if result == b'': break len = sys.stdout.write(ubinascii.hexlify(result)) """ print(f'↓ {filename}', end='', flush=True) try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command)) except PyboardError as ex: print(' [x]') # Check if this is an OSError #2, i.e. file doesn't exist and # rethrow it as something more descriptive. try: if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: raise RuntimeError(f"No such file: {filename}") else: raise ex except UnicodeDecodeError: raise ex finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(f' - {int(len(output) / 2)}b [✓]') return binascii.unhexlify(output) def rm(self, filename): """Remove the specified file or directory.""" command = f""" try: import os except ImportError: import uos as os os.remove('{filename}') """ print(f'↶ {filename}', end='') try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) except PyboardError as ex: print(' [x]') message = ex.args[2].decode("utf-8") # Check if this is an OSError #2, i.e. file/directory doesn't exist # and rethrow it as something more descriptive. 
if message.find("OSError: [Errno 2] ENOENT") != -1: raise RuntimeError(f"No such file/directory: {filename}") # Check for OSError #13, the directory isn't empty. if message.find("OSError: [Errno 13] EACCES") != -1: raise RuntimeError(f"Directory is not empty: {filename}") else: raise ex finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(' [✓]') def rmdir(self, directory, missing_okay=False): """Forcefully remove the specified directory and all its children.""" # Build a script to walk an entire directory structure and delete every # file and subfolder. This is tricky because MicroPython has no os.walk # or similar function to walk folders, so this code does it manually # with recursion and changing directories. For each directory it lists # the files and deletes everything it can, i.e. all the files. Then # it lists the files again and assumes they are directories (since they # couldn't be deleted in the first pass) and recursively clears those # subdirectories. Finally when finished clearing all the children the # parent directory is deleted. command = f""" try: import os except ImportError: import uos as os def rmdir(directory): os.chdir(directory) for f in os.listdir(): try: os.remove(f) except OSError: pass for f in os.listdir(): rmdir(f) os.chdir('..') os.rmdir(directory) rmdir('{directory}') """ print(f'↶ {directory}', end='') try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) except PyboardError as ex: message = ex.args[2].decode("utf-8") # Check if this is an OSError #2, i.e. directory doesn't exist # and rethrow it as something more descriptive. if message.find("OSError: [Errno 2] ENOENT") != -1: if not missing_okay: print(' [x]') raise RuntimeError(f"No such directory: {directory}") finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(' [✓]') def run(self, filename, wait_output=True, stream_output=True): """Run the provided script and return its output. 
If wait_output is True (default) then wait for the script to finish and then return its output, otherwise just run the script and don't wait for any output. If stream_output is True(default) then return None and print outputs to stdout without buffering. """ output = None try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() if stream_output: self._pyboard.execfile(filename, stream_output=True) elif wait_output: # Run the file and wait for output to return. output = self._pyboard.execfile(filename) else: # Read the file and run it using lower level pyboard functions that # won't wait for it to finish or return output. with open(filename, "rb") as infile: self._pyboard.exec_raw_no_follow(infile.read()) except Exception as e: raise e finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False return output def run_on_board(self, filename, wait_output=True, stream_output=True): """Run the provided script and return its output. If wait_output is True (default) then wait for the script to finish and then return its output, otherwise just run the script and don't wait for any output. If stream_output is True(default) then return None and print outputs to stdout without buffering. """ command = textwrap.dedent(f'exec(open("{filename}").read())') output = None try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() if stream_output: self._pyboard.exec_(command, stream_output=True) elif wait_output: # Run the file and wait for output to return. output = self._pyboard.exec_(command) else: # Read the file and run it using lower level pyboard functions that # won't wait for it to finish or return output. self._pyboard.exec_raw_no_follow(command) except Exception as e: message = str(e) if message.find('OSError: [Errno 2] ENOENT') != -1: reason = f'"{filename}" does not exists!' elif message.find('OSError: [Errno 13] EACCES') != -1: reason = f'"{filename}" access denied!' 
elif message.find('OSError: [Errno 21] EISDIR') != -1: reason = f'"{filename}" is a directory!' else: reason = 'Traceback (most recent call last):' traceback = message.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' raise Exception(reason.strip()) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False return output def exec(self, command): try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True) except Exception as e: message = str(e) reason = 'Traceback (most recent call last):' traceback = message.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' raise Exception(reason.strip()) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False return output print('Welcome to picowatch lib.') pico = False while not pico: print('-' * 30) device = input('Port: ').strip() baudrate = input('Baudrate (115200): ').strip() or 115200 try: pico = Files(Pyboard(device=device, baudrate=baudrate)) print(f'Connected to device: {device} at a baudrate of: {baudrate}') print('-' * 30) except Exception as e: print(f'Exception: {str(e)}') pico.send_ctrl_c() WATCHING_DIRECTORY = './' def upload(source: str = '', destination: str = ''): real_source = os.path.join(WATCHING_DIRECTORY, *source.split('/')) if not destination: destination = source destination = '/'.join(destination.split('\\')) try: if os.path.isdir(real_source): for root, dirs, files in os.walk(real_source, followlinks=True): droot = '/'.join(root.replace(WATCHING_DIRECTORY, '').split(os.sep)) for dir in dirs: pico.mkdir('/'.join([droot, dir]), exists_okay=True) for filename in files: with open(os.path.join(root, filename), 'rb') as fh: pico.put('/'.join([droot, filename]), fh.read()) time.sleep(.5) elif os.path.exists(real_source): with open(real_source, 'rb') as fh: pico.put(destination, fh.read()) time.sleep(.5) 
except Exception as e: print(f'Exeception: upload({source}, {destination}): {str(e)}') def download(source: str = '/'): if not source.startswith('/'): source = f'/{source}' if len(source) > 1 and source.endswith('/'): source = source[:-1] try: for filename in pico.ls(directory=source, long_format=False, recursive=True): filename = filename[1:] if filename.startswith('.'): continue if os.path.dirname(filename): os.makedirs(os.path.dirname(filename), mode=777, exist_ok=True) with open(os.path.join(WATCHING_DIRECTORY, *filename.split('/')), 'wb') as fh: fh.write(pico.get(filename)) time.sleep(.5) except Exception as e: print(f'Exeception: download({source}): {str(e)}') def contents(filename: str): try: for ln in pico.get(filename).decode('utf-8').split('\n'): print(ln) except Exception as e: print(f'Exeception: contents({filename}): {str(e)}') def delete(source: str): try: if source.endswith('/'): pico.rmdir(source, missing_okay=True) else: pico.rm(source) except Exception as e: print(f'Exeception: delete({source}): {str(e)}') def ls(source: str = '/'): try: if len(source) > 1 and source.endswith('/'): source = source[:-1] for filename in pico.ls(directory=source, long_format=True, recursive=True): if filename.startswith('/'): filename = filename[1:] if len(filename) > 5: print('→', filename) except Exception as e: print(f'Exeception: ls({source}): {str(e)}') def launch(filename: str = 'main.py'): try: pico.run_on_board(filename) except Exception as e: print(f'Exeception: launch({filename}): {str(e)}') sessions = {'deleted': set(), 'modified': set()} def on_modified_callback(event): if event.is_directory == True: return source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') sessions['modified'].add(source) def on_deleted_callback(event): source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') if event.is_directory == True and not source.endswith('/'): source += '/' elif len(source.split('.')) == 1: source += '/' 
sessions['deleted'].add(source) def watchdog_callback(): for source in sessions['deleted']: delete(source) for source in sessions['modified']: upload(source) sessions['deleted'] = set() sessions['modified'] = set() signal.signal(signal.SIGINT, lambda a, b: pico.send_ctrl_c()) watchdog_event = PatternMatchingEventHandler( patterns = ['*'], ignore_patterns = None, ignore_directories = False, case_sensitive = True ) watchdog_event.on_modified = on_modified_callback watchdog_event.on_deleted = on_deleted_callback watchdog = Observer() watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True) watchdog.start() try: while True: try: message = input('>>> ').strip() while pico.is_raw_repl_on(): time.sleep(.1) match message.split(' '): case ['0' | 'exit']: sys.exit() case ['reboot' | 'reset']: pico.send_ctrl_d() case ['ls']: ls('/') case ['ls' | 'stat', source]: ls(source) case ['cat' | 'open' | 'contents', source]: contents(source) case ['del' | 'delete' | 'remove', source]: delete(source) case ['upl' | 'upload' | 'update', source]: upload(source) case ['download' | 'backup', source]: download(source) case ['' | 'save']: watchdog_callback() case ['commits']: for filename in sessions['deleted']: print('↺', filename) for filename in sessions['modified']: print('⇈', filename) case ['revert']: sessions['deleted'] = set() sessions['modified'] = set() case _: if message.startswith('./'): launch(message[2:]) elif message: print(f'"{message}" is not recognized.') except Exception as e: print(f'Exception: {str(e)}') except KeyboardInterrupt: watchdog.stop() watchdog.join()