diff --git a/picowatch_d/picowatch.py b/picowatch_d/picowatch.py index 318f249..d1d8c60 100644 --- a/picowatch_d/picowatch.py +++ b/picowatch_d/picowatch.py @@ -414,7 +414,6 @@ class Files(object): Provides functions for listing, uploading, and downloading files from the board's filesystem. """ - __raw_repl_on: bool = False def __init__(self, pyboard: Pyboard): @@ -440,6 +439,27 @@ class Files(object): def send_ctrl_d(self): self._pyboard.send_ctrl_d() # ctrl-D: soft reset + def handle_traceback(self, e: Exception): + message = e.args[2].decode('utf-8') + oserror = message.split('OSError:') + + if len(oserror) == 2: + reason = oserror[1].strip() + + if reason == '-2': + reason = '[Errno -2] EUNKNOWN' + if reason == '39': + reason = '[Errno 39] ENEPTY' + else: + reason = 'Traceback (most recent call last):' + traceback = message.split(reason) + + if len(traceback) == 2: + for call in traceback[1][:-2].split('\\r\\n'): + reason += f'{call}\n' + + raise Exception(reason.strip()) + def ls(self, directory="/", long_format=False, recursive=True): """List the contents of the specified directory (or root if none is specified). Returns a list of strings with the names of files in the @@ -508,7 +528,7 @@ class Files(object): r = [] for f in listdir('{directory}'): size = os.stat(f)[6] - r.append(f'{{f}} ({{size}}b)') + r.append(f'{{f}} - {{size}}b') print(r) """ else: @@ -520,13 +540,8 @@ class Files(object): self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - # Check if this is an OSError #2, i.e. directory doesn't exist and - # rethrow it as something more descriptive. - if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: - raise RuntimeError(f"No such directory: {directory}") - else: - raise ex + except Exception as e: + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False @@ -534,7 +549,7 @@ class Files(object): # Parse the result list and return it. return ast.literal_eval(output.decode("utf-8")) - def mkdir(self, directory, exists_okay=False): + def mkdir(self, directory, exists_okay=True): """Create the specified directory. Note this cannot create a recursive hierarchy of directories, instead each one should be created separately. """ @@ -551,11 +566,9 @@ class Files(object): self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - # Check if this is an OSError #17, i.e. directory already exists. - if ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST") != -1: - if not exists_okay: - raise DirectoryExistsError(f"Directory already exists: {directory}") + except Exception as e: + if exists_okay == False: + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False @@ -568,7 +581,7 @@ class Files(object): if filename.startswith('/'): filename = filename[1:] - print(f'↑ {filename}', end='', flush=True) + print(f'+ {filename}', end='', flush=True) size = len(data) try: @@ -586,23 +599,7 @@ class Files(object): self._pyboard.exec_("f.close()") except Exception as e: print(' [x]') - message = str(e) - - if message.find('OSError: [Errno 2] ENOENT') != -1: - reason = f'"{filename}" does not exists!' - elif message.find('OSError: [Errno 13] EACCES') != -1: - reason = f'"{filename}" access denied!' - elif message.find('OSError: [Errno 21] EISDIR') != -1: - reason = f'"{filename}" is a directory!' - else: - reason = 'Traceback (most recent call last):' - traceback = message.split(reason) - - if len(traceback) == 2: - for call in traceback[1][:-2].split('\\r\\n'): - reason += f'{call}\n' - - raise Exception(reason.strip()) + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False @@ -634,22 +631,14 @@ class Files(object): self.__raw_repl_on = True self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: + except PyboardError as e: print(' [x]') - # Check if this is an OSError #2, i.e. file doesn't exist and - # rethrow it as something more descriptive. - try: - if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: - raise RuntimeError(f"No such file: {filename}") - else: - raise ex - except UnicodeDecodeError: - raise ex + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False - print(f' - {int(len(output) / 2)}b [✓]') + print(f' {int(len(output) / 2)}b [✓]') return binascii.unhexlify(output) def rm(self, filename): @@ -667,18 +656,9 @@ class Files(object): self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: + except Exception as e: print(' [x]') - message = ex.args[2].decode("utf-8") - # Check if this is an OSError #2, i.e. file/directory doesn't exist - # and rethrow it as something more descriptive. - if message.find("OSError: [Errno 2] ENOENT") != -1: - raise RuntimeError(f"No such file/directory: {filename}") - # Check for OSError #13, the directory isn't empty. - if message.find("OSError: [Errno 13] EACCES") != -1: - raise RuntimeError(f"Directory is not empty: {filename}") - else: - raise ex + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False @@ -720,52 +700,15 @@ class Files(object): self.__raw_repl_on = True self._pyboard.enter_raw_repl() self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - message = ex.args[2].decode("utf-8") - # Check if this is an OSError #2, i.e. directory doesn't exist - # and rethrow it as something more descriptive. - if message.find("OSError: [Errno 2] ENOENT") != -1: - if not missing_okay: - print(' [x]') - raise RuntimeError(f"No such directory: {directory}") + except Exception as e: + print(' [x]') + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False print(' [✓]') - def run(self, filename, wait_output=True, stream_output=True): - """Run the provided script and return its output. If wait_output is True - (default) then wait for the script to finish and then return its output, - otherwise just run the script and don't wait for any output. - If stream_output is True(default) then return None and print outputs to - stdout without buffering. - """ - output = None - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - - if stream_output: - self._pyboard.execfile(filename, stream_output=True) - elif wait_output: - # Run the file and wait for output to return. - output = self._pyboard.execfile(filename) - else: - # Read the file and run it using lower level pyboard functions that - # won't wait for it to finish or return output. - with open(filename, "rb") as infile: - self._pyboard.exec_raw_no_follow(infile.read()) - except Exception as e: - raise e - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - return output - - def run_on_board(self, filename, wait_output=True, stream_output=True): """Run the provided script and return its output. If wait_output is True (default) then wait for the script to finish and then return its output, @@ -790,23 +733,7 @@ class Files(object): # won't wait for it to finish or return output. self._pyboard.exec_raw_no_follow(command) except Exception as e: - message = str(e) - - if message.find('OSError: [Errno 2] ENOENT') != -1: - reason = f'"{filename}" does not exists!' - elif message.find('OSError: [Errno 13] EACCES') != -1: - reason = f'"{filename}" access denied!' - elif message.find('OSError: [Errno 21] EISDIR') != -1: - reason = f'"{filename}" is a directory!' - else: - reason = 'Traceback (most recent call last):' - traceback = message.split(reason) - - if len(traceback) == 2: - for call in traceback[1][:-2].split('\\r\\n'): - reason += f'{call}\n' - - raise Exception(reason.strip()) + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False @@ -819,15 +746,7 @@ class Files(object): self._pyboard.enter_raw_repl() output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True) except Exception as e: - message = str(e) - reason = 'Traceback (most recent call last):' - traceback = message.split(reason) - - if len(traceback) == 2: - for call in traceback[1][:-2].split('\\r\\n'): - reason += f'{call}\n' - - raise Exception(reason.strip()) + self.handle_traceback(e) finally: self._pyboard.exit_raw_repl() self.__raw_repl_on = False @@ -848,7 +767,7 @@ while not pico: print(f'Connected to device: {device} at a baudrate of: {baudrate}') print('-' * 30) except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) pico.send_ctrl_c() WATCHING_DIRECTORY = './' @@ -880,7 +799,7 @@ def upload(source: str = '', destination: str = ''): time.sleep(.5) except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) def download(source: str = '/'): @@ -905,7 +824,7 @@ def download(source: str = '/'): time.sleep(.5) except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) def contents(filename: str): @@ -913,17 +832,33 @@ def contents(filename: str): for ln in pico.get(filename).decode('utf-8').split('\n'): print(ln) except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) -def delete(source: str): +def delete(source: str, is_directory: bool = False): try: - if source.endswith('/'): - pico.rmdir(source, missing_okay=True) + if is_directory: + for filename in pico.ls(directory=source, long_format=False, recursive=True): + if filename.startswith('/'): + filename = filename[1:] + + pico.rm(filename) + + try: + for filename in pico.ls(directory=source, long_format=False, recursive=True): + if filename.startswith('/'): + filename = filename[1:] + + if not filename.endswith('/'): + filename += '/' + + pico.rm(filename) + except: + pass else: pico.rm(source) except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) def ls(source: str = '/'): @@ -935,17 +870,17 @@ def ls(source: str = '/'): if filename.startswith('/'): filename = filename[1:] - if len(filename) > 5: + if len(filename) > 6: print('→', filename) except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) def launch(filename: str = 'main.py'): try: pico.run_on_board(filename) except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) sessions = {'deleted': set(), 'modified': set()} @@ -998,7 +933,8 @@ watchdog.start() try: while True: try: - message = input('>>> ').strip() + print('>>> ', end='') + message = input().strip() while pico.is_raw_repl_on(): time.sleep(.1) @@ -1012,8 +948,12 @@ try: ls(source[0] if source else '/') case ['cat' | 'open' | 'contents', source]: contents(source) - case ['del' | 'delete' | 'remove', source]: + case ['del' | 'rm' | 'delete', source]: delete(source) + case ['del*' | 'rm*' | 'rmdir' | 'delete*', source]: + delete(source, is_directory=True) + case ['format']: + delete('/', is_directory=True) case ['upl' | 'upload' | 'update', source]: upload(source) case ['download' | 'backup', source]: @@ -1035,7 +975,7 @@ try: elif message: print(f'"{message}" is not recognized.') except Exception as e: - print(f'Exception: {str(e)}') + print(str(e)) except KeyboardInterrupt: watchdog.stop()