Files
primadiag-pybash/picowatch_d/picowatch.py

1043 lines
37 KiB
Python
Raw Normal View History

2022-12-29 17:20:41 +01:00
#!/usr/bin/env python
"""
pyboard interface
This module provides the Pyboard class, used to communicate with and
control the pyboard over a serial USB connection.
Example usage:
import pyboard
pyb = pyboard.Pyboard('/dev/ttyACM0')
Or:
pyb = pyboard.Pyboard('192.168.1.1')
Then:
pyb.enter_raw_repl()
pyb.exec('pyb.LED(1).on()')
pyb.exit_raw_repl()
Note: if using Python2 then pyb.exec must be written as pyb.exec_.
To run a script from the local machine on the board and print out the results:
import pyboard
pyboard.execfile('test.py', device='/dev/ttyACM0')
This script can also be run directly. To execute a local script, use:
./pyboard.py test.py
Or:
python pyboard.py test.py
"""
# Adafruit MicroPython Tool - File Operations
# Author: Tony DiCola
# Copyright (c) 2016 Adafruit Industries
2022-12-29 16:37:55 +01:00
#
2022-12-29 17:20:41 +01:00
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
2022-12-29 16:37:55 +01:00
import sys
2022-12-29 17:20:41 +01:00
import ast
2022-12-29 16:37:55 +01:00
import time
2022-12-29 17:20:41 +01:00
import textwrap
import binascii
import os
2022-12-29 16:37:55 +01:00
import signal
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
2022-12-29 17:20:41 +01:00
# Delay (seconds) applied before entering the raw REPL; set by Pyboard.__init__.
_rawdelay = None

# Binary stream used for board output: the bytes-capable ``buffer`` of
# sys.stdout when it exists, otherwise sys.stdout itself (Python 2).
stdout = getattr(sys.stdout, 'buffer', sys.stdout)
def stdout_write_bytes(b):
    """Write *b* to the binary stdout stream without EOT (0x04) bytes, then flush."""
    cleaned = b.replace(b"\x04", b"")
    stdout.write(cleaned)
    stdout.flush()
class PyboardError(Exception):
    """Raised when talking to the board fails or the board reports an exception."""
class TelnetToSerial:
    """Serial-port-like adapter over a telnet connection to the board.

    Provides the ``read``/``write``/``inWaiting``/``close`` calls that
    ``Pyboard`` makes on ``self.serial``, so a board reached over the network
    is driven through the same code path as a serial device.
    """

    def __init__(self, ip, user, password, read_timeout=None):
        """Connect to *ip* and log in with *user*/*password*.

        *read_timeout* (seconds, or None for no limit) bounds both the login
        prompts and later ``read`` calls.

        Raises:
            PyboardError: if the login dialogue does not complete.
        """
        import telnetlib
        from collections import deque

        # Set before connecting so close()/__del__ are safe if Telnet() raises.
        self.tn = None
        self.read_timeout = read_timeout
        self.tn = telnetlib.Telnet(ip, timeout=15)

        if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
            self.tn.write(bytes(user, 'ascii') + b"\r\n")

            if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
                # needed because of internal implementation details of the telnet server
                time.sleep(0.2)
                self.tn.write(bytes(password, 'ascii') + b"\r\n")

                banner = self.tn.read_until(
                    b'Type "help()" for more information.', timeout=read_timeout
                )
                if b'for more information.' in banner:
                    # login successful
                    self.fifo = deque()
                    return

        raise PyboardError('Failed to establish a telnet connection with the board')

    def __del__(self):
        self.close()

    def close(self):
        """Close the telnet connection; a best-effort operation that never raises."""
        tn = getattr(self, 'tn', None)
        if tn is None:
            # The telnet object was never created.
            return
        try:
            tn.close()
        except Exception:
            # Closing is best-effort (it also runs from __del__).
            pass

    def read(self, size=1):
        """Return up to *size* bytes, waiting for data until ``read_timeout`` expires.

        Fewer than *size* bytes (possibly none) are returned on timeout.
        """
        # The idle counter must live outside the loop: resetting it on every
        # pass meant the timeout below could never trigger and a silent board
        # blocked this call forever.
        timeout_count = 0
        while len(self.fifo) < size:
            data = self.tn.read_eager()
            if data:
                self.fifo.extend(data)
                timeout_count = 0
            else:
                time.sleep(0.25)
                # Each idle pass sleeps 0.25 s, hence 4 passes per second.
                if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
                    break
                timeout_count += 1

        available = min(size, len(self.fifo))
        return bytes(self.fifo.popleft() for _ in range(available))

    def write(self, data):
        """Send *data* and return the number of bytes handed to telnet."""
        self.tn.write(data)
        return len(data)

    def inWaiting(self):
        """Return how many bytes can be read without blocking."""
        n_waiting = len(self.fifo)
        if n_waiting:
            return n_waiting
        # Nothing buffered: pull in whatever has already arrived.
        data = self.tn.read_eager()
        self.fifo.extend(data)
        return len(data)
