# import os import sys import time import signal from lib.pyboard import Pyboard, Files from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler print('Welcome to picowatch lib.') pico = False while not pico: print('-' * 30) device = input('Port: ').strip() baudrate = input('Baudrate (115200): ').strip() or 115200 try: pico = Files(Pyboard(device=device, baudrate=baudrate)) print(f'Connected to device: {device} at a baudrate of: {baudrate}') print('-' * 30) except Exception as e: print(f'Exception: {str(e)}') pico.send_ctrl_c() WATCHING_DIRECTORY = './' def upload(source: str = '', destination: str = ''): real_source = os.path.join(WATCHING_DIRECTORY, *source.split('/')) if not destination: destination = source destination = '/'.join(destination.split('\\')) try: if os.path.isdir(real_source): for root, dirs, files in os.walk(real_source, followlinks=True): droot = '/'.join(root.replace(WATCHING_DIRECTORY, '').split(os.sep)) for dir in dirs: pico.mkdir('/'.join([droot, dir]), exists_okay=True) for filename in files: with open(os.path.join(root, filename), 'rb') as fh: pico.put('/'.join([droot, filename]), fh.read()) time.sleep(.5) elif os.path.exists(real_source): with open(real_source, 'rb') as fh: pico.put(destination, fh.read()) time.sleep(.5) except Exception as e: print(f'Exeception: upload({source}, {destination}): {str(e)}') def download(source: str = '/'): if not source.startswith('/'): source = f'/{source}' if len(source) > 1 and source.endswith('/'): source = source[:-1] try: for filename in pico.ls(directory=source, long_format=False, recursive=True): filename = filename[1:] if filename.startswith('.'): continue if os.path.dirname(filename): os.makedirs(os.path.dirname(filename), mode=777, exist_ok=True) with open(os.path.join(WATCHING_DIRECTORY, *filename.split('/')), 'wb') as fh: fh.write(pico.get(filename)) time.sleep(.5) except Exception as e: print(f'Exeception: download({source}): {str(e)}') def contents(filename: str): try: for ln in pico.get(filename).decode('utf-8').split('\n'): print(ln) except Exception as e: print(f'Exeception: contents({filename}): {str(e)}') def delete(source: str): try: if source.endswith('/'): pico.rmdir(source, missing_okay=True) else: pico.rm(source) except Exception as e: print(f'Exeception: delete({source}): {str(e)}') def ls(source: str = '/'): try: if len(source) > 1 and source.endswith('/'): source = source[:-1] for filename in pico.ls(directory=source, long_format=True, recursive=True): if filename.startswith('/'): filename = filename[1:] if len(filename) > 5: print(f'→ {filename}') except Exception as e: print(f'Exeception: ls({source}): {str(e)}') def launch(filename: str = 'main.py'): try: pico.run_on_board(filename) except Exception as e: print(f'Exeception: launch({filename}): {str(e)}') sessions = {'deleted': set(), 'modified': set()} def on_modified_callback(event): if event.is_directory == True: return source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') sessions['modified'].add(source) def on_deleted_callback(event): source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') if event.is_directory == True and not source.endswith('/'): source += '/' elif len(source.split('.')) == 1: source += '/' sessions['deleted'].add(source) def watchdog_callback(): for source in sessions['deleted']: delete(source) for source in sessions['modified']: upload(source) sessions['deleted'] = set() sessions['modified'] = set() signal.signal(signal.SIGINT, lambda a, b: pico.send_ctrl_c()) watchdog_event = PatternMatchingEventHandler( patterns = ['*'], ignore_patterns = None, ignore_directories = False, case_sensitive = True ) watchdog_event.on_modified = on_modified_callback watchdog_event.on_deleted = on_deleted_callback watchdog = Observer() watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True) watchdog.start() try: while True: try: message = input('>>> ').strip() while pico.is_raw_repl_on(): time.sleep(.1) match message.split(' '): case ['0' | 'exit']: sys.exit() case ['reboot' | 'reset']: pico.send_ctrl_d() case ['autosave', value]: sessions['autosave'] = (value == '1' or value == 'on') case ['ls']: ls('/') case ['ls' | 'stat', source]: ls(source) case ['cat' | 'open' | 'contents', source]: contents(source) case ['del' | 'delete' | 'remove', source]: delete(source) case ['upl' | 'upload' | 'update', source]: upload(source) case ['download' | 'backup', source]: download(source) case ['save']: watchdog_callback() case ['sync', filename]: watchdog_callback() launch(filename) case _: if message.startswith('./'): launch(message[2:]) elif message: print(f'"{message}" is not recognized.') except Exception as e: print(f'Exception: {str(e)}') except KeyboardInterrupt: watchdog.stop() watchdog.join()