From 6e56c7926922f58dc3a2a11ceffed48968f9daf8 Mon Sep 17 00:00:00 2001 From: Gino D Date: Thu, 29 Dec 2022 17:20:41 +0100 Subject: [PATCH] Factorised codes --- picowatch/lib/pyboard.py | 813 -------------------------------------- picowatch/picowatch.py | 824 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 815 insertions(+), 822 deletions(-) delete mode 100644 picowatch/lib/pyboard.py diff --git a/picowatch/lib/pyboard.py b/picowatch/lib/pyboard.py deleted file mode 100644 index a0e07f5..0000000 --- a/picowatch/lib/pyboard.py +++ /dev/null @@ -1,813 +0,0 @@ -#!/usr/bin/env python - -""" -pyboard interface - -This module provides the Pyboard class, used to communicate with and -control the pyboard over a serial USB connection. - -Example usage: - - import pyboard - pyb = pyboard.Pyboard('/dev/ttyACM0') - -Or: - - pyb = pyboard.Pyboard('192.168.1.1') - -Then: - - pyb.enter_raw_repl() - pyb.exec('pyb.LED(1).on()') - pyb.exit_raw_repl() - -Note: if using Python2 then pyb.exec must be written as pyb.exec_. -To run a script from the local machine on the board and print out the results: - - import pyboard - pyboard.execfile('test.py', device='/dev/ttyACM0') - -This script can also be run directly. To execute a local script, use: - - ./pyboard.py test.py - -Or: - - python pyboard.py test.py - -""" - -# Adafruit MicroPython Tool - File Operations -# Author: Tony DiCola -# Copyright (c) 2016 Adafruit Industries -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -import sys -import ast -import time -import textwrap -import binascii - - -_rawdelay = None - -try: - stdout = sys.stdout.buffer -except AttributeError: - # Python2 doesn't have buffer attr - stdout = sys.stdout - -def stdout_write_bytes(b): - b = b.replace(b"\x04", b"") - stdout.write(b) - stdout.flush() - -class PyboardError(Exception): - pass - -class TelnetToSerial: - def __init__(self, ip, user, password, read_timeout=None): - import telnetlib - self.tn = telnetlib.Telnet(ip, timeout=15) - self.read_timeout = read_timeout - if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): - self.tn.write(bytes(user, 'ascii') + b"\r\n") - - if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): - # needed because of internal implementation details of the telnet server - time.sleep(0.2) - self.tn.write(bytes(password, 'ascii') + b"\r\n") - - if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): - # login succesful - from collections import deque - self.fifo = deque() - return - - raise PyboardError('Failed to establish a telnet connection with the board') - - def __del__(self): - self.close() - - def close(self): - try: - self.tn.close() - except: - # the telnet object might not exist yet, so ignore this one - pass - - def read(self, size=1): - while len(self.fifo) < size: - timeout_count = 0 - data = self.tn.read_eager() - if len(data): - self.fifo.extend(data) - timeout_count = 0 - else: - time.sleep(0.25) - if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: - break - timeout_count += 1 - - data = b'' - while len(data) < size and len(self.fifo) > 0: - data += bytes([self.fifo.popleft()]) - return data - - def write(self, data): - self.tn.write(data) - return len(data) - - def inWaiting(self): - n_waiting = len(self.fifo) - if not n_waiting: - data = self.tn.read_eager() - self.fifo.extend(data) - return len(data) - else: - return n_waiting - -class Pyboard: - def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0): - global _rawdelay - _rawdelay = rawdelay - if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: - # device looks like an IP address - self.serial = TelnetToSerial(device, user, password, read_timeout=10) - else: - import serial - delayed = False - for attempt in range(wait + 1): - try: - self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) - break - except (OSError, IOError): # Py2 and Py3 have different errors - if wait == 0: - continue - if attempt == 0: - sys.stdout.write(f'Waiting {wait} seconds for pyboard ') - delayed = True - time.sleep(1) - sys.stdout.write('.') - sys.stdout.flush() - else: - if delayed: - print('') - raise PyboardError('failed to access ' + device) - if delayed: - print('') - - def close(self): - self.serial.close() - - def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): - data = self.serial.read(min_num_bytes) - if data_consumer: - data_consumer(data) - timeout_count = 0 - while True: - if data.endswith(ending): - break - elif self.serial.inWaiting() > 0: - new_data = self.serial.read(1) - data = data + new_data - if data_consumer: - data_consumer(new_data) - timeout_count = 0 - else: - timeout_count += 1 - if timeout is not None and timeout_count >= 100 * timeout: - break - time.sleep(0.01) - return data - - def send_ctrl_a(self): - self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL - - def send_ctrl_b(self): - self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL - - def send_ctrl_c(self): - self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program - time.sleep(0.1) - self.serial.write(b'\x03') - time.sleep(0.1) - - def send_ctrl_d(self): - self.serial.write(b'\r\x04') # ctrl-D: soft reset - - def enter_raw_repl(self): - # Brief delay before sending RAW MODE char if requests - if _rawdelay > 0: - time.sleep(_rawdelay) - - # ctrl-C twice: interrupt any running program - self.serial.write(b'\r\x03') - time.sleep(0.1) - self.serial.write(b'\x03') - time.sleep(0.1) - - # flush input (without relying on serial.flushInput()) - n = self.serial.inWaiting() - while n > 0: - self.serial.read(n) - n = self.serial.inWaiting() - - for retry in range(0, 5): - self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') - if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): - break - else: - if retry >= 4: - print(data) - raise PyboardError('could not enter raw repl') - time.sleep(0.2) - - self.serial.write(b'\x04') # ctrl-D: soft reset - data = self.read_until(1, b'soft reboot\r\n') - if not data.endswith(b'soft reboot\r\n'): - print(data) - raise PyboardError('could not enter raw repl') - # By splitting this into 2 reads, it allows boot.py to print stuff, - # which will show up after the soft reboot and before the raw REPL. - # Modification from original pyboard.py below: - # Add a small delay and send Ctrl-C twice after soft reboot to ensure - # any main program loop in main.py is interrupted. - time.sleep(0.5) - self.serial.write(b'\x03') - time.sleep(0.1) # (slight delay before second interrupt - self.serial.write(b'\x03') - # End modification above. - data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') - if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): - print(data) - raise PyboardError('could not enter raw repl') - - def exit_raw_repl(self): - self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL - - def follow(self, timeout, data_consumer=None): - # wait for normal output - data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) - if not data.endswith(b'\x04'): - raise PyboardError('timeout waiting for first EOF reception') - data = data[:-1] - - # wait for error output - data_err = self.read_until(1, b'\x04', timeout=timeout) - if not data_err.endswith(b'\x04'): - raise PyboardError('timeout waiting for second EOF reception') - data_err = data_err[:-1] - - # return normal and error output - return data, data_err - - def exec_raw_no_follow(self, command): - if isinstance(command, bytes): - command_bytes = command - else: - command_bytes = bytes(command, encoding='utf8') - - # check we have a prompt - data = self.read_until(1, b'>') - if not data.endswith(b'>'): - raise PyboardError('could not enter raw repl') - - # write command - for i in range(0, len(command_bytes), 256): - self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) - time.sleep(0.01) - self.serial.write(b'\x04') - - # check if we could exec command - data = self.serial.read(2) - if data != b'OK': - raise PyboardError('could not exec command') - - def exec_raw(self, command, timeout=10, data_consumer=None): - self.exec_raw_no_follow(command) - return self.follow(timeout, data_consumer) - - def eval(self, expression): - ret = self.exec_(f'print({expression})') - ret = ret.strip() - return ret - - def exec_(self, command, stream_output=False): - data_consumer = None - if stream_output: - data_consumer = stdout_write_bytes - ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) - if ret_err: - raise PyboardError('exception', ret, ret_err) - return ret - - def execfile(self, filename, stream_output=False): - with open(filename, 'rb') as f: - pyfile = f.read() - return self.exec_(pyfile, stream_output=stream_output) - - def get_time(self): - t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') - return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) - -# in Python2 exec is a keyword so one must use "exec_" -# but for Python3 we want to provide the nicer version "exec" -setattr(Pyboard, "exec", Pyboard.exec_) - -def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): - pyb = Pyboard(device, baudrate, user, password) - pyb.enter_raw_repl() - output = pyb.execfile(filename) - stdout_write_bytes(output) - pyb.exit_raw_repl() - pyb.close() - -# def main(): -# import argparse -# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') -# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') -# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') -# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') -# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') -# cmd_parser.add_argument('-c', '--command', help='program passed in as string') -# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') -# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') -# cmd_parser.add_argument('files', nargs='*', help='input files') -# args = cmd_parser.parse_args() - -# def execbuffer(buf): -# try: -# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) -# pyb.enter_raw_repl() -# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) -# pyb.exit_raw_repl() -# pyb.close() -# except PyboardError as er: -# print(er) -# sys.exit(1) -# except KeyboardInterrupt: -# sys.exit(1) -# if ret_err: -# stdout_write_bytes(ret_err) -# sys.exit(1) - -# if args.command is not None: -# execbuffer(args.command.encode('utf-8')) - -# for filename in args.files: -# with open(filename, 'rb') as f: -# pyfile = f.read() -# execbuffer(pyfile) - -# if args.follow or (args.command is None and len(args.files) == 0): -# try: -# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) -# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) -# pyb.close() -# except PyboardError as er: -# print(er) -# sys.exit(1) -# except KeyboardInterrupt: -# sys.exit(1) -# if ret_err: -# stdout_write_bytes(ret_err) -# sys.exit(1) - - - -BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time. -# This is kept small because small chips and USB to serial -# bridges usually have very small buffers. - - -class DirectoryExistsError(Exception): - pass - - -class Files(object): - """Class to interact with a MicroPython board files over a serial connection. - Provides functions for listing, uploading, and downloading files from the - board's filesystem. - """ - - __raw_repl_on: bool = False - - def __init__(self, pyboard: Pyboard): - """Initialize the MicroPython board files class using the provided pyboard - instance. In most cases you should create a Pyboard instance (from - pyboard.py) which connects to a board over a serial connection and pass - it in, but you can pass in other objects for testing, etc. - """ - self._pyboard = pyboard - - def is_raw_repl_on(self): - return self.__raw_repl_on - - def send_ctrl_a(self): - self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL - - def send_ctrl_b(self): - self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL - - def send_ctrl_c(self): - self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program - - def send_ctrl_d(self): - self._pyboard.send_ctrl_d() # ctrl-D: soft reset - - def ls(self, directory="/", long_format=False, recursive=True): - """List the contents of the specified directory (or root if none is - specified). Returns a list of strings with the names of files in the - specified directory. If long_format is True then a list of 2-tuples - with the name and size (in bytes) of the item is returned. Note that - it appears the size of directories is not supported by MicroPython and - will always return 0 (i.e. no recursive size computation). - """ - - # Disabling for now, see https://github.com/adafruit/ampy/issues/55. - # # Make sure directory ends in a slash. - # if not directory.endswith("/"): - # directory += "/" - - # Make sure directory starts with slash, for consistency. - # if not directory.startswith("/"): - # directory = "/" + directory - - command = """\ - try: - import os - except ImportError: - import uos as os\n""" - - if recursive: - command += """\ - def listdir(directory): - result = set() - - def _listdir(dir_or_file): - try: - # if its a directory, then it should provide some children. - children = os.listdir(dir_or_file) - except OSError: - # probably a file. run stat() to confirm. - os.stat(dir_or_file) - result.add(dir_or_file) - else: - # probably a directory, add to result if empty. - if children: - # queue the children to be dealt with in next iteration. - for child in children: - # create the full path. - if dir_or_file == '/': - next = dir_or_file + child - else: - next = dir_or_file + '/' + child - - _listdir(next) - else: - result.add(dir_or_file) - - _listdir(directory) - return sorted(result)\n""" - else: - command += """\ - def listdir(directory): - if directory == '/': - return sorted([directory + f for f in os.listdir(directory)]) - else: - return sorted([directory + '/' + f for f in os.listdir(directory)])\n""" - - # Execute os.listdir() command on the board. - if long_format: - command += f""" - r = [] - for f in listdir('{directory}'): - size = os.stat(f)[6] - r.append(f'{{f}} ({{size}}b)') - print(r) - """ - else: - command += f""" - print(listdir('{directory}')) - """ - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - output = self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - # Check if this is an OSError #2, i.e. directory doesn't exist and - # rethrow it as something more descriptive. - if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: - raise RuntimeError(f"No such directory: {directory}") - else: - raise ex - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - # Parse the result list and return it. - return ast.literal_eval(output.decode("utf-8")) - - def mkdir(self, directory, exists_okay=False): - """Create the specified directory. Note this cannot create a recursive - hierarchy of directories, instead each one should be created separately. - """ - # Execute os.mkdir command on the board. - command = f""" - try: - import os - except ImportError: - import uos as os - os.mkdir('{directory}') - """ - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - # Check if this is an OSError #17, i.e. directory already exists. - if ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST") != -1: - if not exists_okay: - raise DirectoryExistsError(f"Directory already exists: {directory}") - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - def put(self, filename, data): - """Create or update the specified file with the provided data. - """ - # Open the file for writing on the board and write chunks of data. - - if filename.startswith('/'): - filename = filename[1:] - - print(f'↑ {filename}', end='', flush=True) - size = len(data) - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(f"f = open('{filename}', 'wb')") - # Loop through and write a buffer size chunk of data at a time. - for i in range(0, size, BUFFER_SIZE): - chunk_size = min(BUFFER_SIZE, size - i) - chunk = repr(data[i : i + chunk_size]) - # Make sure to send explicit byte strings (handles python 2 compatibility). - if not chunk.startswith("b"): - chunk = "b" + chunk - self._pyboard.exec_(f"f.write({chunk})") - self._pyboard.exec_("f.close()") - except Exception as ex: - print(' [x]') - raise ex - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - print(f' - {size}b [✓]') - return size - - def get(self, filename): - """Retrieve the contents of the specified file and return its contents - as a byte string. - """ - # Open the file and read it a few bytes at a time and print out the - # raw bytes. Be careful not to overload the UART buffer so only write - # a few bytes at a time, and don't use print since it adds newlines and - # expects string data. - command = f""" - import sys - import ubinascii - with open('{filename}', 'rb') as infile: - while True: - result = infile.read({BUFFER_SIZE}) - if result == b'': - break - len = sys.stdout.write(ubinascii.hexlify(result)) - """ - print(f'↓ {filename}', end='', flush=True) - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - output = self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - print(' [x]') - # Check if this is an OSError #2, i.e. file doesn't exist and - # rethrow it as something more descriptive. - try: - if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: - raise RuntimeError(f"No such file: {filename}") - else: - raise ex - except UnicodeDecodeError: - raise ex - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - print(f' - {int(len(output) / 2)}b [✓]') - return binascii.unhexlify(output) - - def rm(self, filename): - """Remove the specified file or directory.""" - command = f""" - try: - import os - except ImportError: - import uos as os - os.remove('{filename}') - """ - print(f'↶ {filename}', end='') - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - print(' [x]') - message = ex.args[2].decode("utf-8") - # Check if this is an OSError #2, i.e. file/directory doesn't exist - # and rethrow it as something more descriptive. - if message.find("OSError: [Errno 2] ENOENT") != -1: - raise RuntimeError(f"No such file/directory: {filename}") - # Check for OSError #13, the directory isn't empty. - if message.find("OSError: [Errno 13] EACCES") != -1: - raise RuntimeError(f"Directory is not empty: {filename}") - else: - raise ex - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - print(' [✓]') - - def rmdir(self, directory, missing_okay=False): - """Forcefully remove the specified directory and all its children.""" - # Build a script to walk an entire directory structure and delete every - # file and subfolder. This is tricky because MicroPython has no os.walk - # or similar function to walk folders, so this code does it manually - # with recursion and changing directories. For each directory it lists - # the files and deletes everything it can, i.e. all the files. Then - # it lists the files again and assumes they are directories (since they - # couldn't be deleted in the first pass) and recursively clears those - # subdirectories. Finally when finished clearing all the children the - # parent directory is deleted. - command = f""" - try: - import os - except ImportError: - import uos as os - def rmdir(directory): - os.chdir(directory) - for f in os.listdir(): - try: - os.remove(f) - except OSError: - pass - for f in os.listdir(): - rmdir(f) - os.chdir('..') - os.rmdir(directory) - rmdir('{directory}') - """ - print(f'↶ {directory}', end='') - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - self._pyboard.exec_(textwrap.dedent(command)) - except PyboardError as ex: - message = ex.args[2].decode("utf-8") - # Check if this is an OSError #2, i.e. directory doesn't exist - # and rethrow it as something more descriptive. - if message.find("OSError: [Errno 2] ENOENT") != -1: - if not missing_okay: - print(' [x]') - raise RuntimeError(f"No such directory: {directory}") - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - print(' [✓]') - - def run(self, filename, wait_output=True, stream_output=True): - """Run the provided script and return its output. If wait_output is True - (default) then wait for the script to finish and then return its output, - otherwise just run the script and don't wait for any output. - If stream_output is True(default) then return None and print outputs to - stdout without buffering. - """ - output = None - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - - if stream_output: - self._pyboard.execfile(filename, stream_output=True) - elif wait_output: - # Run the file and wait for output to return. - output = self._pyboard.execfile(filename) - else: - # Read the file and run it using lower level pyboard functions that - # won't wait for it to finish or return output. - with open(filename, "rb") as infile: - self._pyboard.exec_raw_no_follow(infile.read()) - except Exception as e: - raise e - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - return output - - - def run_on_board(self, filename, wait_output=True, stream_output=True): - """Run the provided script and return its output. If wait_output is True - (default) then wait for the script to finish and then return its output, - otherwise just run the script and don't wait for any output. - If stream_output is True(default) then return None and print outputs to - stdout without buffering. - """ - command = textwrap.dedent(f'exec(open("{filename}").read())') - output = None - - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - - if stream_output: - self._pyboard.exec_(command, stream_output=True) - elif wait_output: - # Run the file and wait for output to return. - output = self._pyboard.exec_(command) - else: - # Read the file and run it using lower level pyboard functions that - # won't wait for it to finish or return output. - self._pyboard.exec_raw_no_follow(command) - except Exception as e: - message = str(e) - - if message.find('OSError: [Errno 2] ENOENT') != -1: - reason = f'"{filename}" does not exists!' - elif message.find('OSError: [Errno 13] EACCES') != -1: - reason = f'"{filename}" access denied!' - elif message.find('OSError: [Errno 21] EISDIR') != -1: - reason = f'"{filename}" is a directory!' - else: - reason = 'Traceback (most recent call last):' - traceback = message.split(reason) - - if len(traceback) == 2: - for call in traceback[1][:-2].split('\\r\\n'): - reason += f'{call}\n' - - raise Exception(reason.strip()) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - return output - - def exec(self, command): - try: - self.__raw_repl_on = True - self._pyboard.enter_raw_repl() - output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True) - except Exception as e: - message = str(e) - reason = 'Traceback (most recent call last):' - traceback = message.split(reason) - - if len(traceback) == 2: - for call in traceback[1][:-2].split('\\r\\n'): - reason += f'{call}\n' - - raise Exception(reason.strip()) - finally: - self._pyboard.exit_raw_repl() - self.__raw_repl_on = False - - return output \ No newline at end of file diff --git a/picowatch/picowatch.py b/picowatch/picowatch.py index dc66c35..cceb39e 100644 --- a/picowatch/picowatch.py +++ b/picowatch/picowatch.py @@ -1,14 +1,824 @@ +#!/usr/bin/env python + +""" +pyboard interface + +This module provides the Pyboard class, used to communicate with and +control the pyboard over a serial USB connection. + +Example usage: + + import pyboard + pyb = pyboard.Pyboard('/dev/ttyACM0') + +Or: + + pyb = pyboard.Pyboard('192.168.1.1') + +Then: + + pyb.enter_raw_repl() + pyb.exec('pyb.LED(1).on()') + pyb.exit_raw_repl() + +Note: if using Python2 then pyb.exec must be written as pyb.exec_. +To run a script from the local machine on the board and print out the results: + + import pyboard + pyboard.execfile('test.py', device='/dev/ttyACM0') + +This script can also be run directly. To execute a local script, use: + + ./pyboard.py test.py + +Or: + + python pyboard.py test.py + +""" + +# Adafruit MicroPython Tool - File Operations +# Author: Tony DiCola +# Copyright (c) 2016 Adafruit Industries # -import os +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + import sys +import ast import time +import textwrap +import binascii + +import os import signal -from lib.pyboard import Pyboard, Files from watchdog.observers import Observer from watchdog.events import PatternMatchingEventHandler +_rawdelay = None + +try: + stdout = sys.stdout.buffer +except AttributeError: + # Python2 doesn't have buffer attr + stdout = sys.stdout + +def stdout_write_bytes(b): + b = b.replace(b"\x04", b"") + stdout.write(b) + stdout.flush() + +class PyboardError(Exception): + pass + +class TelnetToSerial: + def __init__(self, ip, user, password, read_timeout=None): + import telnetlib + self.tn = telnetlib.Telnet(ip, timeout=15) + self.read_timeout = read_timeout + if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): + self.tn.write(bytes(user, 'ascii') + b"\r\n") + + if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): + # needed because of internal implementation details of the telnet server + time.sleep(0.2) + self.tn.write(bytes(password, 'ascii') + b"\r\n") + + if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): + # login succesful + from collections import deque + self.fifo = deque() + return + + raise PyboardError('Failed to establish a telnet connection with the board') + + def __del__(self): + self.close() + + def close(self): + try: + self.tn.close() + except: + # the telnet object might not exist yet, so ignore this one + pass + + def read(self, size=1): + while len(self.fifo) < size: + timeout_count = 0 + data = self.tn.read_eager() + if len(data): + self.fifo.extend(data) + timeout_count = 0 + else: + time.sleep(0.25) + if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: + break + timeout_count += 1 + + data = b'' + while len(data) < size and len(self.fifo) > 0: + data += bytes([self.fifo.popleft()]) + return data + + def write(self, data): + self.tn.write(data) + return len(data) + + def inWaiting(self): + n_waiting = len(self.fifo) + if not n_waiting: + data = self.tn.read_eager() + self.fifo.extend(data) + return len(data) + else: + return n_waiting + +class Pyboard: + def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0): + global _rawdelay + _rawdelay = rawdelay + if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: + # device looks like an IP address + self.serial = TelnetToSerial(device, user, password, read_timeout=10) + else: + import serial + delayed = False + for attempt in range(wait + 1): + try: + self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) + break + except (OSError, IOError): # Py2 and Py3 have different errors + if wait == 0: + continue + if attempt == 0: + sys.stdout.write(f'Waiting {wait} seconds for pyboard ') + delayed = True + time.sleep(1) + sys.stdout.write('.') + sys.stdout.flush() + else: + if delayed: + print('') + raise PyboardError('failed to access ' + device) + if delayed: + print('') + + def close(self): + self.serial.close() + + def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): + data = self.serial.read(min_num_bytes) + if data_consumer: + data_consumer(data) + timeout_count = 0 + while True: + if data.endswith(ending): + break + elif self.serial.inWaiting() > 0: + new_data = self.serial.read(1) + data = data + new_data + if data_consumer: + data_consumer(new_data) + timeout_count = 0 + else: + timeout_count += 1 + if timeout is not None and timeout_count >= 100 * timeout: + break + time.sleep(0.01) + return data + + def send_ctrl_a(self): + self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL + + def send_ctrl_b(self): + self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL + + def send_ctrl_c(self): + self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program + time.sleep(0.1) + self.serial.write(b'\x03') + time.sleep(0.1) + + def send_ctrl_d(self): + self.serial.write(b'\r\x04') # ctrl-D: soft reset + + def enter_raw_repl(self): + # Brief delay before sending RAW MODE char if requests + if _rawdelay > 0: + time.sleep(_rawdelay) + + # ctrl-C twice: interrupt any running program + self.serial.write(b'\r\x03') + time.sleep(0.1) + self.serial.write(b'\x03') + time.sleep(0.1) + + # flush input (without relying on serial.flushInput()) + n = self.serial.inWaiting() + while n > 0: + self.serial.read(n) + n = self.serial.inWaiting() + + for retry in range(0, 5): + self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL + data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') + if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): + break + else: + if retry >= 4: + print(data) + raise PyboardError('could not enter raw repl') + time.sleep(0.2) + + self.serial.write(b'\x04') # ctrl-D: soft reset + data = self.read_until(1, b'soft reboot\r\n') + if not data.endswith(b'soft reboot\r\n'): + print(data) + raise PyboardError('could not enter raw repl') + # By splitting this into 2 reads, it allows boot.py to print stuff, + # which will show up after the soft reboot and before the raw REPL. + # Modification from original pyboard.py below: + # Add a small delay and send Ctrl-C twice after soft reboot to ensure + # any main program loop in main.py is interrupted. + time.sleep(0.5) + self.serial.write(b'\x03') + time.sleep(0.1) # (slight delay before second interrupt + self.serial.write(b'\x03') + # End modification above. + data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') + if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): + print(data) + raise PyboardError('could not enter raw repl') + + def exit_raw_repl(self): + self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL + + def follow(self, timeout, data_consumer=None): + # wait for normal output + data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) + if not data.endswith(b'\x04'): + raise PyboardError('timeout waiting for first EOF reception') + data = data[:-1] + + # wait for error output + data_err = self.read_until(1, b'\x04', timeout=timeout) + if not data_err.endswith(b'\x04'): + raise PyboardError('timeout waiting for second EOF reception') + data_err = data_err[:-1] + + # return normal and error output + return data, data_err + + def exec_raw_no_follow(self, command): + if isinstance(command, bytes): + command_bytes = command + else: + command_bytes = bytes(command, encoding='utf8') + + # check we have a prompt + data = self.read_until(1, b'>') + if not data.endswith(b'>'): + raise PyboardError('could not enter raw repl') + + # write command + for i in range(0, len(command_bytes), 256): + self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) + time.sleep(0.01) + self.serial.write(b'\x04') + + # check if we could exec command + data = self.serial.read(2) + if data != b'OK': + raise PyboardError('could not exec command') + + def exec_raw(self, command, timeout=10, data_consumer=None): + self.exec_raw_no_follow(command) + return self.follow(timeout, data_consumer) + + def eval(self, expression): + ret = self.exec_(f'print({expression})') + ret = ret.strip() + return ret + + def exec_(self, command, stream_output=False): + data_consumer = None + if stream_output: + data_consumer = stdout_write_bytes + ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) + if ret_err: + raise PyboardError('exception', ret, ret_err) + return ret + + def execfile(self, filename, stream_output=False): + with open(filename, 'rb') as f: + pyfile = f.read() + return self.exec_(pyfile, stream_output=stream_output) + + def get_time(self): + t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') + return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) + +# in Python2 exec is a keyword so one must use "exec_" +# but for Python3 we want to provide the nicer version "exec" +setattr(Pyboard, "exec", Pyboard.exec_) + +def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): + pyb = Pyboard(device, baudrate, user, password) + pyb.enter_raw_repl() + output = pyb.execfile(filename) + stdout_write_bytes(output) + pyb.exit_raw_repl() + pyb.close() + +# def main(): +# import argparse +# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') +# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') +# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') +# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') +# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') +# cmd_parser.add_argument('-c', '--command', help='program passed in as string') +# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') +# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') +# cmd_parser.add_argument('files', nargs='*', help='input files') +# args = cmd_parser.parse_args() + +# def execbuffer(buf): +# try: +# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) +# pyb.enter_raw_repl() +# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) +# pyb.exit_raw_repl() +# pyb.close() +# except PyboardError as er: +# print(er) +# sys.exit(1) +# except KeyboardInterrupt: +# sys.exit(1) +# if ret_err: +# stdout_write_bytes(ret_err) +# sys.exit(1) + +# if args.command is not None: +# execbuffer(args.command.encode('utf-8')) + +# for filename in args.files: +# with open(filename, 'rb') as f: +# pyfile = f.read() +# execbuffer(pyfile) + +# if args.follow or (args.command is None and len(args.files) == 0): +# try: +# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) +# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) +# pyb.close() +# except PyboardError as er: +# print(er) +# sys.exit(1) +# except KeyboardInterrupt: +# sys.exit(1) +# if ret_err: +# stdout_write_bytes(ret_err) +# sys.exit(1) + + + +BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time. +# This is kept small because small chips and USB to serial +# bridges usually have very small buffers. + + +class DirectoryExistsError(Exception): + pass + + +class Files(object): + """Class to interact with a MicroPython board files over a serial connection. + Provides functions for listing, uploading, and downloading files from the + board's filesystem. + """ + + __raw_repl_on: bool = False + + def __init__(self, pyboard: Pyboard): + """Initialize the MicroPython board files class using the provided pyboard + instance. In most cases you should create a Pyboard instance (from + pyboard.py) which connects to a board over a serial connection and pass + it in, but you can pass in other objects for testing, etc. + """ + self._pyboard = pyboard + + def is_raw_repl_on(self): + return self.__raw_repl_on + + def send_ctrl_a(self): + self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL + + def send_ctrl_b(self): + self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL + + def send_ctrl_c(self): + self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program + + def send_ctrl_d(self): + self._pyboard.send_ctrl_d() # ctrl-D: soft reset + + def ls(self, directory="/", long_format=False, recursive=True): + """List the contents of the specified directory (or root if none is + specified). Returns a list of strings with the names of files in the + specified directory. If long_format is True then a list of 2-tuples + with the name and size (in bytes) of the item is returned. Note that + it appears the size of directories is not supported by MicroPython and + will always return 0 (i.e. no recursive size computation). + """ + + # Disabling for now, see https://github.com/adafruit/ampy/issues/55. + # # Make sure directory ends in a slash. + # if not directory.endswith("/"): + # directory += "/" + + # Make sure directory starts with slash, for consistency. + # if not directory.startswith("/"): + # directory = "/" + directory + + command = """\ + try: + import os + except ImportError: + import uos as os\n""" + + if recursive: + command += """\ + def listdir(directory): + result = set() + + def _listdir(dir_or_file): + try: + # if its a directory, then it should provide some children. + children = os.listdir(dir_or_file) + except OSError: + # probably a file. run stat() to confirm. + os.stat(dir_or_file) + result.add(dir_or_file) + else: + # probably a directory, add to result if empty. + if children: + # queue the children to be dealt with in next iteration. + for child in children: + # create the full path. + if dir_or_file == '/': + next = dir_or_file + child + else: + next = dir_or_file + '/' + child + + _listdir(next) + else: + result.add(dir_or_file) + + _listdir(directory) + return sorted(result)\n""" + else: + command += """\ + def listdir(directory): + if directory == '/': + return sorted([directory + f for f in os.listdir(directory)]) + else: + return sorted([directory + '/' + f for f in os.listdir(directory)])\n""" + + # Execute os.listdir() command on the board. + if long_format: + command += f""" + r = [] + for f in listdir('{directory}'): + size = os.stat(f)[6] + r.append(f'{{f}} ({{size}}b)') + print(r) + """ + else: + command += f""" + print(listdir('{directory}')) + """ + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + output = self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + # Check if this is an OSError #2, i.e. directory doesn't exist and + # rethrow it as something more descriptive. + if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: + raise RuntimeError(f"No such directory: {directory}") + else: + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + # Parse the result list and return it. + return ast.literal_eval(output.decode("utf-8")) + + def mkdir(self, directory, exists_okay=False): + """Create the specified directory. Note this cannot create a recursive + hierarchy of directories, instead each one should be created separately. + """ + # Execute os.mkdir command on the board. + command = f""" + try: + import os + except ImportError: + import uos as os + os.mkdir('{directory}') + """ + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + # Check if this is an OSError #17, i.e. directory already exists. + if ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST") != -1: + if not exists_okay: + raise DirectoryExistsError(f"Directory already exists: {directory}") + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + def put(self, filename, data): + """Create or update the specified file with the provided data. + """ + # Open the file for writing on the board and write chunks of data. + + if filename.startswith('/'): + filename = filename[1:] + + print(f'↑ {filename}', end='', flush=True) + size = len(data) + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(f"f = open('{filename}', 'wb')") + # Loop through and write a buffer size chunk of data at a time. + for i in range(0, size, BUFFER_SIZE): + chunk_size = min(BUFFER_SIZE, size - i) + chunk = repr(data[i : i + chunk_size]) + # Make sure to send explicit byte strings (handles python 2 compatibility). + if not chunk.startswith("b"): + chunk = "b" + chunk + self._pyboard.exec_(f"f.write({chunk})") + self._pyboard.exec_("f.close()") + except Exception as ex: + print(' [x]') + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(f' - {size}b [✓]') + return size + + def get(self, filename): + """Retrieve the contents of the specified file and return its contents + as a byte string. + """ + # Open the file and read it a few bytes at a time and print out the + # raw bytes. Be careful not to overload the UART buffer so only write + # a few bytes at a time, and don't use print since it adds newlines and + # expects string data. + command = f""" + import sys + import ubinascii + with open('{filename}', 'rb') as infile: + while True: + result = infile.read({BUFFER_SIZE}) + if result == b'': + break + len = sys.stdout.write(ubinascii.hexlify(result)) + """ + print(f'↓ {filename}', end='', flush=True) + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + output = self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + print(' [x]') + # Check if this is an OSError #2, i.e. file doesn't exist and + # rethrow it as something more descriptive. + try: + if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: + raise RuntimeError(f"No such file: {filename}") + else: + raise ex + except UnicodeDecodeError: + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(f' - {int(len(output) / 2)}b [✓]') + return binascii.unhexlify(output) + + def rm(self, filename): + """Remove the specified file or directory.""" + command = f""" + try: + import os + except ImportError: + import uos as os + os.remove('{filename}') + """ + print(f'↶ {filename}', end='') + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + print(' [x]') + message = ex.args[2].decode("utf-8") + # Check if this is an OSError #2, i.e. file/directory doesn't exist + # and rethrow it as something more descriptive. + if message.find("OSError: [Errno 2] ENOENT") != -1: + raise RuntimeError(f"No such file/directory: {filename}") + # Check for OSError #13, the directory isn't empty. + if message.find("OSError: [Errno 13] EACCES") != -1: + raise RuntimeError(f"Directory is not empty: {filename}") + else: + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(' [✓]') + + def rmdir(self, directory, missing_okay=False): + """Forcefully remove the specified directory and all its children.""" + # Build a script to walk an entire directory structure and delete every + # file and subfolder. This is tricky because MicroPython has no os.walk + # or similar function to walk folders, so this code does it manually + # with recursion and changing directories. For each directory it lists + # the files and deletes everything it can, i.e. all the files. Then + # it lists the files again and assumes they are directories (since they + # couldn't be deleted in the first pass) and recursively clears those + # subdirectories. Finally when finished clearing all the children the + # parent directory is deleted. + command = f""" + try: + import os + except ImportError: + import uos as os + def rmdir(directory): + os.chdir(directory) + for f in os.listdir(): + try: + os.remove(f) + except OSError: + pass + for f in os.listdir(): + rmdir(f) + os.chdir('..') + os.rmdir(directory) + rmdir('{directory}') + """ + print(f'↶ {directory}', end='') + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + message = ex.args[2].decode("utf-8") + # Check if this is an OSError #2, i.e. directory doesn't exist + # and rethrow it as something more descriptive. + if message.find("OSError: [Errno 2] ENOENT") != -1: + if not missing_okay: + print(' [x]') + raise RuntimeError(f"No such directory: {directory}") + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(' [✓]') + + def run(self, filename, wait_output=True, stream_output=True): + """Run the provided script and return its output. If wait_output is True + (default) then wait for the script to finish and then return its output, + otherwise just run the script and don't wait for any output. + If stream_output is True(default) then return None and print outputs to + stdout without buffering. + """ + output = None + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + + if stream_output: + self._pyboard.execfile(filename, stream_output=True) + elif wait_output: + # Run the file and wait for output to return. + output = self._pyboard.execfile(filename) + else: + # Read the file and run it using lower level pyboard functions that + # won't wait for it to finish or return output. + with open(filename, "rb") as infile: + self._pyboard.exec_raw_no_follow(infile.read()) + except Exception as e: + raise e + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + return output + + + def run_on_board(self, filename, wait_output=True, stream_output=True): + """Run the provided script and return its output. If wait_output is True + (default) then wait for the script to finish and then return its output, + otherwise just run the script and don't wait for any output. + If stream_output is True(default) then return None and print outputs to + stdout without buffering. + """ + command = textwrap.dedent(f'exec(open("{filename}").read())') + output = None + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + + if stream_output: + self._pyboard.exec_(command, stream_output=True) + elif wait_output: + # Run the file and wait for output to return. + output = self._pyboard.exec_(command) + else: + # Read the file and run it using lower level pyboard functions that + # won't wait for it to finish or return output. + self._pyboard.exec_raw_no_follow(command) + except Exception as e: + message = str(e) + + if message.find('OSError: [Errno 2] ENOENT') != -1: + reason = f'"{filename}" does not exists!' + elif message.find('OSError: [Errno 13] EACCES') != -1: + reason = f'"{filename}" access denied!' + elif message.find('OSError: [Errno 21] EISDIR') != -1: + reason = f'"{filename}" is a directory!' + else: + reason = 'Traceback (most recent call last):' + traceback = message.split(reason) + + if len(traceback) == 2: + for call in traceback[1][:-2].split('\\r\\n'): + reason += f'{call}\n' + + raise Exception(reason.strip()) + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + return output + + def exec(self, command): + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True) + except Exception as e: + message = str(e) + reason = 'Traceback (most recent call last):' + traceback = message.split(reason) + + if len(traceback) == 2: + for call in traceback[1][:-2].split('\\r\\n'): + reason += f'{call}\n' + + raise Exception(reason.strip()) + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + return output + + print('Welcome to picowatch lib.') pico = False @@ -168,9 +978,10 @@ watchdog = Observer() watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True) watchdog.start() + try: while True: - try: + try: message = input('>>> ').strip() while pico.is_raw_repl_on(): @@ -181,8 +992,6 @@ try: sys.exit() case ['reboot' | 'reset']: pico.send_ctrl_d() - case ['autosave', value]: - sessions['autosave'] = (value == '1' or value == 'on') case ['ls']: ls('/') case ['ls' | 'stat', source]: @@ -195,11 +1004,8 @@ try: upload(source) case ['download' | 'backup', source]: download(source) - case ['save']: + case ['' | 'save']: watchdog_callback() - case ['sync', filename]: - watchdog_callback() - launch(filename) case _: if message.startswith('./'): launch(message[2:])