class Pyboard:
    """Connection to a board's REPL over a serial port or telnet.

    The transport is kept in ``self.serial``; the methods implement the raw
    REPL handshake used to send code to the board and collect its output.
    """

    def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0):
        """Open *device*.

        A *device* that looks like a dotted IPv4 address is reached through
        ``TelnetToSerial``; anything else is opened with pyserial, retrying
        once per second while *wait* allows.  *rawdelay* is stored in the
        module-level ``_rawdelay`` read by ``enter_raw_repl``.

        Raises:
            PyboardError: if the serial device cannot be opened.
        """
        global _rawdelay
        _rawdelay = rawdelay

        looks_like_ip = (
            device
            and device[0].isdigit()
            and device[-1].isdigit()
            and device.count('.') == 3
        )
        if looks_like_ip:
            self.serial = TelnetToSerial(device, user, password, read_timeout=10)
            return

        import serial

        delayed = False
        for attempt in range(wait + 1):
            try:
                self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
                break
            except (OSError, IOError):  # Py2 and Py3 have different errors
                if wait == 0:
                    continue
                if attempt == 0:
                    sys.stdout.write(f'Waiting {wait} seconds for pyboard ')
                    delayed = True
            time.sleep(1)
            sys.stdout.write('.')
            sys.stdout.flush()
        else:
            # Every attempt failed.
            if delayed:
                print('')
            raise PyboardError('failed to access ' + device)

        if delayed:
            print('')

    def close(self):
        """Close the underlying transport."""
        self.serial.close()

    def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
        """Read until the data ends with *ending* or the link stays idle too long.

        Starts with a read of *min_num_bytes*, then continues one byte at a
        time.  Every piece read is also passed to *data_consumer* when given.
        *timeout* is in seconds of inactivity (None waits forever).  The
        caller must check the returned bytes for *ending* to detect a timeout.
        """
        data = self.serial.read(min_num_bytes)
        if data_consumer:
            data_consumer(data)

        idle_ticks = 0
        while not data.endswith(ending):
            if self.serial.inWaiting() > 0:
                fresh = self.serial.read(1)
                data += fresh
                if data_consumer:
                    data_consumer(fresh)
                idle_ticks = 0
                continue
            idle_ticks += 1
            # Each idle tick sleeps 10 ms, hence 100 ticks per second.
            if timeout is not None and idle_ticks >= 100 * timeout:
                break
            time.sleep(0.01)
        return data

    def send_ctrl_a(self):
        self.serial.write(b'\r\x01')  # ctrl-A: enter raw REPL

    def send_ctrl_b(self):
        self.serial.write(b'\r\x02')  # ctrl-B: exit raw REPL

    def send_ctrl_c(self):
        # ctrl-C twice: interrupt any running program
        for interrupt in (b'\r\x03', b'\x03'):
            self.serial.write(interrupt)
            time.sleep(0.1)

    def send_ctrl_d(self):
        self.serial.write(b'\r\x04')  # ctrl-D: soft reset

    def enter_raw_repl(self):
        """Interrupt the board, soft-reset it and leave it at the raw REPL prompt.

        Raises:
            PyboardError: if any of the expected prompts is not received.
        """
        # Brief delay before sending RAW MODE char if requested
        if _rawdelay > 0:
            time.sleep(_rawdelay)

        # ctrl-C twice: interrupt any running program
        self.send_ctrl_c()

        # flush input (without relying on serial.flushInput())
        while (pending := self.serial.inWaiting()) > 0:
            self.serial.read(pending)

        prompt = b'raw REPL; CTRL-B to exit\r\n>'
        for retry in range(0, 5):
            self.send_ctrl_a()
            data = self.read_until(1, prompt)
            if data.endswith(prompt):
                break
            if retry >= 4:
                print(data)
                raise PyboardError('could not enter raw repl')
            time.sleep(0.2)

        self.serial.write(b'\x04')  # ctrl-D: soft reset
        data = self.read_until(1, b'soft reboot\r\n')
        if not data.endswith(b'soft reboot\r\n'):
            print(data)
            raise PyboardError('could not enter raw repl')

        # By splitting this into 2 reads, it allows boot.py to print stuff,
        # which will show up after the soft reboot and before the raw REPL.
        # Modification from original pyboard.py below:
        #   Add a small delay and send Ctrl-C twice after soft reboot to ensure
        #   any main program loop in main.py is interrupted.
        time.sleep(0.5)
        self.serial.write(b'\x03')
        time.sleep(0.1)  # slight delay before second interrupt
        self.serial.write(b'\x03')
        # End modification above.

        banner = b'raw REPL; CTRL-B to exit\r\n'
        data = self.read_until(1, banner)
        if not data.endswith(banner):
            print(data)
            raise PyboardError('could not enter raw repl')

    def exit_raw_repl(self):
        self.serial.write(b'\r\x02')  # ctrl-B: enter friendly REPL

    def follow(self, timeout, data_consumer=None):
        """Collect the output of a running command.

        Returns ``(data, data_err)``: the normal output and the error output,
        each terminated on the wire by an EOT byte that is stripped here.

        Raises:
            PyboardError: if either EOT does not arrive within *timeout*.
        """
        # wait for normal output
        data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer)
        if not data.endswith(b'\x04'):
            raise PyboardError('timeout waiting for first EOF reception')

        # wait for error output
        data_err = self.read_until(1, b'\x04', timeout=timeout)
        if not data_err.endswith(b'\x04'):
            raise PyboardError('timeout waiting for second EOF reception')

        # return normal and error output
        return data[:-1], data_err[:-1]

    def exec_raw_no_follow(self, command):
        """Send *command* (str or bytes) for execution without reading its output.

        Raises:
            PyboardError: if there is no prompt or the board does not answer 'OK'.
        """
        command_bytes = command if isinstance(command, bytes) else bytes(command, encoding='utf8')

        # check we have a prompt
        data = self.read_until(1, b'>')
        if not data.endswith(b'>'):
            raise PyboardError('could not enter raw repl')

        # write command in 256-byte pieces with a short pause after each
        for start in range(0, len(command_bytes), 256):
            self.serial.write(command_bytes[start:start + 256])
            time.sleep(0.01)
        self.serial.write(b'\x04')

        # check if we could exec command
        if self.serial.read(2) != b'OK':
            raise PyboardError('could not exec command')

    def exec_raw(self, command, timeout=10, data_consumer=None):
        """Execute *command* and return ``(output, error_output)``."""
        self.exec_raw_no_follow(command)
        return self.follow(timeout, data_consumer)

    def eval(self, expression):
        """Print *expression* on the board and return the stripped output bytes."""
        return self.exec_(f'print({expression})').strip()

    def exec_(self, command, stream_output=False):
        """Execute *command* and return its output.

        With *stream_output* the output is also written to stdout as it
        arrives.

        Raises:
            PyboardError: ``('exception', output, error_output)`` when the
                board produced error output.
        """
        data_consumer = stdout_write_bytes if stream_output else None
        ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
        if ret_err:
            raise PyboardError('exception', ret, ret_err)
        return ret

    def execfile(self, filename, stream_output=False):
        """Execute the local file *filename* on the board and return its output."""
        with open(filename, 'rb') as f:
            pyfile = f.read()
        return self.exec_(pyfile, stream_output=stream_output)

    def get_time(self):
        """Return the board RTC time of day as seconds since midnight."""
        raw = self.eval('pyb.RTC().datetime()')
        fields = str(raw, encoding='utf8')[1:-1].split(', ')
        hours, minutes, seconds = int(fields[4]), int(fields[5]), int(fields[6])
        return hours * 3600 + minutes * 60 + seconds


