diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a30111 --- /dev/null +++ b/.gitignore @@ -0,0 +1,137 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ diff --git a/picowatch/lib/pyboard.py b/picowatch/lib/pyboard.py new file mode 100644 index 0000000..a0e07f5 --- /dev/null +++ b/picowatch/lib/pyboard.py @@ -0,0 +1,813 @@ +#!/usr/bin/env python + +""" +pyboard interface + +This module provides the Pyboard class, used to communicate with and +control the pyboard over a serial USB connection. + +Example usage: + + import pyboard + pyb = pyboard.Pyboard('/dev/ttyACM0') + +Or: + + pyb = pyboard.Pyboard('192.168.1.1') + +Then: + + pyb.enter_raw_repl() + pyb.exec('pyb.LED(1).on()') + pyb.exit_raw_repl() + +Note: if using Python2 then pyb.exec must be written as pyb.exec_. +To run a script from the local machine on the board and print out the results: + + import pyboard + pyboard.execfile('test.py', device='/dev/ttyACM0') + +This script can also be run directly. To execute a local script, use: + + ./pyboard.py test.py + +Or: + + python pyboard.py test.py + +""" + +# Adafruit MicroPython Tool - File Operations +# Author: Tony DiCola +# Copyright (c) 2016 Adafruit Industries +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import sys +import ast +import time +import textwrap +import binascii + + +_rawdelay = None + +try: + stdout = sys.stdout.buffer +except AttributeError: + # Python2 doesn't have buffer attr + stdout = sys.stdout + +def stdout_write_bytes(b): + b = b.replace(b"\x04", b"") + stdout.write(b) + stdout.flush() + +class PyboardError(Exception): + pass + +class TelnetToSerial: + def __init__(self, ip, user, password, read_timeout=None): + import telnetlib + self.tn = telnetlib.Telnet(ip, timeout=15) + self.read_timeout = read_timeout + if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): + self.tn.write(bytes(user, 'ascii') + b"\r\n") + + if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): + # needed because of internal implementation details of the telnet server + time.sleep(0.2) + self.tn.write(bytes(password, 'ascii') + b"\r\n") + + if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): + # login succesful + from collections import deque + self.fifo = deque() + return + + raise PyboardError('Failed to establish a telnet connection with the board') + + def __del__(self): + self.close() + + def close(self): + try: + self.tn.close() + except: + # the telnet object might not exist yet, so ignore this one + pass + + def read(self, size=1): + while len(self.fifo) < size: + timeout_count = 0 + data = self.tn.read_eager() + if len(data): + self.fifo.extend(data) + timeout_count = 0 + else: + time.sleep(0.25) + if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: + break + timeout_count += 1 + + data = b'' + while len(data) < size and len(self.fifo) > 0: + data += bytes([self.fifo.popleft()]) + return data + + def write(self, data): + self.tn.write(data) + return len(data) + + def inWaiting(self): + n_waiting = len(self.fifo) + if not n_waiting: + data = self.tn.read_eager() + self.fifo.extend(data) + return len(data) + else: + return n_waiting + +class Pyboard: + def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0): + global _rawdelay + _rawdelay = rawdelay + if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3: + # device looks like an IP address + self.serial = TelnetToSerial(device, user, password, read_timeout=10) + else: + import serial + delayed = False + for attempt in range(wait + 1): + try: + self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1) + break + except (OSError, IOError): # Py2 and Py3 have different errors + if wait == 0: + continue + if attempt == 0: + sys.stdout.write(f'Waiting {wait} seconds for pyboard ') + delayed = True + time.sleep(1) + sys.stdout.write('.') + sys.stdout.flush() + else: + if delayed: + print('') + raise PyboardError('failed to access ' + device) + if delayed: + print('') + + def close(self): + self.serial.close() + + def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None): + data = self.serial.read(min_num_bytes) + if data_consumer: + data_consumer(data) + timeout_count = 0 + while True: + if data.endswith(ending): + break + elif self.serial.inWaiting() > 0: + new_data = self.serial.read(1) + data = data + new_data + if data_consumer: + data_consumer(new_data) + timeout_count = 0 + else: + timeout_count += 1 + if timeout is not None and timeout_count >= 100 * timeout: + break + time.sleep(0.01) + return data + + def send_ctrl_a(self): + self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL + + def send_ctrl_b(self): + self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL + + def send_ctrl_c(self): + self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program + time.sleep(0.1) + self.serial.write(b'\x03') + time.sleep(0.1) + + def send_ctrl_d(self): + self.serial.write(b'\r\x04') # ctrl-D: soft reset + + def enter_raw_repl(self): + # Brief delay before sending RAW MODE char if requests + if _rawdelay > 0: + time.sleep(_rawdelay) + + # ctrl-C twice: interrupt any running program + self.serial.write(b'\r\x03') + time.sleep(0.1) + self.serial.write(b'\x03') + time.sleep(0.1) + + # flush input (without relying on serial.flushInput()) + n = self.serial.inWaiting() + while n > 0: + self.serial.read(n) + n = self.serial.inWaiting() + + for retry in range(0, 5): + self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL + data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>') + if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'): + break + else: + if retry >= 4: + print(data) + raise PyboardError('could not enter raw repl') + time.sleep(0.2) + + self.serial.write(b'\x04') # ctrl-D: soft reset + data = self.read_until(1, b'soft reboot\r\n') + if not data.endswith(b'soft reboot\r\n'): + print(data) + raise PyboardError('could not enter raw repl') + # By splitting this into 2 reads, it allows boot.py to print stuff, + # which will show up after the soft reboot and before the raw REPL. + # Modification from original pyboard.py below: + # Add a small delay and send Ctrl-C twice after soft reboot to ensure + # any main program loop in main.py is interrupted. + time.sleep(0.5) + self.serial.write(b'\x03') + time.sleep(0.1) # (slight delay before second interrupt + self.serial.write(b'\x03') + # End modification above. + data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n') + if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'): + print(data) + raise PyboardError('could not enter raw repl') + + def exit_raw_repl(self): + self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL + + def follow(self, timeout, data_consumer=None): + # wait for normal output + data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer) + if not data.endswith(b'\x04'): + raise PyboardError('timeout waiting for first EOF reception') + data = data[:-1] + + # wait for error output + data_err = self.read_until(1, b'\x04', timeout=timeout) + if not data_err.endswith(b'\x04'): + raise PyboardError('timeout waiting for second EOF reception') + data_err = data_err[:-1] + + # return normal and error output + return data, data_err + + def exec_raw_no_follow(self, command): + if isinstance(command, bytes): + command_bytes = command + else: + command_bytes = bytes(command, encoding='utf8') + + # check we have a prompt + data = self.read_until(1, b'>') + if not data.endswith(b'>'): + raise PyboardError('could not enter raw repl') + + # write command + for i in range(0, len(command_bytes), 256): + self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))]) + time.sleep(0.01) + self.serial.write(b'\x04') + + # check if we could exec command + data = self.serial.read(2) + if data != b'OK': + raise PyboardError('could not exec command') + + def exec_raw(self, command, timeout=10, data_consumer=None): + self.exec_raw_no_follow(command) + return self.follow(timeout, data_consumer) + + def eval(self, expression): + ret = self.exec_(f'print({expression})') + ret = ret.strip() + return ret + + def exec_(self, command, stream_output=False): + data_consumer = None + if stream_output: + data_consumer = stdout_write_bytes + ret, ret_err = self.exec_raw(command, data_consumer=data_consumer) + if ret_err: + raise PyboardError('exception', ret, ret_err) + return ret + + def execfile(self, filename, stream_output=False): + with open(filename, 'rb') as f: + pyfile = f.read() + return self.exec_(pyfile, stream_output=stream_output) + + def get_time(self): + t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ') + return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6]) + +# in Python2 exec is a keyword so one must use "exec_" +# but for Python3 we want to provide the nicer version "exec" +setattr(Pyboard, "exec", Pyboard.exec_) + +def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'): + pyb = Pyboard(device, baudrate, user, password) + pyb.enter_raw_repl() + output = pyb.execfile(filename) + stdout_write_bytes(output) + pyb.exit_raw_repl() + pyb.close() + +# def main(): +# import argparse +# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.') +# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard') +# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device') +# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username') +# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password') +# cmd_parser.add_argument('-c', '--command', help='program passed in as string') +# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available') +# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]') +# cmd_parser.add_argument('files', nargs='*', help='input files') +# args = cmd_parser.parse_args() + +# def execbuffer(buf): +# try: +# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) +# pyb.enter_raw_repl() +# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes) +# pyb.exit_raw_repl() +# pyb.close() +# except PyboardError as er: +# print(er) +# sys.exit(1) +# except KeyboardInterrupt: +# sys.exit(1) +# if ret_err: +# stdout_write_bytes(ret_err) +# sys.exit(1) + +# if args.command is not None: +# execbuffer(args.command.encode('utf-8')) + +# for filename in args.files: +# with open(filename, 'rb') as f: +# pyfile = f.read() +# execbuffer(pyfile) + +# if args.follow or (args.command is None and len(args.files) == 0): +# try: +# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait) +# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes) +# pyb.close() +# except PyboardError as er: +# print(er) +# sys.exit(1) +# except KeyboardInterrupt: +# sys.exit(1) +# if ret_err: +# stdout_write_bytes(ret_err) +# sys.exit(1) + + + +BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time. +# This is kept small because small chips and USB to serial +# bridges usually have very small buffers. + + +class DirectoryExistsError(Exception): + pass + + +class Files(object): + """Class to interact with a MicroPython board files over a serial connection. + Provides functions for listing, uploading, and downloading files from the + board's filesystem. + """ + + __raw_repl_on: bool = False + + def __init__(self, pyboard: Pyboard): + """Initialize the MicroPython board files class using the provided pyboard + instance. In most cases you should create a Pyboard instance (from + pyboard.py) which connects to a board over a serial connection and pass + it in, but you can pass in other objects for testing, etc. + """ + self._pyboard = pyboard + + def is_raw_repl_on(self): + return self.__raw_repl_on + + def send_ctrl_a(self): + self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL + + def send_ctrl_b(self): + self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL + + def send_ctrl_c(self): + self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program + + def send_ctrl_d(self): + self._pyboard.send_ctrl_d() # ctrl-D: soft reset + + def ls(self, directory="/", long_format=False, recursive=True): + """List the contents of the specified directory (or root if none is + specified). Returns a list of strings with the names of files in the + specified directory. If long_format is True then a list of 2-tuples + with the name and size (in bytes) of the item is returned. Note that + it appears the size of directories is not supported by MicroPython and + will always return 0 (i.e. no recursive size computation). + """ + + # Disabling for now, see https://github.com/adafruit/ampy/issues/55. + # # Make sure directory ends in a slash. + # if not directory.endswith("/"): + # directory += "/" + + # Make sure directory starts with slash, for consistency. + # if not directory.startswith("/"): + # directory = "/" + directory + + command = """\ + try: + import os + except ImportError: + import uos as os\n""" + + if recursive: + command += """\ + def listdir(directory): + result = set() + + def _listdir(dir_or_file): + try: + # if its a directory, then it should provide some children. + children = os.listdir(dir_or_file) + except OSError: + # probably a file. run stat() to confirm. + os.stat(dir_or_file) + result.add(dir_or_file) + else: + # probably a directory, add to result if empty. + if children: + # queue the children to be dealt with in next iteration. + for child in children: + # create the full path. + if dir_or_file == '/': + next = dir_or_file + child + else: + next = dir_or_file + '/' + child + + _listdir(next) + else: + result.add(dir_or_file) + + _listdir(directory) + return sorted(result)\n""" + else: + command += """\ + def listdir(directory): + if directory == '/': + return sorted([directory + f for f in os.listdir(directory)]) + else: + return sorted([directory + '/' + f for f in os.listdir(directory)])\n""" + + # Execute os.listdir() command on the board. + if long_format: + command += f""" + r = [] + for f in listdir('{directory}'): + size = os.stat(f)[6] + r.append(f'{{f}} ({{size}}b)') + print(r) + """ + else: + command += f""" + print(listdir('{directory}')) + """ + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + output = self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + # Check if this is an OSError #2, i.e. directory doesn't exist and + # rethrow it as something more descriptive. + if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: + raise RuntimeError(f"No such directory: {directory}") + else: + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + # Parse the result list and return it. + return ast.literal_eval(output.decode("utf-8")) + + def mkdir(self, directory, exists_okay=False): + """Create the specified directory. Note this cannot create a recursive + hierarchy of directories, instead each one should be created separately. + """ + # Execute os.mkdir command on the board. + command = f""" + try: + import os + except ImportError: + import uos as os + os.mkdir('{directory}') + """ + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + # Check if this is an OSError #17, i.e. directory already exists. + if ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST") != -1: + if not exists_okay: + raise DirectoryExistsError(f"Directory already exists: {directory}") + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + def put(self, filename, data): + """Create or update the specified file with the provided data. + """ + # Open the file for writing on the board and write chunks of data. + + if filename.startswith('/'): + filename = filename[1:] + + print(f'↑ {filename}', end='', flush=True) + size = len(data) + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(f"f = open('{filename}', 'wb')") + # Loop through and write a buffer size chunk of data at a time. + for i in range(0, size, BUFFER_SIZE): + chunk_size = min(BUFFER_SIZE, size - i) + chunk = repr(data[i : i + chunk_size]) + # Make sure to send explicit byte strings (handles python 2 compatibility). + if not chunk.startswith("b"): + chunk = "b" + chunk + self._pyboard.exec_(f"f.write({chunk})") + self._pyboard.exec_("f.close()") + except Exception as ex: + print(' [x]') + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(f' - {size}b [✓]') + return size + + def get(self, filename): + """Retrieve the contents of the specified file and return its contents + as a byte string. + """ + # Open the file and read it a few bytes at a time and print out the + # raw bytes. Be careful not to overload the UART buffer so only write + # a few bytes at a time, and don't use print since it adds newlines and + # expects string data. + command = f""" + import sys + import ubinascii + with open('{filename}', 'rb') as infile: + while True: + result = infile.read({BUFFER_SIZE}) + if result == b'': + break + len = sys.stdout.write(ubinascii.hexlify(result)) + """ + print(f'↓ {filename}', end='', flush=True) + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + output = self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + print(' [x]') + # Check if this is an OSError #2, i.e. file doesn't exist and + # rethrow it as something more descriptive. + try: + if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1: + raise RuntimeError(f"No such file: {filename}") + else: + raise ex + except UnicodeDecodeError: + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(f' - {int(len(output) / 2)}b [✓]') + return binascii.unhexlify(output) + + def rm(self, filename): + """Remove the specified file or directory.""" + command = f""" + try: + import os + except ImportError: + import uos as os + os.remove('{filename}') + """ + print(f'↶ {filename}', end='') + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + print(' [x]') + message = ex.args[2].decode("utf-8") + # Check if this is an OSError #2, i.e. file/directory doesn't exist + # and rethrow it as something more descriptive. + if message.find("OSError: [Errno 2] ENOENT") != -1: + raise RuntimeError(f"No such file/directory: {filename}") + # Check for OSError #13, the directory isn't empty. + if message.find("OSError: [Errno 13] EACCES") != -1: + raise RuntimeError(f"Directory is not empty: {filename}") + else: + raise ex + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(' [✓]') + + def rmdir(self, directory, missing_okay=False): + """Forcefully remove the specified directory and all its children.""" + # Build a script to walk an entire directory structure and delete every + # file and subfolder. This is tricky because MicroPython has no os.walk + # or similar function to walk folders, so this code does it manually + # with recursion and changing directories. For each directory it lists + # the files and deletes everything it can, i.e. all the files. Then + # it lists the files again and assumes they are directories (since they + # couldn't be deleted in the first pass) and recursively clears those + # subdirectories. Finally when finished clearing all the children the + # parent directory is deleted. + command = f""" + try: + import os + except ImportError: + import uos as os + def rmdir(directory): + os.chdir(directory) + for f in os.listdir(): + try: + os.remove(f) + except OSError: + pass + for f in os.listdir(): + rmdir(f) + os.chdir('..') + os.rmdir(directory) + rmdir('{directory}') + """ + print(f'↶ {directory}', end='') + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + self._pyboard.exec_(textwrap.dedent(command)) + except PyboardError as ex: + message = ex.args[2].decode("utf-8") + # Check if this is an OSError #2, i.e. directory doesn't exist + # and rethrow it as something more descriptive. + if message.find("OSError: [Errno 2] ENOENT") != -1: + if not missing_okay: + print(' [x]') + raise RuntimeError(f"No such directory: {directory}") + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + print(' [✓]') + + def run(self, filename, wait_output=True, stream_output=True): + """Run the provided script and return its output. If wait_output is True + (default) then wait for the script to finish and then return its output, + otherwise just run the script and don't wait for any output. + If stream_output is True(default) then return None and print outputs to + stdout without buffering. + """ + output = None + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + + if stream_output: + self._pyboard.execfile(filename, stream_output=True) + elif wait_output: + # Run the file and wait for output to return. + output = self._pyboard.execfile(filename) + else: + # Read the file and run it using lower level pyboard functions that + # won't wait for it to finish or return output. + with open(filename, "rb") as infile: + self._pyboard.exec_raw_no_follow(infile.read()) + except Exception as e: + raise e + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + return output + + + def run_on_board(self, filename, wait_output=True, stream_output=True): + """Run the provided script and return its output. If wait_output is True + (default) then wait for the script to finish and then return its output, + otherwise just run the script and don't wait for any output. + If stream_output is True(default) then return None and print outputs to + stdout without buffering. + """ + command = textwrap.dedent(f'exec(open("{filename}").read())') + output = None + + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + + if stream_output: + self._pyboard.exec_(command, stream_output=True) + elif wait_output: + # Run the file and wait for output to return. + output = self._pyboard.exec_(command) + else: + # Read the file and run it using lower level pyboard functions that + # won't wait for it to finish or return output. + self._pyboard.exec_raw_no_follow(command) + except Exception as e: + message = str(e) + + if message.find('OSError: [Errno 2] ENOENT') != -1: + reason = f'"{filename}" does not exists!' + elif message.find('OSError: [Errno 13] EACCES') != -1: + reason = f'"{filename}" access denied!' + elif message.find('OSError: [Errno 21] EISDIR') != -1: + reason = f'"{filename}" is a directory!' + else: + reason = 'Traceback (most recent call last):' + traceback = message.split(reason) + + if len(traceback) == 2: + for call in traceback[1][:-2].split('\\r\\n'): + reason += f'{call}\n' + + raise Exception(reason.strip()) + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + return output + + def exec(self, command): + try: + self.__raw_repl_on = True + self._pyboard.enter_raw_repl() + output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True) + except Exception as e: + message = str(e) + reason = 'Traceback (most recent call last):' + traceback = message.split(reason) + + if len(traceback) == 2: + for call in traceback[1][:-2].split('\\r\\n'): + reason += f'{call}\n' + + raise Exception(reason.strip()) + finally: + self._pyboard.exit_raw_repl() + self.__raw_repl_on = False + + return output \ No newline at end of file diff --git a/picowatch/picowatch.py b/picowatch/picowatch.py new file mode 100644 index 0000000..dc66c35 --- /dev/null +++ b/picowatch/picowatch.py @@ -0,0 +1,213 @@ +# +import os +import sys +import time +import signal + +from lib.pyboard import Pyboard, Files +from watchdog.observers import Observer +from watchdog.events import PatternMatchingEventHandler + + +print('Welcome to picowatch lib.') +pico = False + +while not pico: + print('-' * 30) + device = input('Port: ').strip() + baudrate = input('Baudrate (115200): ').strip() or 115200 + + try: + pico = Files(Pyboard(device=device, baudrate=baudrate)) + print(f'Connected to device: {device} at a baudrate of: {baudrate}') + print('-' * 30) + except Exception as e: + print(f'Exception: {str(e)}') + +pico.send_ctrl_c() +WATCHING_DIRECTORY = './' + +def upload(source: str = '', destination: str = ''): + real_source = os.path.join(WATCHING_DIRECTORY, *source.split('/')) + + if not destination: + destination = source + + destination = '/'.join(destination.split('\\')) + + try: + if os.path.isdir(real_source): + for root, dirs, files in os.walk(real_source, followlinks=True): + droot = '/'.join(root.replace(WATCHING_DIRECTORY, '').split(os.sep)) + + for dir in dirs: + pico.mkdir('/'.join([droot, dir]), exists_okay=True) + + for filename in files: + with open(os.path.join(root, filename), 'rb') as fh: + pico.put('/'.join([droot, filename]), fh.read()) + + time.sleep(.5) + elif os.path.exists(real_source): + with open(real_source, 'rb') as fh: + pico.put(destination, fh.read()) + + time.sleep(.5) + except Exception as e: + print(f'Exeception: upload({source}, {destination}): {str(e)}') + + +def download(source: str = '/'): + if not source.startswith('/'): + source = f'/{source}' + + if len(source) > 1 and source.endswith('/'): + source = source[:-1] + + try: + for filename in pico.ls(directory=source, long_format=False, recursive=True): + filename = filename[1:] + + if filename.startswith('.'): + continue + + if os.path.dirname(filename): + os.makedirs(os.path.dirname(filename), mode=777, exist_ok=True) + + with open(os.path.join(WATCHING_DIRECTORY, *filename.split('/')), 'wb') as fh: + fh.write(pico.get(filename)) + + time.sleep(.5) + except Exception as e: + print(f'Exeception: download({source}): {str(e)}') + + +def contents(filename: str): + try: + for ln in pico.get(filename).decode('utf-8').split('\n'): + print(ln) + except Exception as e: + print(f'Exeception: contents({filename}): {str(e)}') + + +def delete(source: str): + try: + if source.endswith('/'): + pico.rmdir(source, missing_okay=True) + else: + pico.rm(source) + except Exception as e: + print(f'Exeception: delete({source}): {str(e)}') + + +def ls(source: str = '/'): + try: + if len(source) > 1 and source.endswith('/'): + source = source[:-1] + + for filename in pico.ls(directory=source, long_format=True, recursive=True): + if filename.startswith('/'): + filename = filename[1:] + + if len(filename) > 5: + print(f'→ {filename}') + except Exception as e: + print(f'Exeception: ls({source}): {str(e)}') + + +def launch(filename: str = 'main.py'): + try: + pico.run_on_board(filename) + except Exception as e: + print(f'Exeception: launch({filename}): {str(e)}') + + +sessions = {'deleted': set(), 'modified': set()} + +def on_modified_callback(event): + if event.is_directory == True: + return + + source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') + sessions['modified'].add(source) + + +def on_deleted_callback(event): + source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/') + + if event.is_directory == True and not source.endswith('/'): + source += '/' + elif len(source.split('.')) == 1: + source += '/' + + sessions['deleted'].add(source) + + +def watchdog_callback(): + for source in sessions['deleted']: + delete(source) + + for source in sessions['modified']: + upload(source) + + sessions['deleted'] = set() + sessions['modified'] = set() + + +signal.signal(signal.SIGINT, lambda a, b: pico.send_ctrl_c()) +watchdog_event = PatternMatchingEventHandler( + patterns = ['*'], + ignore_patterns = None, + ignore_directories = False, + case_sensitive = True +) +watchdog_event.on_modified = on_modified_callback +watchdog_event.on_deleted = on_deleted_callback + +watchdog = Observer() +watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True) +watchdog.start() + +try: + while True: + try: + message = input('>>> ').strip() + + while pico.is_raw_repl_on(): + time.sleep(.1) + + match message.split(' '): + case ['0' | 'exit']: + sys.exit() + case ['reboot' | 'reset']: + pico.send_ctrl_d() + case ['autosave', value]: + sessions['autosave'] = (value == '1' or value == 'on') + case ['ls']: + ls('/') + case ['ls' | 'stat', source]: + ls(source) + case ['cat' | 'open' | 'contents', source]: + contents(source) + case ['del' | 'delete' | 'remove', source]: + delete(source) + case ['upl' | 'upload' | 'update', source]: + upload(source) + case ['download' | 'backup', source]: + download(source) + case ['save']: + watchdog_callback() + case ['sync', filename]: + watchdog_callback() + launch(filename) + case _: + if message.startswith('./'): + launch(message[2:]) + elif message: + print(f'"{message}" is not recognized.') + except Exception as e: + print(f'Exception: {str(e)}') +except KeyboardInterrupt: + watchdog.stop() + +watchdog.join() diff --git a/pybash b/pybash new file mode 100644 index 0000000..cb3d978 --- /dev/null +++ b/pybash @@ -0,0 +1,3 @@ +#!/bin/bash + +python "$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)/$1/$1.py" $(pwd) \ No newline at end of file