Files
primadiag-pybash/picowatch_d/picowatch.py
2022-12-29 20:08:50 +01:00

983 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
"""
pyboard interface
This module provides the Pyboard class, used to communicate with and
control the pyboard over a serial USB connection.
Example usage:
import pyboard
pyb = pyboard.Pyboard('/dev/ttyACM0')
Or:
pyb = pyboard.Pyboard('192.168.1.1')
Then:
pyb.enter_raw_repl()
pyb.exec('pyb.LED(1).on()')
pyb.exit_raw_repl()
Note: if using Python2 then pyb.exec must be written as pyb.exec_.
To run a script from the local machine on the board and print out the results:
import pyboard
pyboard.execfile('test.py', device='/dev/ttyACM0')
This script can also be run directly. To execute a local script, use:
./pyboard.py test.py
Or:
python pyboard.py test.py
"""
# Adafruit MicroPython Tool - File Operations
# Author: Tony DiCola
# Copyright (c) 2016 Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import sys
import ast
import time
import textwrap
import binascii
import os
import signal
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
_rawdelay = None
try:
stdout = sys.stdout.buffer
except AttributeError:
# Python2 doesn't have buffer attr
stdout = sys.stdout
def stdout_write_bytes(b):
b = b.replace(b"\x04", b"")
stdout.write(b)
stdout.flush()
class PyboardError(Exception):
pass
class TelnetToSerial:
def __init__(self, ip, user, password, read_timeout=None):
import telnetlib
self.tn = telnetlib.Telnet(ip, timeout=15)
self.read_timeout = read_timeout
if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
self.tn.write(bytes(user, 'ascii') + b"\r\n")
if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
# needed because of internal implementation details of the telnet server
time.sleep(0.2)
self.tn.write(bytes(password, 'ascii') + b"\r\n")
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
# login succesful
from collections import deque
self.fifo = deque()
return
raise PyboardError('Failed to establish a telnet connection with the board')
def __del__(self):
self.close()
def close(self):
try:
self.tn.close()
except:
# the telnet object might not exist yet, so ignore this one
pass
def read(self, size=1):
while len(self.fifo) < size:
timeout_count = 0
data = self.tn.read_eager()
if len(data):
self.fifo.extend(data)
timeout_count = 0
else:
time.sleep(0.25)
if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
break
timeout_count += 1
data = b''
while len(data) < size and len(self.fifo) > 0:
data += bytes([self.fifo.popleft()])
return data
def write(self, data):
self.tn.write(data)
return len(data)
def inWaiting(self):
n_waiting = len(self.fifo)
if not n_waiting:
data = self.tn.read_eager()
self.fifo.extend(data)
return len(data)
else:
return n_waiting
class Pyboard:
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0):
global _rawdelay
_rawdelay = rawdelay
if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
# device looks like an IP address
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
else:
import serial
delayed = False
for attempt in range(wait + 1):
try:
self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
break
except (OSError, IOError): # Py2 and Py3 have different errors
if wait == 0:
continue
if attempt == 0:
sys.stdout.write(f'Waiting {wait} seconds for pyboard ')
delayed = True
time.sleep(1)
sys.stdout.write('.')
sys.stdout.flush()
else:
if delayed:
print('')
raise PyboardError('failed to access ' + device)
if delayed:
print('')
def close(self):
self.serial.close()
def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
data = self.serial.read(min_num_bytes)
if data_consumer:
data_consumer(data)
timeout_count = 0
while True:
if data.endswith(ending):
break
elif self.serial.inWaiting() > 0:
new_data = self.serial.read(1)
data = data + new_data
if data_consumer:
data_consumer(new_data)
timeout_count = 0
else:
timeout_count += 1
if timeout is not None and timeout_count >= 100 * timeout:
break
time.sleep(0.01)
return data
def send_ctrl_a(self):
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
def send_ctrl_b(self):
self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL
def send_ctrl_c(self):
self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program
time.sleep(0.1)
self.serial.write(b'\x03')
time.sleep(0.1)
def send_ctrl_d(self):
self.serial.write(b'\r\x04') # ctrl-D: soft reset
def enter_raw_repl(self):
# Brief delay before sending RAW MODE char if requests
if _rawdelay > 0:
time.sleep(_rawdelay)
# ctrl-C twice: interrupt any running program
self.serial.write(b'\r\x03')
time.sleep(0.1)
self.serial.write(b'\x03')
time.sleep(0.1)
# flush input (without relying on serial.flushInput())
n = self.serial.inWaiting()
while n > 0:
self.serial.read(n)
n = self.serial.inWaiting()
for retry in range(0, 5):
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
break
else:
if retry >= 4:
print(data)
raise PyboardError('could not enter raw repl')
time.sleep(0.2)
self.serial.write(b'\x04') # ctrl-D: soft reset
data = self.read_until(1, b'soft reboot\r\n')
if not data.endswith(b'soft reboot\r\n'):
print(data)
raise PyboardError('could not enter raw repl')
# By splitting this into 2 reads, it allows boot.py to print stuff,
# which will show up after the soft reboot and before the raw REPL.
# Modification from original pyboard.py below:
# Add a small delay and send Ctrl-C twice after soft reboot to ensure
# any main program loop in main.py is interrupted.
time.sleep(0.5)
self.serial.write(b'\x03')
time.sleep(0.1) # (slight delay before second interrupt
self.serial.write(b'\x03')
# End modification above.
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
print(data)
raise PyboardError('could not enter raw repl')
def exit_raw_repl(self):
self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL
def follow(self, timeout, data_consumer=None):
# wait for normal output
data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer)
if not data.endswith(b'\x04'):
raise PyboardError('timeout waiting for first EOF reception')
data = data[:-1]
# wait for error output
data_err = self.read_until(1, b'\x04', timeout=timeout)
if not data_err.endswith(b'\x04'):
raise PyboardError('timeout waiting for second EOF reception')
data_err = data_err[:-1]
# return normal and error output
return data, data_err
def exec_raw_no_follow(self, command):
if isinstance(command, bytes):
command_bytes = command
else:
command_bytes = bytes(command, encoding='utf8')
# check we have a prompt
data = self.read_until(1, b'>')
if not data.endswith(b'>'):
raise PyboardError('could not enter raw repl')
# write command
for i in range(0, len(command_bytes), 256):
self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))])
time.sleep(0.01)
self.serial.write(b'\x04')
# check if we could exec command
data = self.serial.read(2)
if data != b'OK':
raise PyboardError('could not exec command')
def exec_raw(self, command, timeout=10, data_consumer=None):
self.exec_raw_no_follow(command)
return self.follow(timeout, data_consumer)
def eval(self, expression):
ret = self.exec_(f'print({expression})')
ret = ret.strip()
return ret
def exec_(self, command, stream_output=False):
data_consumer = None
if stream_output:
data_consumer = stdout_write_bytes
ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
if ret_err:
raise PyboardError('exception', ret, ret_err)
return ret
def execfile(self, filename, stream_output=False):
with open(filename, 'rb') as f:
pyfile = f.read()
return self.exec_(pyfile, stream_output=stream_output)
def get_time(self):
t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ')
return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6])
# in Python2 exec is a keyword so one must use "exec_"
# but for Python3 we want to provide the nicer version "exec"
setattr(Pyboard, "exec", Pyboard.exec_)
def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):
pyb = Pyboard(device, baudrate, user, password)
pyb.enter_raw_repl()
output = pyb.execfile(filename)
stdout_write_bytes(output)
pyb.exit_raw_repl()
pyb.close()
# def main():
# import argparse
# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password')
# cmd_parser.add_argument('-c', '--command', help='program passed in as string')
# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
# cmd_parser.add_argument('files', nargs='*', help='input files')
# args = cmd_parser.parse_args()
# def execbuffer(buf):
# try:
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
# pyb.enter_raw_repl()
# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
# pyb.exit_raw_repl()
# pyb.close()
# except PyboardError as er:
# print(er)
# sys.exit(1)
# except KeyboardInterrupt:
# sys.exit(1)
# if ret_err:
# stdout_write_bytes(ret_err)
# sys.exit(1)
# if args.command is not None:
# execbuffer(args.command.encode('utf-8'))
# for filename in args.files:
# with open(filename, 'rb') as f:
# pyfile = f.read()
# execbuffer(pyfile)
# if args.follow or (args.command is None and len(args.files) == 0):
# try:
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
# pyb.close()
# except PyboardError as er:
# print(er)
# sys.exit(1)
# except KeyboardInterrupt:
# sys.exit(1)
# if ret_err:
# stdout_write_bytes(ret_err)
# sys.exit(1)
BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time.
# This is kept small because small chips and USB to serial
# bridges usually have very small buffers.
class DirectoryExistsError(Exception):
pass
class Files(object):
"""Class to interact with a MicroPython board files over a serial connection.
Provides functions for listing, uploading, and downloading files from the
board's filesystem.
"""
__raw_repl_on: bool = False
def __init__(self, pyboard: Pyboard):
"""Initialize the MicroPython board files class using the provided pyboard
instance. In most cases you should create a Pyboard instance (from
pyboard.py) which connects to a board over a serial connection and pass
it in, but you can pass in other objects for testing, etc.
"""
self._pyboard = pyboard
def is_raw_repl_on(self):
return self.__raw_repl_on
def send_ctrl_a(self):
self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL
def send_ctrl_b(self):
self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL
def send_ctrl_c(self):
self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program
def send_ctrl_d(self):
self._pyboard.send_ctrl_d() # ctrl-D: soft reset
def handle_traceback(self, e: Exception):
message = e.args[2].decode('utf-8')
oserror = message.split('OSError:')
if len(oserror) == 2:
reason = oserror[1].strip()
if reason == '-2':
reason = '[Errno -2] EUNKNOWN'
if reason == '39':
reason = '[Errno 39] ENEPTY'
else:
reason = 'Traceback (most recent call last):'
traceback = message.split(reason)
if len(traceback) == 2:
for call in traceback[1][:-2].split('\\r\\n'):
reason += f'{call}\n'
raise Exception(reason.strip())
def ls(self, directory="/", long_format=False, recursive=True):
"""List the contents of the specified directory (or root if none is
specified). Returns a list of strings with the names of files in the
specified directory. If long_format is True then a list of 2-tuples
with the name and size (in bytes) of the item is returned. Note that
it appears the size of directories is not supported by MicroPython and
will always return 0 (i.e. no recursive size computation).
"""
# Disabling for now, see https://github.com/adafruit/ampy/issues/55.
# # Make sure directory ends in a slash.
# if not directory.endswith("/"):
# directory += "/"
# Make sure directory starts with slash, for consistency.
# if not directory.startswith("/"):
# directory = "/" + directory
command = """\
try:
import os
except ImportError:
import uos as os\n"""
if recursive:
command += """\
def listdir(directory):
result = set()
def _listdir(dir_or_file):
try:
# if its a directory, then it should provide some children.
children = os.listdir(dir_or_file)
except OSError:
# probably a file. run stat() to confirm.
os.stat(dir_or_file)
result.add(dir_or_file)
else:
# probably a directory, add to result if empty.
if children:
# queue the children to be dealt with in next iteration.
for child in children:
# create the full path.
if dir_or_file == '/':
next = dir_or_file + child
else:
next = dir_or_file + '/' + child
_listdir(next)
else:
result.add(dir_or_file)
_listdir(directory)
return sorted(result)\n"""
else:
command += """\
def listdir(directory):
if directory == '/':
return sorted([directory + f for f in os.listdir(directory)])
else:
return sorted([directory + '/' + f for f in os.listdir(directory)])\n"""
# Execute os.listdir() command on the board.
if long_format:
command += f"""
r = []
for f in listdir('{directory}'):
size = os.stat(f)[6]
r.append(f'{{f}} - {{size}}b')
print(r)
"""
else:
command += f"""
print(listdir('{directory}'))
"""
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
output = self._pyboard.exec_(textwrap.dedent(command))
except Exception as e:
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
# Parse the result list and return it.
return ast.literal_eval(output.decode("utf-8"))
def mkdir(self, directory, exists_okay=True):
"""Create the specified directory. Note this cannot create a recursive
hierarchy of directories, instead each one should be created separately.
"""
# Execute os.mkdir command on the board.
command = f"""
try:
import os
except ImportError:
import uos as os
os.mkdir('{directory}')
"""
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
self._pyboard.exec_(textwrap.dedent(command))
except Exception as e:
if exists_okay == False:
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
def put(self, filename, data):
"""Create or update the specified file with the provided data.
"""
# Open the file for writing on the board and write chunks of data.
if filename.startswith('/'):
filename = filename[1:]
print(f'+ {filename}', end='', flush=True)
size = len(data)
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
self._pyboard.exec_(f"f = open('{filename}', 'wb')")
# Loop through and write a buffer size chunk of data at a time.
for i in range(0, size, BUFFER_SIZE):
chunk_size = min(BUFFER_SIZE, size - i)
chunk = repr(data[i : i + chunk_size])
# Make sure to send explicit byte strings (handles python 2 compatibility).
if not chunk.startswith("b"):
chunk = "b" + chunk
self._pyboard.exec_(f"f.write({chunk})")
self._pyboard.exec_("f.close()")
except Exception as e:
print(' [x]')
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
print(f' - {size}b [✓]')
return size
def get(self, filename):
"""Retrieve the contents of the specified file and return its contents
as a byte string.
"""
# Open the file and read it a few bytes at a time and print out the
# raw bytes. Be careful not to overload the UART buffer so only write
# a few bytes at a time, and don't use print since it adds newlines and
# expects string data.
command = f"""
import sys
import ubinascii
with open('{filename}', 'rb') as infile:
while True:
result = infile.read({BUFFER_SIZE})
if result == b'':
break
len = sys.stdout.write(ubinascii.hexlify(result))
"""
print(f'{filename}', end='', flush=True)
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
output = self._pyboard.exec_(textwrap.dedent(command))
except PyboardError as e:
print(' [x]')
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
print(f' {int(len(output) / 2)}b [✓]')
return binascii.unhexlify(output)
def rm(self, filename):
"""Remove the specified file or directory."""
command = f"""
try:
import os
except ImportError:
import uos as os
os.remove('{filename}')
"""
print(f' {filename}', end='')
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
self._pyboard.exec_(textwrap.dedent(command))
except Exception as e:
print(' [x]')
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
print(' [✓]')
def rmdir(self, directory, missing_okay=False):
"""Forcefully remove the specified directory and all its children."""
# Build a script to walk an entire directory structure and delete every
# file and subfolder. This is tricky because MicroPython has no os.walk
# or similar function to walk folders, so this code does it manually
# with recursion and changing directories. For each directory it lists
# the files and deletes everything it can, i.e. all the files. Then
# it lists the files again and assumes they are directories (since they
# couldn't be deleted in the first pass) and recursively clears those
# subdirectories. Finally when finished clearing all the children the
# parent directory is deleted.
command = f"""
try:
import os
except ImportError:
import uos as os
def rmdir(directory):
os.chdir(directory)
for f in os.listdir():
try:
os.remove(f)
except OSError:
pass
for f in os.listdir():
rmdir(f)
os.chdir('..')
os.rmdir(directory)
rmdir('{directory}')
"""
print(f' {directory}', end='')
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
self._pyboard.exec_(textwrap.dedent(command))
except Exception as e:
print(' [x]')
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
print(' [✓]')
def run_on_board(self, filename, wait_output=True, stream_output=True):
"""Run the provided script and return its output. If wait_output is True
(default) then wait for the script to finish and then return its output,
otherwise just run the script and don't wait for any output.
If stream_output is True(default) then return None and print outputs to
stdout without buffering.
"""
command = textwrap.dedent(f'exec(open("{filename}").read())')
output = None
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
if stream_output:
self._pyboard.exec_(command, stream_output=True)
elif wait_output:
# Run the file and wait for output to return.
output = self._pyboard.exec_(command)
else:
# Read the file and run it using lower level pyboard functions that
# won't wait for it to finish or return output.
self._pyboard.exec_raw_no_follow(command)
except Exception as e:
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
return output
def exec(self, command):
try:
self.__raw_repl_on = True
self._pyboard.enter_raw_repl()
output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True)
except Exception as e:
self.handle_traceback(e)
finally:
self._pyboard.exit_raw_repl()
self.__raw_repl_on = False
return output
print('Welcome to picowatch lib.')
pico = False
while not pico:
print('-' * 30)
device = input('Port: ').strip()
baudrate = input('Baudrate (115200): ').strip() or 115200
try:
pico = Files(Pyboard(device=device, baudrate=baudrate))
print(f'Connected to device: {device} at a baudrate of: {baudrate}')
print('-' * 30)
except Exception as e:
print(str(e))
pico.send_ctrl_c()
WATCHING_DIRECTORY = './'
def upload(source: str = '', destination: str = ''):
real_source = os.path.join(WATCHING_DIRECTORY, *source.split('/'))
if not destination:
destination = source
destination = '/'.join(destination.split('\\'))
try:
if os.path.isdir(real_source):
for root, dirs, files in os.walk(real_source, followlinks=True):
droot = '/'.join(root.replace(WATCHING_DIRECTORY, '').split(os.sep))
for dir in dirs:
pico.mkdir('/'.join([droot, dir]), exists_okay=True)
for filename in files:
with open(os.path.join(root, filename), 'rb') as fh:
pico.put('/'.join([droot, filename]), fh.read())
time.sleep(.5)
elif os.path.exists(real_source):
with open(real_source, 'rb') as fh:
pico.put(destination, fh.read())
time.sleep(.5)
except Exception as e:
print(str(e))
def download(source: str = '/'):
if not source.startswith('/'):
source = f'/{source}'
if len(source) > 1 and source.endswith('/'):
source = source[:-1]
try:
for filename in pico.ls(directory=source, long_format=False, recursive=True):
filename = filename[1:]
if filename.startswith('.'):
continue
if os.path.dirname(filename):
os.makedirs(os.path.dirname(filename), mode=777, exist_ok=True)
with open(os.path.join(WATCHING_DIRECTORY, *filename.split('/')), 'wb') as fh:
fh.write(pico.get(filename))
time.sleep(.5)
except Exception as e:
print(str(e))
def contents(filename: str):
try:
for ln in pico.get(filename).decode('utf-8').split('\n'):
print(ln)
except Exception as e:
print(str(e))
def delete(source: str, is_directory: bool = False):
try:
if is_directory:
for filename in pico.ls(directory=source, long_format=False, recursive=True):
if filename.startswith('/'):
filename = filename[1:]
pico.rm(filename)
try:
for filename in pico.ls(directory=source, long_format=False, recursive=True):
if filename.startswith('/'):
filename = filename[1:]
if not filename.endswith('/'):
filename += '/'
pico.rm(filename)
except:
pass
else:
pico.rm(source)
except Exception as e:
print(str(e))
def ls(source: str = '/'):
try:
if len(source) > 1 and source.endswith('/'):
source = source[:-1]
for filename in pico.ls(directory=source, long_format=True, recursive=True):
if filename.startswith('/'):
filename = filename[1:]
if len(filename) > 6:
print('', filename)
except Exception as e:
print(str(e))
def launch(filename: str = 'main.py'):
try:
pico.run_on_board(filename)
except Exception as e:
print(str(e))
sessions = {'deleted': set(), 'modified': set()}
def on_modified_callback(event):
if event.is_directory == True:
return
source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/')
sessions['modified'].add(source)
def on_deleted_callback(event):
source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/')
if event.is_directory == True and not source.endswith('/'):
source += '/'
elif len(source.split('.')) == 1:
source += '/'
sessions['deleted'].add(source)
def watchdog_callback():
for source in sessions['deleted']:
delete(source)
for source in sessions['modified']:
upload(source)
sessions['deleted'] = set()
sessions['modified'] = set()
signal.signal(signal.SIGINT, lambda a, b: pico.send_ctrl_c())
watchdog_event = PatternMatchingEventHandler(
patterns = ['*'],
ignore_patterns = None,
ignore_directories = False,
case_sensitive = True
)
watchdog_event.on_modified = on_modified_callback
watchdog_event.on_deleted = on_deleted_callback
watchdog = Observer()
watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True)
watchdog.start()
try:
while True:
try:
print('>>> ', end='')
message = input().strip()
while pico.is_raw_repl_on():
time.sleep(.1)
match message.split(' '):
case ['0' | 'exit']:
sys.exit()
case ['reboot' | 'reset']:
pico.send_ctrl_d()
case ['ls' | 'stat', *source]:
ls(source[0] if source else '/')
case ['cat' | 'open' | 'contents', source]:
contents(source)
case ['del' | 'rm' | 'delete', source]:
delete(source)
case ['del*' | 'rm*' | 'rmdir' | 'delete*', source]:
delete(source, is_directory=True)
case ['format']:
delete('/', is_directory=True)
case ['upl' | 'upload' | 'update', source]:
upload(source)
case ['download' | 'backup', source]:
download(source)
case ['' | 'save' | 'commit']:
watchdog_callback()
case ['status' | 'staged']:
for filename in sessions['deleted']:
print('', filename)
for filename in sessions['modified']:
print('+', filename)
case ['cancel' | 'unstaged']:
sessions['deleted'] = set()
sessions['modified'] = set()
case _:
if message.startswith('./'):
launch(message[2:])
elif message:
print(f'"{message}" is not recognized.')
except Exception as e:
print(str(e))
except KeyboardInterrupt:
watchdog.stop()
watchdog.join()