# in Python2 exec is a keyword so one must use "exec_"
# but for Python3 we want to provide the nicer version "exec"
Pyboard.exec = Pyboard.exec_
def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):
    """Run the local script *filename* on the board and write its output to stdout.

    Opens a connection to *device*, executes the file in the raw REPL and
    closes the connection again, even when execution fails.
    """
    pyb = Pyboard(device, baudrate, user, password)
    try:
        pyb.enter_raw_repl()
        output = pyb.execfile(filename)
        stdout_write_bytes(output)
        pyb.exit_raw_repl()
    finally:
        # Release the port even if the script or the REPL handshake raised.
        pyb.close()
# def main():
# import argparse
# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password')
# cmd_parser.add_argument('-c', '--command', help='program passed in as string')
# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
# cmd_parser.add_argument('files', nargs='*', help='input files')
# args = cmd_parser.parse_args()
# def execbuffer(buf):
# try:
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
# pyb.enter_raw_repl()
# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
# pyb.exit_raw_repl()
# pyb.close()
# except PyboardError as er:
# print(er)
# sys.exit(1)
# except KeyboardInterrupt:
# sys.exit(1)
# if ret_err:
# stdout_write_bytes(ret_err)
# sys.exit(1)
# if args.command is not None:
# execbuffer(args.command.encode('utf-8'))
# for filename in args.files:
# with open(filename, 'rb') as f:
# pyfile = f.read()
# execbuffer(pyfile)
# if args.follow or (args.command is None and len(args.files) == 0):
# try:
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
# pyb.close()
# except PyboardError as er:
# print(er)
# sys.exit(1)
# except KeyboardInterrupt:
# sys.exit(1)
# if ret_err:
# stdout_write_bytes(ret_err)
# sys.exit(1)
# Amount of data to read or write to the serial port at a time: the chunk
# size used when uploading file data and the read size used on the board
# when downloading.  This is kept small because small chips and USB to
# serial bridges usually have very small buffers.
BUFFER_SIZE = 512
class DirectoryExistsError(Exception):
    """Raised when creating a directory that already exists on the board."""
