#!/usr/bin/env python

"""
pyboard interface

This module provides the Pyboard class, used to communicate with and
control the pyboard over a serial USB connection.

Example usage:

    import pyboard
    pyb = pyboard.Pyboard('/dev/ttyACM0')

Or:

    pyb = pyboard.Pyboard('192.168.1.1')

Then:

    pyb.enter_raw_repl()
    pyb.exec('pyb.LED(1).on()')
    pyb.exit_raw_repl()

Note: if using Python2 then pyb.exec must be written as pyb.exec_.
To run a script from the local machine on the board and print out the results:

    import pyboard
    pyboard.execfile('test.py', device='/dev/ttyACM0')

This script can also be run directly.  To execute a local script, use:

    ./pyboard.py test.py

Or:

    python pyboard.py test.py

"""

# Adafruit MicroPython Tool - File Operations
# Author: Tony DiCola
# Copyright (c) 2016 Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import sys import ast import time import textwrap import binascii import os import signal from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler _rawdelay = None try: stdout = sys.stdout.buffer except AttributeError: # Python2 doesn't have buffer attr stdout = sys.stdout def stdout_write_bytes(b): b = b.replace(b"\x04", b"") stdout.write(b) stdout.flush() class PyboardError(Exception): pass class TelnetToSerial: def __init__(self, ip, user, password, read_timeout=None): import telnetlib self.tn = telnetlib.Telnet(ip, timeout=15) self.read_timeout = read_timeout if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): self.tn.write(bytes(user, 'ascii') + b"\r\n") if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): # needed because of internal implementation details of the telnet server time.sleep(0.2) self.tn.write(bytes(password, 'ascii') + b"\r\n") if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): # login succesful from collections import deque self.fifo = deque() return raise PyboardError('Failed to establish a telnet connection with the board') def __del__(self): self.close() def close(self): try: self.tn.close() except: # the telnet object might not exist yet, so ignore this one pass def read(self, size=1): while len(self.fifo) < size: timeout_count = 0 data = self.tn.read_eager() if len(data): self.fifo.extend(data) timeout_count = 0 else: time.sleep(0.25) if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: break timeout_count += 1 data = b'' while len(data) < size and len(self.fifo) > 0: data += bytes([self.fifo.popleft()]) return data def write(self, data): self.tn.write(data) return len(data) def inWaiting(self): n_waiting = len(self.fifo) if not n_waiting: data = self.tn.read_eager() self.fifo.extend(data) return len(data) else: return n_waiting class Pyboard: def __init__(self, device, 
baudrate=115200, user='micro', password='python', wait=0, rawdelay=0): global _rawdelay _rawdelay = rawdelay if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: # device looks like an IP address self.serial = TelnetToSerial(device, user, password, read_timeout=10) else: import serial delayed = False for attempt in range(wait + 1): try: self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) break except (OSError, IOError): # Py2 and Py3 have different errors if wait == 0: continue if attempt == 0: sys.stdout.write(f'Waiting {wait} seconds for pyboard ') delayed = True time.sleep(1) sys.stdout.write('.') sys.stdout.flush() else: if delayed: print('') raise PyboardError('failed to access ' + device) if delayed: print('') def close(self): self.serial.close() def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): data = self.serial.read(min_num_bytes) if data_consumer: data_consumer(data) timeout_count = 0 while True: if data.endswith(ending): break elif self.serial.inWaiting() > 0: new_data = self.serial.read(1) data = data + new_data if data_consumer: data_consumer(new_data) timeout_count = 0 else: timeout_count += 1 if timeout is not None and timeout_count >= 100 * timeout: break time.sleep(0.01) return data def send_ctrl_a(self): self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL def send_ctrl_b(self): self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL def send_ctrl_c(self): self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program time.sleep(0.1) self.serial.write(b'\x03') time.sleep(0.1) def send_ctrl_d(self): self.serial.write(b'\r\x04') # ctrl-D: soft reset def enter_raw_repl(self): # Brief delay before sending RAW MODE char if requests if _rawdelay > 0: time.sleep(_rawdelay) # ctrl-C twice: interrupt any running program self.serial.write(b'\r\x03') time.sleep(0.1) self.serial.write(b'\x03') time.sleep(0.1) # flush input (without relying on serial.flushInput()) n = 
self.serial.inWaiting() while n > 0: self.serial.read(n) n = self.serial.inWaiting() for retry in range(0, 5): self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): break else: if retry >= 4: print(data) raise PyboardError('could not enter raw repl') time.sleep(0.2) self.serial.write(b'\x04') # ctrl-D: soft reset data = self.read_until(1, b'soft reboot\r\n') if not data.endswith(b'soft reboot\r\n'): print(data) raise PyboardError('could not enter raw repl') # By splitting this into 2 reads, it allows boot.py to print stuff, # which will show up after the soft reboot and before the raw REPL. # Modification from original pyboard.py below: # Add a small delay and send Ctrl-C twice after soft reboot to ensure # any main program loop in main.py is interrupted. time.sleep(0.5) self.serial.write(b'\x03') time.sleep(0.1) # (slight delay before second interrupt self.serial.write(b'\x03') # End modification above. 
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): print(data) raise PyboardError('could not enter raw repl') def exit_raw_repl(self): self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL def follow(self, timeout, data_consumer=None): # wait for normal output data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) if not data.endswith(b'\x04'): raise PyboardError('timeout waiting for first EOF reception') data = data[:-1] # wait for error output data_err = self.read_until(1, b'\x04', timeout=timeout) if not data_err.endswith(b'\x04'): raise PyboardError('timeout waiting for second EOF reception') data_err = data_err[:-1] # return normal and error output return data, data_err def exec_raw_no_follow(self, command): if isinstance(command, bytes): command_bytes = command else: command_bytes = bytes(command, encoding='utf8') # check we have a prompt data = self.read_until(1, b'>') if not data.endswith(b'>'): raise PyboardError('could not enter raw repl') # write command for i in range(0, len(command_bytes), 256): self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) time.sleep(0.01) self.serial.write(b'\x04') # check if we could exec command data = self.serial.read(2) if data != b'OK': raise PyboardError('could not exec command') def exec_raw(self, command, timeout=10, data_consumer=None): self.exec_raw_no_follow(command) return self.follow(timeout, data_consumer) def eval(self, expression): ret = self.exec_(f'print({expression})') ret = ret.strip() return ret def exec_(self, command, stream_output=False): data_consumer = None if stream_output: data_consumer = stdout_write_bytes ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) if ret_err: raise PyboardError('exception', ret, ret_err) return ret def execfile(self, filename, stream_output=False): with open(filename, 'rb') as f: pyfile = f.read() return self.exec_(pyfile, 
stream_output=stream_output) def get_time(self): t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) # in Python2 exec is a keyword so one must use "exec_" # but for Python3 we want to provide the nicer version "exec" setattr(Pyboard, "exec", Pyboard.exec_) def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): pyb = Pyboard(device, baudrate, user, password) pyb.enter_raw_repl() output = pyb.execfile(filename) stdout_write_bytes(output) pyb.exit_raw_repl() pyb.close() # def main(): # import argparse # cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') # cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') # cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') # cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') # cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') # cmd_parser.add_argument('-c', '--command', help='program passed in as string') # cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') # cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') # cmd_parser.add_argument('files', nargs='*', help='input files') # args = cmd_parser.parse_args() # def execbuffer(buf): # try: # pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) # pyb.enter_raw_repl() # ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) # pyb.exit_raw_repl() # pyb.close() # except PyboardError as er: # print(er) # sys.exit(1) # except KeyboardInterrupt: # sys.exit(1) # if ret_err: # stdout_write_bytes(ret_err) # sys.exit(1) # if 
args.command is not None: # execbuffer(args.command.encode('utf-8')) # for filename in args.files: # with open(filename, 'rb') as f: # pyfile = f.read() # execbuffer(pyfile) # if args.follow or (args.command is None and len(args.files) == 0): # try: # pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) # ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) # pyb.close() # except PyboardError as er: # print(er) # sys.exit(1) # except KeyboardInterrupt: # sys.exit(1) # if ret_err: # stdout_write_bytes(ret_err) # sys.exit(1) BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time. # This is kept small because small chips and USB to serial # bridges usually have very small buffers. class DirectoryExistsError(Exception): pass class Files(object): """Class to interact with a MicroPython board files over a serial connection. Provides functions for listing, uploading, and downloading files from the board's filesystem. """ __raw_repl_on: bool = False def __init__(self, pyboard: Pyboard): """Initialize the MicroPython board files class using the provided pyboard instance. In most cases you should create a Pyboard instance (from pyboard.py) which connects to a board over a serial connection and pass it in, but you can pass in other objects for testing, etc. 
""" self._pyboard = pyboard def is_raw_repl_on(self): return self.__raw_repl_on def send_ctrl_a(self): self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL def send_ctrl_b(self): self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL def send_ctrl_c(self): self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program def send_ctrl_d(self): self._pyboard.send_ctrl_d() # ctrl-D: soft reset def handle_traceback(self, e: Exception): message = e.args[2].decode('utf-8') oserror = message.split('OSError:') if len(oserror) == 2: reason = oserror[1].strip() if reason == '-2': reason = '[Errno -2] EUNKNOWN' if reason == '39': reason = '[Errno 39] ENEPTY' else: reason = 'Traceback (most recent call last):' traceback = message.split(reason) if len(traceback) == 2: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' raise Exception(reason.strip()) def ls(self, directory="/", long_format=False, recursive=True): """List the contents of the specified directory (or root if none is specified). Returns a list of strings with the names of files in the specified directory. If long_format is True then a list of 2-tuples with the name and size (in bytes) of the item is returned. Note that it appears the size of directories is not supported by MicroPython and will always return 0 (i.e. no recursive size computation). """ # Disabling for now, see https://github.com/adafruit/ampy/issues/55. # # Make sure directory ends in a slash. # if not directory.endswith("/"): # directory += "/" # Make sure directory starts with slash, for consistency. # if not directory.startswith("/"): # directory = "/" + directory command = """\ try: import os except ImportError: import uos as os\n""" if recursive: command += """\ def listdir(directory): result = set() def _listdir(dir_or_file): try: # if its a directory, then it should provide some children. children = os.listdir(dir_or_file) except OSError: # probably a file. run stat() to confirm. 
os.stat(dir_or_file) result.add(dir_or_file) else: # probably a directory, add to result if empty. if children: # queue the children to be dealt with in next iteration. for child in children: # create the full path. if dir_or_file == '/': next = dir_or_file + child else: next = dir_or_file + '/' + child _listdir(next) else: result.add(dir_or_file) _listdir(directory) return sorted(result)\n""" else: command += """\ def listdir(directory): if directory == '/': return sorted([directory + f for f in os.listdir(directory)]) else: return sorted([directory + '/' + f for f in os.listdir(directory)])\n""" # Execute os.listdir() command on the board. if long_format: command += f""" r = [] for f in listdir('{directory}'): size = os.stat(f)[6] r.append(f'{{f}} - {{size}}b') print(r) """ else: command += f""" print(listdir('{directory}')) """ try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command)) except Exception as e: self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False # Parse the result list and return it. return ast.literal_eval(output.decode("utf-8")) def mkdir(self, directory, exists_okay=True): """Create the specified directory. Note this cannot create a recursive hierarchy of directories, instead each one should be created separately. """ # Execute os.mkdir command on the board. command = f""" try: import os except ImportError: import uos as os os.mkdir('{directory}') """ try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) except Exception as e: if exists_okay == False: self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False def put(self, filename, data): """Create or update the specified file with the provided data. """ # Open the file for writing on the board and write chunks of data. 
if filename.startswith('/'): filename = filename[1:] print(f'+ {filename}', end='', flush=True) size = len(data) try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(f"f = open('{filename}', 'wb')") # Loop through and write a buffer size chunk of data at a time. for i in range(0, size, BUFFER_SIZE): chunk_size = min(BUFFER_SIZE, size - i) chunk = repr(data[i : i + chunk_size]) # Make sure to send explicit byte strings (handles python 2 compatibility). if not chunk.startswith("b"): chunk = "b" + chunk self._pyboard.exec_(f"f.write({chunk})") self._pyboard.exec_("f.close()") except Exception as e: print(' [x] > ', end='') self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(f' - {size}b [✓]') return size def get(self, filename): """Retrieve the contents of the specified file and return its contents as a byte string. """ # Open the file and read it a few bytes at a time and print out the # raw bytes. Be careful not to overload the UART buffer so only write # a few bytes at a time, and don't use print since it adds newlines and # expects string data. 
command = f""" import sys import ubinascii with open('{filename}', 'rb') as infile: while True: result = infile.read({BUFFER_SIZE}) if result == b'': break len = sys.stdout.write(ubinascii.hexlify(result)) """ print(f'↓ {filename}', end='', flush=True) try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command)) except PyboardError as e: print(' [x] > ', end='') self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(f' {int(len(output) / 2)}b [✓]') return binascii.unhexlify(output) def rm(self, filename): """Remove the specified file or directory.""" command = f""" try: import os except ImportError: import uos as os os.remove('{filename}') """ print(f'- {filename}', end='') try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) except Exception as e: print(' [x] > ', end='') self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(' [✓]') def rmdir(self, directory, missing_okay=False): """Forcefully remove the specified directory and all its children.""" # Build a script to walk an entire directory structure and delete every # file and subfolder. This is tricky because MicroPython has no os.walk # or similar function to walk folders, so this code does it manually # with recursion and changing directories. For each directory it lists # the files and deletes everything it can, i.e. all the files. Then # it lists the files again and assumes they are directories (since they # couldn't be deleted in the first pass) and recursively clears those # subdirectories. Finally when finished clearing all the children the # parent directory is deleted. 
command = f""" try: import os except ImportError: import uos as os def rmdir(directory): os.chdir(directory) for f in os.listdir(): try: os.remove(f) except OSError: pass for f in os.listdir(): rmdir(f) os.chdir('..') os.rmdir(directory) rmdir('{directory}') """ print(f'- {directory}', end='') try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) except Exception as e: if not missing_okay: print(' [x] > ', end='') self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(' [✓]') def run_on_board(self, filename, wait_output=True, stream_output=True): """Run the provided script and return its output. If wait_output is True (default) then wait for the script to finish and then return its output, otherwise just run the script and don't wait for any output. If stream_output is True(default) then return None and print outputs to stdout without buffering. """ command = textwrap.dedent(f'exec(open("{filename}").read())') output = None try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() if stream_output: self._pyboard.exec_(command, stream_output=True) elif wait_output: # Run the file and wait for output to return. output = self._pyboard.exec_(command) else: # Read the file and run it using lower level pyboard functions that # won't wait for it to finish or return output. 
self._pyboard.exec_raw_no_follow(command) except Exception as e: self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False return output def exec(self, command): try: self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True) except Exception as e: self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False return output print('Welcome to picowatch lib.') pico = False while not pico: print('-' * 30) device = input('Port: ').strip() baudrate = input('Baudrate (115200): ').strip() or 115200 try: pico = Files(Pyboard(device=device, baudrate=baudrate)) print(f'Connected to device: {device} at a baudrate of: {baudrate}') print('-' * 30) except Exception as e: print(str(e)) pico.send_ctrl_c() WATCHING_DIRECTORY = './' def upload(source: str = '', destination: str = ''): real_source = os.path.join(WATCHING_DIRECTORY, *source.split('/')) if not destination: destination = source destination = '/'.join(destination.split('\\')) try: if os.path.isdir(real_source): for root, dirs, files in os.walk(real_source, followlinks=True): droot = '/'.join(root.replace(WATCHING_DIRECTORY, '').split(os.sep)) for dir in dirs: pico.mkdir('/'.join([droot, dir]), exists_okay=True) for filename in files: with open(os.path.join(root, filename), 'rb') as fh: pico.put('/'.join([droot, filename]), fh.read()) time.sleep(.5) elif os.path.exists(real_source): dirpath = [] for d in os.path.dirname(destination).split('/'): dirpath.append(d) pico.mkdir('/'.join(dirpath), exists_okay=True) with open(real_source, 'rb') as fh: pico.put(destination, fh.read()) except Exception as e: print(str(e)) def download(source: str = '/'): if not source.startswith('/'): source = f'/{source}' if len(source) > 1 and source.endswith('/'): source = source[:-1] try: for filename in pico.ls(directory=source, long_format=False, recursive=True): filename = filename[1:] if 
filename.startswith('.'): continue if os.path.dirname(filename): os.makedirs(os.path.dirname(filename), mode=777, exist_ok=True) with open(os.path.join(WATCHING_DIRECTORY, *filename.split('/')), 'wb') as fh: fh.write(pico.get(filename)) except Exception as e: print(str(e)) def contents(filename: str): try: for ln in pico.get(filename).decode('utf-8').split('\n'): print(ln) except Exception as e: print(str(e)) def delete(source: str, is_directory: bool = False): try: if is_directory: if not source.endswith('/'): source += '/' pico.rmdir(source, missing_okay=True) else: pico.rm(source) except Exception as e: print(str(e)) def ls(source: str = '/'): try: if len(source) > 1 and source.endswith('/'): source = source[:-1] for filename in pico.ls(directory=source, long_format=True, recursive=True): if filename.startswith('/'): filename = filename[1:] if len(filename) > 6: print('·', filename) except Exception as e: print(str(e)) def launch(filename: str = 'main.py'): try: pico.run_on_board(filename) except Exception as e: print(str(e)) sessions = {'deleted': set(), 'modified': set()} def on_modified_callback(event): if event.is_directory == True: return source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') sessions['modified'].add(source) def on_deleted_callback(event): source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') if event.is_directory == True and not source.endswith('/'): source += '/' elif len(source.split('.')) == 1: source += '/' sessions['deleted'].add(source) def watchdog_callback(): for source in sessions['deleted']: delete(source, is_directory=source.endswith('/')) for source in sessions['modified']: upload(source) sessions['deleted'] = set() sessions['modified'] = set() signal.signal(signal.SIGINT, lambda a, b: pico.send_ctrl_c()) watchdog_event = PatternMatchingEventHandler( patterns = ['*'], ignore_patterns = None, ignore_directories = False, case_sensitive = True ) watchdog_event.on_modified = 
on_modified_callback watchdog_event.on_deleted = on_deleted_callback watchdog = Observer() watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True) watchdog.start() try: while True: try: print('>>> ', end='') message = input().strip() while pico.is_raw_repl_on(): time.sleep(.1) match message.split(' '): case ['0' | 'exit']: sys.exit() case ['reboot' | 'reset']: pico.send_ctrl_d() case ['ls' | 'stat', *source]: ls(source[0] if source else '/') case ['cat' | 'open' | 'contents', source]: contents(source) case ['del' | 'rm' | 'delete', source]: delete(source) case ['del*' | 'rm*' | 'rmdir' | 'delete*', source]: delete(source, is_directory=True) case ['format']: delete('/', is_directory=True) case ['upl' | 'upload' | 'update', source]: upload(source) case ['restore']: upload('/') case ['download' | 'transfer', source]: download(source) case ['backup']: download('/') case ['' | 'save' | 'commit']: watchdog_callback() case ['status' | 'staged']: for filename in sessions['deleted']: print('-', filename) for filename in sessions['modified']: print('+', filename) case ['cancel' | 'unstaged']: sessions['deleted'] = set() sessions['modified'] = set() case _: if message.startswith('./'): launch(message[2:]) elif message: print(f'"{message}" is not recognized.') except Exception as e: print(str(e)) except KeyboardInterrupt: watchdog.stop() watchdog.join()