class Files(object):
    """Class to interact with a MicroPython board's files over a serial connection.

    Provides functions for listing, uploading, downloading, removing and
    running files on the board's filesystem.  Every operation enters the raw
    REPL, runs a small script on the board and leaves the raw REPL again.
    """

    # True while one of the methods below has the board in raw REPL mode.
    __raw_repl_on: bool = False

    # Board-side preamble that makes ``os`` available on old and new firmware.
    _IMPORT_OS = (
        "try:\n"
        "    import os\n"
        "except ImportError:\n"
        "    import uos as os\n"
    )

    # Board error markers and the description shown for the affected file.
    _FILE_ERRORS = (
        ('OSError: [Errno 2] ENOENT', 'does not exists!'),
        ('OSError: [Errno 13] EACCES', 'access denied!'),
        ('OSError: [Errno 21] EISDIR', 'is a directory!'),
    )

    def __init__(self, pyboard: "Pyboard"):
        """Initialize the MicroPython board files class using the provided pyboard
        instance.  In most cases you should create a Pyboard instance which
        connects to a board over a serial connection and pass it in, but you
        can pass in other objects for testing, etc.
        """
        self._pyboard = pyboard

    def is_raw_repl_on(self):
        """Return True while an operation has the board in raw REPL mode."""
        return self.__raw_repl_on

    def send_ctrl_a(self):
        self._pyboard.send_ctrl_a()  # ctrl-A: enter raw REPL

    def send_ctrl_b(self):
        self._pyboard.send_ctrl_b()  # ctrl-B: exit raw REPL

    def send_ctrl_c(self):
        self._pyboard.send_ctrl_c()  # ctrl-C twice: interrupt any running program

    def send_ctrl_d(self):
        self._pyboard.send_ctrl_d()  # ctrl-D: soft reset

    # -- private helpers ---------------------------------------------------

    def _exec_in_raw_repl(self, command, stream_output=False):
        """Run *command* inside a raw REPL session and return its output."""
        self.__raw_repl_on = True
        try:
            self._pyboard.enter_raw_repl()
            return self._pyboard.exec_(command, stream_output=stream_output)
        finally:
            self._pyboard.exit_raw_repl()
            self.__raw_repl_on = False

    @staticmethod
    def _remote_error_text(error):
        """Return the board's error output carried by *error*, or '' if none.

        ``Pyboard.exec_`` raises ``PyboardError('exception', ret, ret_err)``,
        but errors raised elsewhere (e.g. 'could not enter raw repl') carry a
        single argument, so the third one must not be assumed to exist.
        """
        if len(error.args) >= 3 and isinstance(error.args[2], (bytes, bytearray)):
            return bytes(error.args[2]).decode('utf-8', errors='replace')
        return ''

    @classmethod
    def _describe_failure(cls, error, filename=None):
        """Turn *error* into a readable reason, naming *filename* for known OS errors."""
        message = cls._remote_error_text(error) or str(error)
        if filename is not None:
            for marker, problem in cls._FILE_ERRORS:
                if marker in message:
                    return f'"{filename}" {problem}'
        header = 'Traceback (most recent call last):'
        start = message.find(header)
        if start != -1:
            message = message[start:]
        # Normalise the board's CRLF line endings.  When there is no traceback
        # the original message is kept instead of being discarded.
        return '\n'.join(message.splitlines()).strip()

    # -- filesystem operations ----------------------------------------------

    def ls(self, directory="/", long_format=False, recursive=True):
        """List the contents of the specified directory (or root if none is
        specified).  Returns a sorted list of path strings.  If long_format
        is True each entry is a string of the form ``'<path> (<size>b)'``.
        With recursive=True sub-directories are walked and only files and
        empty directories are listed.

        Raises:
            RuntimeError: if the directory does not exist on the board.
            PyboardError: for any other board failure.
        """
        # Normalising the directory's slashes is disabled for now, see
        # https://github.com/adafruit/ampy/issues/55.
        command = self._IMPORT_OS
        if recursive:
            command += textwrap.dedent("""\
                def listdir(directory):
                    result = set()
                    def _listdir(dir_or_file):
                        try:
                            # if its a directory, then it should provide some children.
                            children = os.listdir(dir_or_file)
                        except OSError:
                            # probably a file. run stat() to confirm.
                            os.stat(dir_or_file)
                            result.add(dir_or_file)
                        else:
                            # probably a directory, add to result if empty.
                            if children:
                                # queue the children to be dealt with in next iteration.
                                for child in children:
                                    # create the full path.
                                    if dir_or_file == '/':
                                        next = dir_or_file + child
                                    else:
                                        next = dir_or_file + '/' + child
                                    _listdir(next)
                            else:
                                result.add(dir_or_file)
                    _listdir(directory)
                    return sorted(result)
                """)
        else:
            command += textwrap.dedent("""\
                def listdir(directory):
                    if directory == '/':
                        return sorted([directory + f for f in os.listdir(directory)])
                    else:
                        return sorted([directory + '/' + f for f in os.listdir(directory)])
                """)

        # repr() yields a correctly quoted literal even for odd path names.
        if long_format:
            command += textwrap.dedent(f"""\
                r = []
                for f in listdir({directory!r}):
                    size = os.stat(f)[6]
                    r.append(f'{{f}} ({{size}}b)')
                print(r)
                """)
        else:
            command += f"print(listdir({directory!r}))\n"

        try:
            output = self._exec_in_raw_repl(command)
        except PyboardError as ex:
            # OSError #2 means the directory doesn't exist; rethrow it as
            # something more descriptive.
            if "OSError: [Errno 2] ENOENT" in self._remote_error_text(ex):
                raise RuntimeError(f"No such directory: {directory}") from ex
            raise

        # Parse the result list and return it.
        return ast.literal_eval(output.decode("utf-8"))

    def mkdir(self, directory, exists_okay=False):
        """Create the specified directory.  Note this cannot create a recursive
        hierarchy of directories, instead each one should be created separately.

        Raises:
            DirectoryExistsError: if it already exists and exists_okay is False.
            PyboardError: for any other board failure.
        """
        command = self._IMPORT_OS + f"os.mkdir({directory!r})\n"
        try:
            self._exec_in_raw_repl(command)
        except PyboardError as ex:
            # OSError #17 means the directory already exists.
            if "OSError: [Errno 17] EEXIST" not in self._remote_error_text(ex):
                # Anything else is a real failure and must not be swallowed.
                raise
            if not exists_okay:
                raise DirectoryExistsError(f"Directory already exists: {directory}") from ex

    def put(self, filename, data):
        """Create or update the specified file with the provided data.

        Prints a progress line and returns the number of bytes written.

        Raises:
            Exception: with a readable reason if the upload fails.
        """
        if filename.startswith('/'):
            filename = filename[1:]
        print(f'{filename}', end='', flush=True)
        size = len(data)

        self.__raw_repl_on = True
        try:
            self._pyboard.enter_raw_repl()
            # Open the file for writing on the board and write chunks of data.
            self._pyboard.exec_(f"f = open({filename!r}, 'wb')")
            for start in range(0, size, BUFFER_SIZE):
                chunk = repr(data[start:start + BUFFER_SIZE])
                # Make sure to send explicit byte strings (handles python 2 compatibility).
                if not chunk.startswith("b"):
                    chunk = "b" + chunk
                self._pyboard.exec_(f"f.write({chunk})")
            self._pyboard.exec_("f.close()")
        except Exception as e:
            print(' [x]')
            raise Exception(self._describe_failure(e, filename)) from e
        finally:
            self._pyboard.exit_raw_repl()
            self.__raw_repl_on = False

        print(f' - {size}b [✓]')
        return size

    def get(self, filename):
        """Retrieve the contents of the specified file and return its contents
        as a byte string.

        Raises:
            RuntimeError: if the file does not exist on the board.
            PyboardError: for any other board failure.
        """
        # The board reads the file a few bytes at a time and writes the data
        # hex-encoded, so as not to overload the UART buffer and because print
        # adds newlines and expects string data.
        command = textwrap.dedent(f"""\
            import sys
            import ubinascii
            with open({filename!r}, 'rb') as infile:
                while True:
                    result = infile.read({BUFFER_SIZE})
                    if result == b'':
                        break
                    len = sys.stdout.write(ubinascii.hexlify(result))
            """)
        print(f'{filename}', end='', flush=True)
        try:
            output = self._exec_in_raw_repl(command)
        except PyboardError as ex:
            print(' [x]')
            # OSError #2 means the file doesn't exist; rethrow it as
            # something more descriptive.
            if "OSError: [Errno 2] ENOENT" in self._remote_error_text(ex):
                raise RuntimeError(f"No such file: {filename}") from ex
            raise

        # Two hex digits were sent per byte.
        print(f' - {len(output) // 2}b [✓]')
        return binascii.unhexlify(output)

    def rm(self, filename):
        """Remove the specified file or directory.

        Raises:
            RuntimeError: if it does not exist or is a non-empty directory.
            PyboardError: for any other board failure.
        """
        command = self._IMPORT_OS + f"os.remove({filename!r})\n"
        print(f' {filename}', end='')
        try:
            self._exec_in_raw_repl(command)
        except PyboardError as ex:
            print(' [x]')
            message = self._remote_error_text(ex)
            # OSError #2: file/directory doesn't exist.
            if "OSError: [Errno 2] ENOENT" in message:
                raise RuntimeError(f"No such file/directory: {filename}") from ex
            # OSError #13: the directory isn't empty.
            if "OSError: [Errno 13] EACCES" in message:
                raise RuntimeError(f"Directory is not empty: {filename}") from ex
            raise
        print(' [✓]')

    def rmdir(self, directory, missing_okay=False):
        """Forcefully remove the specified directory and all its children.

        Raises:
            RuntimeError: if it does not exist and missing_okay is False.
            PyboardError: for any other board failure.
        """
        # MicroPython has no os.walk, so the board-side script recurses
        # manually: in each directory it deletes everything it can (the
        # files), treats whatever is left as sub-directories and clears those
        # recursively, then removes the directory itself.
        command = self._IMPORT_OS + textwrap.dedent(f"""\
            def rmdir(directory):
                os.chdir(directory)
                for f in os.listdir():
                    try:
                        os.remove(f)
                    except OSError:
                        pass
                for f in os.listdir():
                    rmdir(f)
                os.chdir('..')
                os.rmdir(directory)
            rmdir({directory!r})
            """)
        print(f' {directory}', end='')
        try:
            self._exec_in_raw_repl(command)
        except PyboardError as ex:
            missing = "OSError: [Errno 2] ENOENT" in self._remote_error_text(ex)
            if not (missing and missing_okay):
                print(' [x]')
                if missing:
                    raise RuntimeError(f"No such directory: {directory}") from ex
                # Any other failure is real and must not be reported as success.
                raise
        print(' [✓]')

    # -- running code ---------------------------------------------------------

    def run(self, filename, wait_output=True, stream_output=True):
        """Run the provided local script on the board.

        If stream_output is True (default) the output is printed to stdout as
        it arrives and None is returned.  Otherwise, if wait_output is True,
        the script's output is returned once it finishes; if it is False the
        script is started without waiting for any output.
        """
        output = None
        self.__raw_repl_on = True
        try:
            self._pyboard.enter_raw_repl()
            if stream_output:
                self._pyboard.execfile(filename, stream_output=True)
            elif wait_output:
                # Run the file and wait for output to return.
                output = self._pyboard.execfile(filename)
            else:
                # Read the file and run it using lower level pyboard functions that
                # won't wait for it to finish or return output.
                with open(filename, "rb") as infile:
                    self._pyboard.exec_raw_no_follow(infile.read())
        finally:
            self._pyboard.exit_raw_repl()
            self.__raw_repl_on = False
        return output

    def run_on_board(self, filename, wait_output=True, stream_output=True):
        """Run a script that is already stored on the board.

        The output options are the same as for ``run``.

        Raises:
            Exception: with a readable reason if the script cannot be run or fails.
        """
        command = f'exec(open({filename!r}).read())'
        output = None
        self.__raw_repl_on = True
        try:
            self._pyboard.enter_raw_repl()
            if stream_output:
                self._pyboard.exec_(command, stream_output=True)
            elif wait_output:
                # Run the file and wait for output to return.
                output = self._pyboard.exec_(command)
            else:
                # Start it with the lower level call that won't wait for it to
                # finish or return output.
                self._pyboard.exec_raw_no_follow(command)
        except Exception as e:
            raise Exception(self._describe_failure(e, filename)) from e
        finally:
            self._pyboard.exit_raw_repl()
            self.__raw_repl_on = False
        return output

    def exec(self, command):
        """Execute *command* on the board, streaming its output to stdout.

        Returns the collected output.

        Raises:
            Exception: with the board's traceback (or the original message) on failure.
        """
        try:
            return self._exec_in_raw_repl(textwrap.dedent(command), stream_output=True)
        except Exception as e:
            raise Exception(self._describe_failure(e)) from e
2022-12-29 16:37:55 +01:00
print('Welcome to picowatch lib.')

# Keep prompting for a port and baud rate until a connection succeeds.
pico = False
while True:
    print('-' * 30)
    device = input('Port: ').strip()
    baudrate = input('Baudrate (115200): ').strip() or 115200
    try:
        pico = Files(Pyboard(device=device, baudrate=baudrate))
    except Exception as e:
        print(f'Exception: {str(e)}')
        continue
    print(f'Connected to device: {device} at a baudrate of: {baudrate}')
    print('-' * 30)
    break

# Interrupt whatever program the board is currently running.
pico.send_ctrl_c()

# Local directory that is watched and mirrored to the board.
WATCHING_DIRECTORY = './'
def upload(source: str = '', destination: str = ''):
    """Copy a local file or directory tree (relative to WATCHING_DIRECTORY) to the board.

    A single file is written to *destination* (default: the same path as
    *source*).  A directory is walked recursively; its sub-directories are
    created on the board and its files uploaded under their relative paths.
    Failures are printed rather than raised.
    """
    local_path = os.path.join(WATCHING_DIRECTORY, *source.split('/'))
    destination = (destination or source).replace('\\', '/')

    try:
        if os.path.isdir(local_path):
            for root, dirs, files in os.walk(local_path, followlinks=True):
                # Board-side path of this directory, always with forward slashes.
                board_root = root.replace(WATCHING_DIRECTORY, '').replace(os.sep, '/')

                for dirname in dirs:
                    pico.mkdir(f'{board_root}/{dirname}', exists_okay=True)

                for filename in files:
                    with open(os.path.join(root, filename), 'rb') as fh:
                        pico.put(f'{board_root}/{filename}', fh.read())
                    time.sleep(.5)
        elif os.path.exists(local_path):
            with open(local_path, 'rb') as fh:
                pico.put(destination, fh.read())
            time.sleep(.5)
    except Exception as error:
        print(f'Exception: {str(error)}')
2022-12-29 16:37:55 +01:00
def download(source: str = '/'):
    """Copy a board directory tree into WATCHING_DIRECTORY.

    Every path listed under *source* is fetched and written to the same
    relative path locally; entries whose path starts with '.' are skipped.
    Failures are printed rather than raised.
    """
    if not source.startswith('/'):
        source = f'/{source}'
    if len(source) > 1 and source.endswith('/'):
        source = source[:-1]

    try:
        for filename in pico.ls(directory=source, long_format=False, recursive=True):
            filename = filename[1:]  # drop the leading '/'

            # Skip hidden entries and the bare root reported for an empty board.
            if not filename or filename.startswith('.'):
                continue

            local_path = os.path.join(WATCHING_DIRECTORY, *filename.split('/'))

            # Fetch first so a failed transfer does not truncate the local copy.
            data = pico.get(filename)

            parent = os.path.dirname(local_path)
            if parent:
                # The mode is octal; decimal 777 would set unrelated permission bits.
                os.makedirs(parent, mode=0o777, exist_ok=True)

            with open(local_path, 'wb') as fh:
                fh.write(data)
            time.sleep(.5)
    except Exception as e:
        print(f'Exception: {str(e)}')
2022-12-29 16:37:55 +01:00
def contents(filename: str):
    """Print the UTF-8 text of a file stored on the board; failures are printed."""
    try:
        text = pico.get(filename).decode('utf-8')
        print(text)
    except Exception as error:
        print(f'Exception: {str(error)}')
2022-12-29 16:37:55 +01:00
def delete(source: str):
    """Remove *source* from the board; a trailing '/' marks a directory tree.

    Failures are printed rather than raised.
    """
    try:
        if not source.endswith('/'):
            pico.rm(source)
        else:
            pico.rmdir(source, missing_okay=True)
    except Exception as error:
        print(f'Exception: {str(error)}')
2022-12-29 16:37:55 +01:00
def ls(source: str = '/'):
    """Print the files under *source* on the board, with their sizes.

    Failures are printed rather than raised.
    """
    try:
        if len(source) > 1 and source.endswith('/'):
            source = source[:-1]

        entries = pico.ls(directory=source, long_format=True, recursive=True)
        for entry in entries:
            entry = entry.removeprefix('/')
            # Entries of five characters or fewer are not shown.
            if len(entry) > 5:
                print('', entry)
    except Exception as error:
        print(f'Exception: {str(error)}')
2022-12-29 16:37:55 +01:00
def launch(filename: str = 'main.py'):
    """Run a script stored on the board; failures are printed rather than raised."""
    try:
        pico.run_on_board(filename)
    except Exception as error:
        print(f'Exception: {str(error)}')
2022-12-29 16:37:55 +01:00
# Pending local changes collected by the watchdog callbacks: paths to delete
# from the board and paths to upload to it.
sessions = {'deleted': set(), 'modified': set()}
def on_modified_callback(event):
    """Queue a modified local file for upload; directory events are ignored."""
    if event.is_directory:
        return

    relative = event.src_path.replace(WATCHING_DIRECTORY, '')
    sessions['modified'].add(relative.replace('\\', '/'))
def on_deleted_callback(event):
    """Queue a deleted local path for removal from the board.

    A trailing '/' is added for directories and for paths without a '.',
    which are treated as directories too.
    """
    relative = event.src_path.replace(WATCHING_DIRECTORY, '')
    source = relative.replace('\\', '/')

    if event.is_directory and not source.endswith('/'):
        source = source + '/'
    elif '.' not in source:
        source = source + '/'

    sessions['deleted'].add(source)
def watchdog_callback():
    """Apply all queued changes to the board: deletions first, then uploads."""
    # Swap in fresh sets before processing.  The file-system callbacks keep
    # adding paths while the transfers run; iterating the live sets risks
    # "Set changed size during iteration", and clearing them afterwards would
    # drop anything queued during the flush.
    deleted, sessions['deleted'] = sessions['deleted'], set()
    modified, sessions['modified'] = sessions['modified'], set()

    for source in tuple(deleted):
        delete(source)

    for source in tuple(modified):
        upload(source)
# Install a SIGINT (Ctrl-C) handler that sends an interrupt to the board.
signal.signal(signal.SIGINT, lambda a, b: pico.send_ctrl_c())
# Event handler matching every path under the watched directory, directories included.
watchdog_event = PatternMatchingEventHandler(
patterns = ['*'],
ignore_patterns = None,
ignore_directories = False,
case_sensitive = True
)
# Modifications and deletions are only queued here; the 'save'/'commit'
# command (or an empty line) applies them to the board.
watchdog_event.on_modified = on_modified_callback
watchdog_event.on_deleted = on_deleted_callback
# Watch WATCHING_DIRECTORY recursively with a started observer.
watchdog = Observer()
watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True)
watchdog.start()
2022-12-29 17:20:41 +01:00
2022-12-29 16:37:55 +01:00
try:
while True:
2022-12-29 17:20:41 +01:00
try:
2022-12-29 16:37:55 +01:00
message = input('>>> ').strip()
while pico.is_raw_repl_on():
time.sleep(.1)
match message.split(' '):
case ['0' | 'exit']:
sys.exit()
case ['reboot' | 'reset']:
pico.send_ctrl_d()
2022-12-29 17:50:25 +01:00
case ['ls' | 'stat', *source]:
ls(source[0] if source else '/')
2022-12-29 16:37:55 +01:00
case ['cat' | 'open' | 'contents', source]:
contents(source)
case ['del' | 'delete' | 'remove', source]:
delete(source)
case ['upl' | 'upload' | 'update', source]:
upload(source)
case ['download' | 'backup', source]:
download(source)
2022-12-29 17:50:25 +01:00
case ['' | 'save' | 'commit']:
2022-12-29 16:37:55 +01:00
watchdog_callback()
2022-12-29 17:50:25 +01:00
case ['status' | 'staged']:
2022-12-29 17:41:23 +01:00
for filename in sessions['deleted']:
2022-12-29 18:09:17 +01:00
print('', filename)
2022-12-29 17:41:23 +01:00
for filename in sessions['modified']:
2022-12-29 18:09:17 +01:00
print('+', filename)
2022-12-29 17:50:25 +01:00
case ['cancel' | 'unstaged']:
2022-12-29 17:41:23 +01:00
sessions['deleted'] = set()
sessions['modified'] = set()
2022-12-29 16:37:55 +01:00
case _:
if message.startswith('./'):
launch(message[2:])
elif message:
print(f'"{message}" is not recognized.')
except Exception as e:
print(f'Exception: {str(e)}')
except KeyboardInterrupt:
watchdog.stop()
watchdog.join()