Factorised codes
This commit is contained in:
@@ -1,813 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
"""
|
|
||||||
pyboard interface
|
|
||||||
|
|
||||||
This module provides the Pyboard class, used to communicate with and
|
|
||||||
control the pyboard over a serial USB connection.
|
|
||||||
|
|
||||||
Example usage:
|
|
||||||
|
|
||||||
import pyboard
|
|
||||||
pyb = pyboard.Pyboard('/dev/ttyACM0')
|
|
||||||
|
|
||||||
Or:
|
|
||||||
|
|
||||||
pyb = pyboard.Pyboard('192.168.1.1')
|
|
||||||
|
|
||||||
Then:
|
|
||||||
|
|
||||||
pyb.enter_raw_repl()
|
|
||||||
pyb.exec('pyb.LED(1).on()')
|
|
||||||
pyb.exit_raw_repl()
|
|
||||||
|
|
||||||
Note: if using Python2 then pyb.exec must be written as pyb.exec_.
|
|
||||||
To run a script from the local machine on the board and print out the results:
|
|
||||||
|
|
||||||
import pyboard
|
|
||||||
pyboard.execfile('test.py', device='/dev/ttyACM0')
|
|
||||||
|
|
||||||
This script can also be run directly. To execute a local script, use:
|
|
||||||
|
|
||||||
./pyboard.py test.py
|
|
||||||
|
|
||||||
Or:
|
|
||||||
|
|
||||||
python pyboard.py test.py
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Adafruit MicroPython Tool - File Operations
|
|
||||||
# Author: Tony DiCola
|
|
||||||
# Copyright (c) 2016 Adafruit Industries
|
|
||||||
#
|
|
||||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
# of this software and associated documentation files (the "Software"), to deal
|
|
||||||
# in the Software without restriction, including without limitation the rights
|
|
||||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
# copies of the Software, and to permit persons to whom the Software is
|
|
||||||
# furnished to do so, subject to the following conditions:
|
|
||||||
#
|
|
||||||
# The above copyright notice and this permission notice shall be included in all
|
|
||||||
# copies or substantial portions of the Software.
|
|
||||||
#
|
|
||||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
# SOFTWARE.
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import ast
|
|
||||||
import time
|
|
||||||
import textwrap
|
|
||||||
import binascii
|
|
||||||
|
|
||||||
|
|
||||||
_rawdelay = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
stdout = sys.stdout.buffer
|
|
||||||
except AttributeError:
|
|
||||||
# Python2 doesn't have buffer attr
|
|
||||||
stdout = sys.stdout
|
|
||||||
|
|
||||||
def stdout_write_bytes(b):
|
|
||||||
b = b.replace(b"\x04", b"")
|
|
||||||
stdout.write(b)
|
|
||||||
stdout.flush()
|
|
||||||
|
|
||||||
class PyboardError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
class TelnetToSerial:
|
|
||||||
def __init__(self, ip, user, password, read_timeout=None):
|
|
||||||
import telnetlib
|
|
||||||
self.tn = telnetlib.Telnet(ip, timeout=15)
|
|
||||||
self.read_timeout = read_timeout
|
|
||||||
if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
|
|
||||||
self.tn.write(bytes(user, 'ascii') + b"\r\n")
|
|
||||||
|
|
||||||
if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
|
|
||||||
# needed because of internal implementation details of the telnet server
|
|
||||||
time.sleep(0.2)
|
|
||||||
self.tn.write(bytes(password, 'ascii') + b"\r\n")
|
|
||||||
|
|
||||||
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
|
|
||||||
# login succesful
|
|
||||||
from collections import deque
|
|
||||||
self.fifo = deque()
|
|
||||||
return
|
|
||||||
|
|
||||||
raise PyboardError('Failed to establish a telnet connection with the board')
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
self.close()
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
try:
|
|
||||||
self.tn.close()
|
|
||||||
except:
|
|
||||||
# the telnet object might not exist yet, so ignore this one
|
|
||||||
pass
|
|
||||||
|
|
||||||
def read(self, size=1):
|
|
||||||
while len(self.fifo) < size:
|
|
||||||
timeout_count = 0
|
|
||||||
data = self.tn.read_eager()
|
|
||||||
if len(data):
|
|
||||||
self.fifo.extend(data)
|
|
||||||
timeout_count = 0
|
|
||||||
else:
|
|
||||||
time.sleep(0.25)
|
|
||||||
if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
|
|
||||||
break
|
|
||||||
timeout_count += 1
|
|
||||||
|
|
||||||
data = b''
|
|
||||||
while len(data) < size and len(self.fifo) > 0:
|
|
||||||
data += bytes([self.fifo.popleft()])
|
|
||||||
return data
|
|
||||||
|
|
||||||
def write(self, data):
|
|
||||||
self.tn.write(data)
|
|
||||||
return len(data)
|
|
||||||
|
|
||||||
def inWaiting(self):
|
|
||||||
n_waiting = len(self.fifo)
|
|
||||||
if not n_waiting:
|
|
||||||
data = self.tn.read_eager()
|
|
||||||
self.fifo.extend(data)
|
|
||||||
return len(data)
|
|
||||||
else:
|
|
||||||
return n_waiting
|
|
||||||
|
|
||||||
class Pyboard:
|
|
||||||
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0):
|
|
||||||
global _rawdelay
|
|
||||||
_rawdelay = rawdelay
|
|
||||||
if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
|
|
||||||
# device looks like an IP address
|
|
||||||
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
|
|
||||||
else:
|
|
||||||
import serial
|
|
||||||
delayed = False
|
|
||||||
for attempt in range(wait + 1):
|
|
||||||
try:
|
|
||||||
self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
|
|
||||||
break
|
|
||||||
except (OSError, IOError): # Py2 and Py3 have different errors
|
|
||||||
if wait == 0:
|
|
||||||
continue
|
|
||||||
if attempt == 0:
|
|
||||||
sys.stdout.write(f'Waiting {wait} seconds for pyboard ')
|
|
||||||
delayed = True
|
|
||||||
time.sleep(1)
|
|
||||||
sys.stdout.write('.')
|
|
||||||
sys.stdout.flush()
|
|
||||||
else:
|
|
||||||
if delayed:
|
|
||||||
print('')
|
|
||||||
raise PyboardError('failed to access ' + device)
|
|
||||||
if delayed:
|
|
||||||
print('')
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
self.serial.close()
|
|
||||||
|
|
||||||
def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
|
|
||||||
data = self.serial.read(min_num_bytes)
|
|
||||||
if data_consumer:
|
|
||||||
data_consumer(data)
|
|
||||||
timeout_count = 0
|
|
||||||
while True:
|
|
||||||
if data.endswith(ending):
|
|
||||||
break
|
|
||||||
elif self.serial.inWaiting() > 0:
|
|
||||||
new_data = self.serial.read(1)
|
|
||||||
data = data + new_data
|
|
||||||
if data_consumer:
|
|
||||||
data_consumer(new_data)
|
|
||||||
timeout_count = 0
|
|
||||||
else:
|
|
||||||
timeout_count += 1
|
|
||||||
if timeout is not None and timeout_count >= 100 * timeout:
|
|
||||||
break
|
|
||||||
time.sleep(0.01)
|
|
||||||
return data
|
|
||||||
|
|
||||||
def send_ctrl_a(self):
|
|
||||||
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
|
|
||||||
|
|
||||||
def send_ctrl_b(self):
|
|
||||||
self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL
|
|
||||||
|
|
||||||
def send_ctrl_c(self):
|
|
||||||
self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program
|
|
||||||
time.sleep(0.1)
|
|
||||||
self.serial.write(b'\x03')
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
def send_ctrl_d(self):
|
|
||||||
self.serial.write(b'\r\x04') # ctrl-D: soft reset
|
|
||||||
|
|
||||||
def enter_raw_repl(self):
|
|
||||||
# Brief delay before sending RAW MODE char if requests
|
|
||||||
if _rawdelay > 0:
|
|
||||||
time.sleep(_rawdelay)
|
|
||||||
|
|
||||||
# ctrl-C twice: interrupt any running program
|
|
||||||
self.serial.write(b'\r\x03')
|
|
||||||
time.sleep(0.1)
|
|
||||||
self.serial.write(b'\x03')
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
# flush input (without relying on serial.flushInput())
|
|
||||||
n = self.serial.inWaiting()
|
|
||||||
while n > 0:
|
|
||||||
self.serial.read(n)
|
|
||||||
n = self.serial.inWaiting()
|
|
||||||
|
|
||||||
for retry in range(0, 5):
|
|
||||||
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
|
|
||||||
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
|
|
||||||
if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
if retry >= 4:
|
|
||||||
print(data)
|
|
||||||
raise PyboardError('could not enter raw repl')
|
|
||||||
time.sleep(0.2)
|
|
||||||
|
|
||||||
self.serial.write(b'\x04') # ctrl-D: soft reset
|
|
||||||
data = self.read_until(1, b'soft reboot\r\n')
|
|
||||||
if not data.endswith(b'soft reboot\r\n'):
|
|
||||||
print(data)
|
|
||||||
raise PyboardError('could not enter raw repl')
|
|
||||||
# By splitting this into 2 reads, it allows boot.py to print stuff,
|
|
||||||
# which will show up after the soft reboot and before the raw REPL.
|
|
||||||
# Modification from original pyboard.py below:
|
|
||||||
# Add a small delay and send Ctrl-C twice after soft reboot to ensure
|
|
||||||
# any main program loop in main.py is interrupted.
|
|
||||||
time.sleep(0.5)
|
|
||||||
self.serial.write(b'\x03')
|
|
||||||
time.sleep(0.1) # (slight delay before second interrupt
|
|
||||||
self.serial.write(b'\x03')
|
|
||||||
# End modification above.
|
|
||||||
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
|
|
||||||
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
|
|
||||||
print(data)
|
|
||||||
raise PyboardError('could not enter raw repl')
|
|
||||||
|
|
||||||
def exit_raw_repl(self):
|
|
||||||
self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL
|
|
||||||
|
|
||||||
def follow(self, timeout, data_consumer=None):
|
|
||||||
# wait for normal output
|
|
||||||
data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer)
|
|
||||||
if not data.endswith(b'\x04'):
|
|
||||||
raise PyboardError('timeout waiting for first EOF reception')
|
|
||||||
data = data[:-1]
|
|
||||||
|
|
||||||
# wait for error output
|
|
||||||
data_err = self.read_until(1, b'\x04', timeout=timeout)
|
|
||||||
if not data_err.endswith(b'\x04'):
|
|
||||||
raise PyboardError('timeout waiting for second EOF reception')
|
|
||||||
data_err = data_err[:-1]
|
|
||||||
|
|
||||||
# return normal and error output
|
|
||||||
return data, data_err
|
|
||||||
|
|
||||||
def exec_raw_no_follow(self, command):
|
|
||||||
if isinstance(command, bytes):
|
|
||||||
command_bytes = command
|
|
||||||
else:
|
|
||||||
command_bytes = bytes(command, encoding='utf8')
|
|
||||||
|
|
||||||
# check we have a prompt
|
|
||||||
data = self.read_until(1, b'>')
|
|
||||||
if not data.endswith(b'>'):
|
|
||||||
raise PyboardError('could not enter raw repl')
|
|
||||||
|
|
||||||
# write command
|
|
||||||
for i in range(0, len(command_bytes), 256):
|
|
||||||
self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))])
|
|
||||||
time.sleep(0.01)
|
|
||||||
self.serial.write(b'\x04')
|
|
||||||
|
|
||||||
# check if we could exec command
|
|
||||||
data = self.serial.read(2)
|
|
||||||
if data != b'OK':
|
|
||||||
raise PyboardError('could not exec command')
|
|
||||||
|
|
||||||
def exec_raw(self, command, timeout=10, data_consumer=None):
|
|
||||||
self.exec_raw_no_follow(command)
|
|
||||||
return self.follow(timeout, data_consumer)
|
|
||||||
|
|
||||||
def eval(self, expression):
|
|
||||||
ret = self.exec_(f'print({expression})')
|
|
||||||
ret = ret.strip()
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def exec_(self, command, stream_output=False):
|
|
||||||
data_consumer = None
|
|
||||||
if stream_output:
|
|
||||||
data_consumer = stdout_write_bytes
|
|
||||||
ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
|
|
||||||
if ret_err:
|
|
||||||
raise PyboardError('exception', ret, ret_err)
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def execfile(self, filename, stream_output=False):
|
|
||||||
with open(filename, 'rb') as f:
|
|
||||||
pyfile = f.read()
|
|
||||||
return self.exec_(pyfile, stream_output=stream_output)
|
|
||||||
|
|
||||||
def get_time(self):
|
|
||||||
t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ')
|
|
||||||
return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6])
|
|
||||||
|
|
||||||
# in Python2 exec is a keyword so one must use "exec_"
|
|
||||||
# but for Python3 we want to provide the nicer version "exec"
|
|
||||||
setattr(Pyboard, "exec", Pyboard.exec_)
|
|
||||||
|
|
||||||
def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):
|
|
||||||
pyb = Pyboard(device, baudrate, user, password)
|
|
||||||
pyb.enter_raw_repl()
|
|
||||||
output = pyb.execfile(filename)
|
|
||||||
stdout_write_bytes(output)
|
|
||||||
pyb.exit_raw_repl()
|
|
||||||
pyb.close()
|
|
||||||
|
|
||||||
# def main():
|
|
||||||
# import argparse
|
|
||||||
# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
|
|
||||||
# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
|
|
||||||
# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
|
|
||||||
# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
|
|
||||||
# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password')
|
|
||||||
# cmd_parser.add_argument('-c', '--command', help='program passed in as string')
|
|
||||||
# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
|
|
||||||
# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
|
|
||||||
# cmd_parser.add_argument('files', nargs='*', help='input files')
|
|
||||||
# args = cmd_parser.parse_args()
|
|
||||||
|
|
||||||
# def execbuffer(buf):
|
|
||||||
# try:
|
|
||||||
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
|
|
||||||
# pyb.enter_raw_repl()
|
|
||||||
# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
|
|
||||||
# pyb.exit_raw_repl()
|
|
||||||
# pyb.close()
|
|
||||||
# except PyboardError as er:
|
|
||||||
# print(er)
|
|
||||||
# sys.exit(1)
|
|
||||||
# except KeyboardInterrupt:
|
|
||||||
# sys.exit(1)
|
|
||||||
# if ret_err:
|
|
||||||
# stdout_write_bytes(ret_err)
|
|
||||||
# sys.exit(1)
|
|
||||||
|
|
||||||
# if args.command is not None:
|
|
||||||
# execbuffer(args.command.encode('utf-8'))
|
|
||||||
|
|
||||||
# for filename in args.files:
|
|
||||||
# with open(filename, 'rb') as f:
|
|
||||||
# pyfile = f.read()
|
|
||||||
# execbuffer(pyfile)
|
|
||||||
|
|
||||||
# if args.follow or (args.command is None and len(args.files) == 0):
|
|
||||||
# try:
|
|
||||||
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
|
|
||||||
# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
|
|
||||||
# pyb.close()
|
|
||||||
# except PyboardError as er:
|
|
||||||
# print(er)
|
|
||||||
# sys.exit(1)
|
|
||||||
# except KeyboardInterrupt:
|
|
||||||
# sys.exit(1)
|
|
||||||
# if ret_err:
|
|
||||||
# stdout_write_bytes(ret_err)
|
|
||||||
# sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time.
|
|
||||||
# This is kept small because small chips and USB to serial
|
|
||||||
# bridges usually have very small buffers.
|
|
||||||
|
|
||||||
|
|
||||||
class DirectoryExistsError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class Files(object):
|
|
||||||
"""Class to interact with a MicroPython board files over a serial connection.
|
|
||||||
Provides functions for listing, uploading, and downloading files from the
|
|
||||||
board's filesystem.
|
|
||||||
"""
|
|
||||||
|
|
||||||
__raw_repl_on: bool = False
|
|
||||||
|
|
||||||
def __init__(self, pyboard: Pyboard):
|
|
||||||
"""Initialize the MicroPython board files class using the provided pyboard
|
|
||||||
instance. In most cases you should create a Pyboard instance (from
|
|
||||||
pyboard.py) which connects to a board over a serial connection and pass
|
|
||||||
it in, but you can pass in other objects for testing, etc.
|
|
||||||
"""
|
|
||||||
self._pyboard = pyboard
|
|
||||||
|
|
||||||
def is_raw_repl_on(self):
|
|
||||||
return self.__raw_repl_on
|
|
||||||
|
|
||||||
def send_ctrl_a(self):
|
|
||||||
self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL
|
|
||||||
|
|
||||||
def send_ctrl_b(self):
|
|
||||||
self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL
|
|
||||||
|
|
||||||
def send_ctrl_c(self):
|
|
||||||
self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program
|
|
||||||
|
|
||||||
def send_ctrl_d(self):
|
|
||||||
self._pyboard.send_ctrl_d() # ctrl-D: soft reset
|
|
||||||
|
|
||||||
def ls(self, directory="/", long_format=False, recursive=True):
|
|
||||||
"""List the contents of the specified directory (or root if none is
|
|
||||||
specified). Returns a list of strings with the names of files in the
|
|
||||||
specified directory. If long_format is True then a list of 2-tuples
|
|
||||||
with the name and size (in bytes) of the item is returned. Note that
|
|
||||||
it appears the size of directories is not supported by MicroPython and
|
|
||||||
will always return 0 (i.e. no recursive size computation).
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Disabling for now, see https://github.com/adafruit/ampy/issues/55.
|
|
||||||
# # Make sure directory ends in a slash.
|
|
||||||
# if not directory.endswith("/"):
|
|
||||||
# directory += "/"
|
|
||||||
|
|
||||||
# Make sure directory starts with slash, for consistency.
|
|
||||||
# if not directory.startswith("/"):
|
|
||||||
# directory = "/" + directory
|
|
||||||
|
|
||||||
command = """\
|
|
||||||
try:
|
|
||||||
import os
|
|
||||||
except ImportError:
|
|
||||||
import uos as os\n"""
|
|
||||||
|
|
||||||
if recursive:
|
|
||||||
command += """\
|
|
||||||
def listdir(directory):
|
|
||||||
result = set()
|
|
||||||
|
|
||||||
def _listdir(dir_or_file):
|
|
||||||
try:
|
|
||||||
# if its a directory, then it should provide some children.
|
|
||||||
children = os.listdir(dir_or_file)
|
|
||||||
except OSError:
|
|
||||||
# probably a file. run stat() to confirm.
|
|
||||||
os.stat(dir_or_file)
|
|
||||||
result.add(dir_or_file)
|
|
||||||
else:
|
|
||||||
# probably a directory, add to result if empty.
|
|
||||||
if children:
|
|
||||||
# queue the children to be dealt with in next iteration.
|
|
||||||
for child in children:
|
|
||||||
# create the full path.
|
|
||||||
if dir_or_file == '/':
|
|
||||||
next = dir_or_file + child
|
|
||||||
else:
|
|
||||||
next = dir_or_file + '/' + child
|
|
||||||
|
|
||||||
_listdir(next)
|
|
||||||
else:
|
|
||||||
result.add(dir_or_file)
|
|
||||||
|
|
||||||
_listdir(directory)
|
|
||||||
return sorted(result)\n"""
|
|
||||||
else:
|
|
||||||
command += """\
|
|
||||||
def listdir(directory):
|
|
||||||
if directory == '/':
|
|
||||||
return sorted([directory + f for f in os.listdir(directory)])
|
|
||||||
else:
|
|
||||||
return sorted([directory + '/' + f for f in os.listdir(directory)])\n"""
|
|
||||||
|
|
||||||
# Execute os.listdir() command on the board.
|
|
||||||
if long_format:
|
|
||||||
command += f"""
|
|
||||||
r = []
|
|
||||||
for f in listdir('{directory}'):
|
|
||||||
size = os.stat(f)[6]
|
|
||||||
r.append(f'{{f}} ({{size}}b)')
|
|
||||||
print(r)
|
|
||||||
"""
|
|
||||||
else:
|
|
||||||
command += f"""
|
|
||||||
print(listdir('{directory}'))
|
|
||||||
"""
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
output = self._pyboard.exec_(textwrap.dedent(command))
|
|
||||||
except PyboardError as ex:
|
|
||||||
# Check if this is an OSError #2, i.e. directory doesn't exist and
|
|
||||||
# rethrow it as something more descriptive.
|
|
||||||
if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1:
|
|
||||||
raise RuntimeError(f"No such directory: {directory}")
|
|
||||||
else:
|
|
||||||
raise ex
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
# Parse the result list and return it.
|
|
||||||
return ast.literal_eval(output.decode("utf-8"))
|
|
||||||
|
|
||||||
def mkdir(self, directory, exists_okay=False):
|
|
||||||
"""Create the specified directory. Note this cannot create a recursive
|
|
||||||
hierarchy of directories, instead each one should be created separately.
|
|
||||||
"""
|
|
||||||
# Execute os.mkdir command on the board.
|
|
||||||
command = f"""
|
|
||||||
try:
|
|
||||||
import os
|
|
||||||
except ImportError:
|
|
||||||
import uos as os
|
|
||||||
os.mkdir('{directory}')
|
|
||||||
"""
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
self._pyboard.exec_(textwrap.dedent(command))
|
|
||||||
except PyboardError as ex:
|
|
||||||
# Check if this is an OSError #17, i.e. directory already exists.
|
|
||||||
if ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST") != -1:
|
|
||||||
if not exists_okay:
|
|
||||||
raise DirectoryExistsError(f"Directory already exists: {directory}")
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
def put(self, filename, data):
|
|
||||||
"""Create or update the specified file with the provided data.
|
|
||||||
"""
|
|
||||||
# Open the file for writing on the board and write chunks of data.
|
|
||||||
|
|
||||||
if filename.startswith('/'):
|
|
||||||
filename = filename[1:]
|
|
||||||
|
|
||||||
print(f'↑ {filename}', end='', flush=True)
|
|
||||||
size = len(data)
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
self._pyboard.exec_(f"f = open('{filename}', 'wb')")
|
|
||||||
# Loop through and write a buffer size chunk of data at a time.
|
|
||||||
for i in range(0, size, BUFFER_SIZE):
|
|
||||||
chunk_size = min(BUFFER_SIZE, size - i)
|
|
||||||
chunk = repr(data[i : i + chunk_size])
|
|
||||||
# Make sure to send explicit byte strings (handles python 2 compatibility).
|
|
||||||
if not chunk.startswith("b"):
|
|
||||||
chunk = "b" + chunk
|
|
||||||
self._pyboard.exec_(f"f.write({chunk})")
|
|
||||||
self._pyboard.exec_("f.close()")
|
|
||||||
except Exception as ex:
|
|
||||||
print(' [x]')
|
|
||||||
raise ex
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
print(f' - {size}b [✓]')
|
|
||||||
return size
|
|
||||||
|
|
||||||
def get(self, filename):
|
|
||||||
"""Retrieve the contents of the specified file and return its contents
|
|
||||||
as a byte string.
|
|
||||||
"""
|
|
||||||
# Open the file and read it a few bytes at a time and print out the
|
|
||||||
# raw bytes. Be careful not to overload the UART buffer so only write
|
|
||||||
# a few bytes at a time, and don't use print since it adds newlines and
|
|
||||||
# expects string data.
|
|
||||||
command = f"""
|
|
||||||
import sys
|
|
||||||
import ubinascii
|
|
||||||
with open('{filename}', 'rb') as infile:
|
|
||||||
while True:
|
|
||||||
result = infile.read({BUFFER_SIZE})
|
|
||||||
if result == b'':
|
|
||||||
break
|
|
||||||
len = sys.stdout.write(ubinascii.hexlify(result))
|
|
||||||
"""
|
|
||||||
print(f'↓ {filename}', end='', flush=True)
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
output = self._pyboard.exec_(textwrap.dedent(command))
|
|
||||||
except PyboardError as ex:
|
|
||||||
print(' [x]')
|
|
||||||
# Check if this is an OSError #2, i.e. file doesn't exist and
|
|
||||||
# rethrow it as something more descriptive.
|
|
||||||
try:
|
|
||||||
if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1:
|
|
||||||
raise RuntimeError(f"No such file: {filename}")
|
|
||||||
else:
|
|
||||||
raise ex
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
raise ex
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
print(f' - {int(len(output) / 2)}b [✓]')
|
|
||||||
return binascii.unhexlify(output)
|
|
||||||
|
|
||||||
def rm(self, filename):
|
|
||||||
"""Remove the specified file or directory."""
|
|
||||||
command = f"""
|
|
||||||
try:
|
|
||||||
import os
|
|
||||||
except ImportError:
|
|
||||||
import uos as os
|
|
||||||
os.remove('{filename}')
|
|
||||||
"""
|
|
||||||
print(f'↶ {filename}', end='')
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
self._pyboard.exec_(textwrap.dedent(command))
|
|
||||||
except PyboardError as ex:
|
|
||||||
print(' [x]')
|
|
||||||
message = ex.args[2].decode("utf-8")
|
|
||||||
# Check if this is an OSError #2, i.e. file/directory doesn't exist
|
|
||||||
# and rethrow it as something more descriptive.
|
|
||||||
if message.find("OSError: [Errno 2] ENOENT") != -1:
|
|
||||||
raise RuntimeError(f"No such file/directory: {filename}")
|
|
||||||
# Check for OSError #13, the directory isn't empty.
|
|
||||||
if message.find("OSError: [Errno 13] EACCES") != -1:
|
|
||||||
raise RuntimeError(f"Directory is not empty: {filename}")
|
|
||||||
else:
|
|
||||||
raise ex
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
print(' [✓]')
|
|
||||||
|
|
||||||
def rmdir(self, directory, missing_okay=False):
|
|
||||||
"""Forcefully remove the specified directory and all its children."""
|
|
||||||
# Build a script to walk an entire directory structure and delete every
|
|
||||||
# file and subfolder. This is tricky because MicroPython has no os.walk
|
|
||||||
# or similar function to walk folders, so this code does it manually
|
|
||||||
# with recursion and changing directories. For each directory it lists
|
|
||||||
# the files and deletes everything it can, i.e. all the files. Then
|
|
||||||
# it lists the files again and assumes they are directories (since they
|
|
||||||
# couldn't be deleted in the first pass) and recursively clears those
|
|
||||||
# subdirectories. Finally when finished clearing all the children the
|
|
||||||
# parent directory is deleted.
|
|
||||||
command = f"""
|
|
||||||
try:
|
|
||||||
import os
|
|
||||||
except ImportError:
|
|
||||||
import uos as os
|
|
||||||
def rmdir(directory):
|
|
||||||
os.chdir(directory)
|
|
||||||
for f in os.listdir():
|
|
||||||
try:
|
|
||||||
os.remove(f)
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
for f in os.listdir():
|
|
||||||
rmdir(f)
|
|
||||||
os.chdir('..')
|
|
||||||
os.rmdir(directory)
|
|
||||||
rmdir('{directory}')
|
|
||||||
"""
|
|
||||||
print(f'↶ {directory}', end='')
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
self._pyboard.exec_(textwrap.dedent(command))
|
|
||||||
except PyboardError as ex:
|
|
||||||
message = ex.args[2].decode("utf-8")
|
|
||||||
# Check if this is an OSError #2, i.e. directory doesn't exist
|
|
||||||
# and rethrow it as something more descriptive.
|
|
||||||
if message.find("OSError: [Errno 2] ENOENT") != -1:
|
|
||||||
if not missing_okay:
|
|
||||||
print(' [x]')
|
|
||||||
raise RuntimeError(f"No such directory: {directory}")
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
print(' [✓]')
|
|
||||||
|
|
||||||
def run(self, filename, wait_output=True, stream_output=True):
|
|
||||||
"""Run the provided script and return its output. If wait_output is True
|
|
||||||
(default) then wait for the script to finish and then return its output,
|
|
||||||
otherwise just run the script and don't wait for any output.
|
|
||||||
If stream_output is True(default) then return None and print outputs to
|
|
||||||
stdout without buffering.
|
|
||||||
"""
|
|
||||||
output = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
|
|
||||||
if stream_output:
|
|
||||||
self._pyboard.execfile(filename, stream_output=True)
|
|
||||||
elif wait_output:
|
|
||||||
# Run the file and wait for output to return.
|
|
||||||
output = self._pyboard.execfile(filename)
|
|
||||||
else:
|
|
||||||
# Read the file and run it using lower level pyboard functions that
|
|
||||||
# won't wait for it to finish or return output.
|
|
||||||
with open(filename, "rb") as infile:
|
|
||||||
self._pyboard.exec_raw_no_follow(infile.read())
|
|
||||||
except Exception as e:
|
|
||||||
raise e
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
||||||
|
|
||||||
def run_on_board(self, filename, wait_output=True, stream_output=True):
|
|
||||||
"""Run the provided script and return its output. If wait_output is True
|
|
||||||
(default) then wait for the script to finish and then return its output,
|
|
||||||
otherwise just run the script and don't wait for any output.
|
|
||||||
If stream_output is True(default) then return None and print outputs to
|
|
||||||
stdout without buffering.
|
|
||||||
"""
|
|
||||||
command = textwrap.dedent(f'exec(open("{filename}").read())')
|
|
||||||
output = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
|
|
||||||
if stream_output:
|
|
||||||
self._pyboard.exec_(command, stream_output=True)
|
|
||||||
elif wait_output:
|
|
||||||
# Run the file and wait for output to return.
|
|
||||||
output = self._pyboard.exec_(command)
|
|
||||||
else:
|
|
||||||
# Read the file and run it using lower level pyboard functions that
|
|
||||||
# won't wait for it to finish or return output.
|
|
||||||
self._pyboard.exec_raw_no_follow(command)
|
|
||||||
except Exception as e:
|
|
||||||
message = str(e)
|
|
||||||
|
|
||||||
if message.find('OSError: [Errno 2] ENOENT') != -1:
|
|
||||||
reason = f'"{filename}" does not exists!'
|
|
||||||
elif message.find('OSError: [Errno 13] EACCES') != -1:
|
|
||||||
reason = f'"{filename}" access denied!'
|
|
||||||
elif message.find('OSError: [Errno 21] EISDIR') != -1:
|
|
||||||
reason = f'"{filename}" is a directory!'
|
|
||||||
else:
|
|
||||||
reason = 'Traceback (most recent call last):'
|
|
||||||
traceback = message.split(reason)
|
|
||||||
|
|
||||||
if len(traceback) == 2:
|
|
||||||
for call in traceback[1][:-2].split('\\r\\n'):
|
|
||||||
reason += f'{call}\n'
|
|
||||||
|
|
||||||
raise Exception(reason.strip())
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
return output
|
|
||||||
|
|
||||||
def exec(self, command):
|
|
||||||
try:
|
|
||||||
self.__raw_repl_on = True
|
|
||||||
self._pyboard.enter_raw_repl()
|
|
||||||
output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True)
|
|
||||||
except Exception as e:
|
|
||||||
message = str(e)
|
|
||||||
reason = 'Traceback (most recent call last):'
|
|
||||||
traceback = message.split(reason)
|
|
||||||
|
|
||||||
if len(traceback) == 2:
|
|
||||||
for call in traceback[1][:-2].split('\\r\\n'):
|
|
||||||
reason += f'{call}\n'
|
|
||||||
|
|
||||||
raise Exception(reason.strip())
|
|
||||||
finally:
|
|
||||||
self._pyboard.exit_raw_repl()
|
|
||||||
self.__raw_repl_on = False
|
|
||||||
|
|
||||||
return output
|
|
||||||
@@ -1,14 +1,824 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
"""
|
||||||
|
pyboard interface
|
||||||
|
|
||||||
|
This module provides the Pyboard class, used to communicate with and
|
||||||
|
control the pyboard over a serial USB connection.
|
||||||
|
|
||||||
|
Example usage:
|
||||||
|
|
||||||
|
import pyboard
|
||||||
|
pyb = pyboard.Pyboard('/dev/ttyACM0')
|
||||||
|
|
||||||
|
Or:
|
||||||
|
|
||||||
|
pyb = pyboard.Pyboard('192.168.1.1')
|
||||||
|
|
||||||
|
Then:
|
||||||
|
|
||||||
|
pyb.enter_raw_repl()
|
||||||
|
pyb.exec('pyb.LED(1).on()')
|
||||||
|
pyb.exit_raw_repl()
|
||||||
|
|
||||||
|
Note: if using Python2 then pyb.exec must be written as pyb.exec_.
|
||||||
|
To run a script from the local machine on the board and print out the results:
|
||||||
|
|
||||||
|
import pyboard
|
||||||
|
pyboard.execfile('test.py', device='/dev/ttyACM0')
|
||||||
|
|
||||||
|
This script can also be run directly. To execute a local script, use:
|
||||||
|
|
||||||
|
./pyboard.py test.py
|
||||||
|
|
||||||
|
Or:
|
||||||
|
|
||||||
|
python pyboard.py test.py
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Adafruit MicroPython Tool - File Operations
|
||||||
|
# Author: Tony DiCola
|
||||||
|
# Copyright (c) 2016 Adafruit Industries
|
||||||
#
|
#
|
||||||
import os
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
# of this software and associated documentation files (the "Software"), to deal
|
||||||
|
# in the Software without restriction, including without limitation the rights
|
||||||
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
# copies of the Software, and to permit persons to whom the Software is
|
||||||
|
# furnished to do so, subject to the following conditions:
|
||||||
|
#
|
||||||
|
# The above copyright notice and this permission notice shall be included in all
|
||||||
|
# copies or substantial portions of the Software.
|
||||||
|
#
|
||||||
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
# SOFTWARE.
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
import ast
|
||||||
import time
|
import time
|
||||||
|
import textwrap
|
||||||
|
import binascii
|
||||||
|
|
||||||
|
import os
|
||||||
import signal
|
import signal
|
||||||
|
|
||||||
from lib.pyboard import Pyboard, Files
|
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
from watchdog.events import PatternMatchingEventHandler
|
from watchdog.events import PatternMatchingEventHandler
|
||||||
|
|
||||||
|
|
||||||
|
_rawdelay = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
stdout = sys.stdout.buffer
|
||||||
|
except AttributeError:
|
||||||
|
# Python2 doesn't have buffer attr
|
||||||
|
stdout = sys.stdout
|
||||||
|
|
||||||
|
def stdout_write_bytes(b):
|
||||||
|
b = b.replace(b"\x04", b"")
|
||||||
|
stdout.write(b)
|
||||||
|
stdout.flush()
|
||||||
|
|
||||||
|
class PyboardError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class TelnetToSerial:
|
||||||
|
def __init__(self, ip, user, password, read_timeout=None):
|
||||||
|
import telnetlib
|
||||||
|
self.tn = telnetlib.Telnet(ip, timeout=15)
|
||||||
|
self.read_timeout = read_timeout
|
||||||
|
if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
|
||||||
|
self.tn.write(bytes(user, 'ascii') + b"\r\n")
|
||||||
|
|
||||||
|
if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
|
||||||
|
# needed because of internal implementation details of the telnet server
|
||||||
|
time.sleep(0.2)
|
||||||
|
self.tn.write(bytes(password, 'ascii') + b"\r\n")
|
||||||
|
|
||||||
|
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
|
||||||
|
# login succesful
|
||||||
|
from collections import deque
|
||||||
|
self.fifo = deque()
|
||||||
|
return
|
||||||
|
|
||||||
|
raise PyboardError('Failed to establish a telnet connection with the board')
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
try:
|
||||||
|
self.tn.close()
|
||||||
|
except:
|
||||||
|
# the telnet object might not exist yet, so ignore this one
|
||||||
|
pass
|
||||||
|
|
||||||
|
def read(self, size=1):
|
||||||
|
while len(self.fifo) < size:
|
||||||
|
timeout_count = 0
|
||||||
|
data = self.tn.read_eager()
|
||||||
|
if len(data):
|
||||||
|
self.fifo.extend(data)
|
||||||
|
timeout_count = 0
|
||||||
|
else:
|
||||||
|
time.sleep(0.25)
|
||||||
|
if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
|
||||||
|
break
|
||||||
|
timeout_count += 1
|
||||||
|
|
||||||
|
data = b''
|
||||||
|
while len(data) < size and len(self.fifo) > 0:
|
||||||
|
data += bytes([self.fifo.popleft()])
|
||||||
|
return data
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
self.tn.write(data)
|
||||||
|
return len(data)
|
||||||
|
|
||||||
|
def inWaiting(self):
|
||||||
|
n_waiting = len(self.fifo)
|
||||||
|
if not n_waiting:
|
||||||
|
data = self.tn.read_eager()
|
||||||
|
self.fifo.extend(data)
|
||||||
|
return len(data)
|
||||||
|
else:
|
||||||
|
return n_waiting
|
||||||
|
|
||||||
|
class Pyboard:
|
||||||
|
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0, rawdelay=0):
|
||||||
|
global _rawdelay
|
||||||
|
_rawdelay = rawdelay
|
||||||
|
if device and device[0].isdigit() and device[-1].isdigit() and device.count('.') == 3:
|
||||||
|
# device looks like an IP address
|
||||||
|
self.serial = TelnetToSerial(device, user, password, read_timeout=10)
|
||||||
|
else:
|
||||||
|
import serial
|
||||||
|
delayed = False
|
||||||
|
for attempt in range(wait + 1):
|
||||||
|
try:
|
||||||
|
self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
|
||||||
|
break
|
||||||
|
except (OSError, IOError): # Py2 and Py3 have different errors
|
||||||
|
if wait == 0:
|
||||||
|
continue
|
||||||
|
if attempt == 0:
|
||||||
|
sys.stdout.write(f'Waiting {wait} seconds for pyboard ')
|
||||||
|
delayed = True
|
||||||
|
time.sleep(1)
|
||||||
|
sys.stdout.write('.')
|
||||||
|
sys.stdout.flush()
|
||||||
|
else:
|
||||||
|
if delayed:
|
||||||
|
print('')
|
||||||
|
raise PyboardError('failed to access ' + device)
|
||||||
|
if delayed:
|
||||||
|
print('')
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.serial.close()
|
||||||
|
|
||||||
|
def read_until(self, min_num_bytes, ending, timeout=10, data_consumer=None):
|
||||||
|
data = self.serial.read(min_num_bytes)
|
||||||
|
if data_consumer:
|
||||||
|
data_consumer(data)
|
||||||
|
timeout_count = 0
|
||||||
|
while True:
|
||||||
|
if data.endswith(ending):
|
||||||
|
break
|
||||||
|
elif self.serial.inWaiting() > 0:
|
||||||
|
new_data = self.serial.read(1)
|
||||||
|
data = data + new_data
|
||||||
|
if data_consumer:
|
||||||
|
data_consumer(new_data)
|
||||||
|
timeout_count = 0
|
||||||
|
else:
|
||||||
|
timeout_count += 1
|
||||||
|
if timeout is not None and timeout_count >= 100 * timeout:
|
||||||
|
break
|
||||||
|
time.sleep(0.01)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def send_ctrl_a(self):
|
||||||
|
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
|
||||||
|
|
||||||
|
def send_ctrl_b(self):
|
||||||
|
self.serial.write(b'\r\x02') # ctrl-B: exit raw REPL
|
||||||
|
|
||||||
|
def send_ctrl_c(self):
|
||||||
|
self.serial.write(b'\r\x03') # ctrl-C twice: interrupt any running program
|
||||||
|
time.sleep(0.1)
|
||||||
|
self.serial.write(b'\x03')
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def send_ctrl_d(self):
|
||||||
|
self.serial.write(b'\r\x04') # ctrl-D: soft reset
|
||||||
|
|
||||||
|
def enter_raw_repl(self):
|
||||||
|
# Brief delay before sending RAW MODE char if requests
|
||||||
|
if _rawdelay > 0:
|
||||||
|
time.sleep(_rawdelay)
|
||||||
|
|
||||||
|
# ctrl-C twice: interrupt any running program
|
||||||
|
self.serial.write(b'\r\x03')
|
||||||
|
time.sleep(0.1)
|
||||||
|
self.serial.write(b'\x03')
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# flush input (without relying on serial.flushInput())
|
||||||
|
n = self.serial.inWaiting()
|
||||||
|
while n > 0:
|
||||||
|
self.serial.read(n)
|
||||||
|
n = self.serial.inWaiting()
|
||||||
|
|
||||||
|
for retry in range(0, 5):
|
||||||
|
self.serial.write(b'\r\x01') # ctrl-A: enter raw REPL
|
||||||
|
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n>')
|
||||||
|
if data.endswith(b'raw REPL; CTRL-B to exit\r\n>'):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if retry >= 4:
|
||||||
|
print(data)
|
||||||
|
raise PyboardError('could not enter raw repl')
|
||||||
|
time.sleep(0.2)
|
||||||
|
|
||||||
|
self.serial.write(b'\x04') # ctrl-D: soft reset
|
||||||
|
data = self.read_until(1, b'soft reboot\r\n')
|
||||||
|
if not data.endswith(b'soft reboot\r\n'):
|
||||||
|
print(data)
|
||||||
|
raise PyboardError('could not enter raw repl')
|
||||||
|
# By splitting this into 2 reads, it allows boot.py to print stuff,
|
||||||
|
# which will show up after the soft reboot and before the raw REPL.
|
||||||
|
# Modification from original pyboard.py below:
|
||||||
|
# Add a small delay and send Ctrl-C twice after soft reboot to ensure
|
||||||
|
# any main program loop in main.py is interrupted.
|
||||||
|
time.sleep(0.5)
|
||||||
|
self.serial.write(b'\x03')
|
||||||
|
time.sleep(0.1) # (slight delay before second interrupt
|
||||||
|
self.serial.write(b'\x03')
|
||||||
|
# End modification above.
|
||||||
|
data = self.read_until(1, b'raw REPL; CTRL-B to exit\r\n')
|
||||||
|
if not data.endswith(b'raw REPL; CTRL-B to exit\r\n'):
|
||||||
|
print(data)
|
||||||
|
raise PyboardError('could not enter raw repl')
|
||||||
|
|
||||||
|
def exit_raw_repl(self):
|
||||||
|
self.serial.write(b'\r\x02') # ctrl-B: enter friendly REPL
|
||||||
|
|
||||||
|
def follow(self, timeout, data_consumer=None):
|
||||||
|
# wait for normal output
|
||||||
|
data = self.read_until(1, b'\x04', timeout=timeout, data_consumer=data_consumer)
|
||||||
|
if not data.endswith(b'\x04'):
|
||||||
|
raise PyboardError('timeout waiting for first EOF reception')
|
||||||
|
data = data[:-1]
|
||||||
|
|
||||||
|
# wait for error output
|
||||||
|
data_err = self.read_until(1, b'\x04', timeout=timeout)
|
||||||
|
if not data_err.endswith(b'\x04'):
|
||||||
|
raise PyboardError('timeout waiting for second EOF reception')
|
||||||
|
data_err = data_err[:-1]
|
||||||
|
|
||||||
|
# return normal and error output
|
||||||
|
return data, data_err
|
||||||
|
|
||||||
|
def exec_raw_no_follow(self, command):
|
||||||
|
if isinstance(command, bytes):
|
||||||
|
command_bytes = command
|
||||||
|
else:
|
||||||
|
command_bytes = bytes(command, encoding='utf8')
|
||||||
|
|
||||||
|
# check we have a prompt
|
||||||
|
data = self.read_until(1, b'>')
|
||||||
|
if not data.endswith(b'>'):
|
||||||
|
raise PyboardError('could not enter raw repl')
|
||||||
|
|
||||||
|
# write command
|
||||||
|
for i in range(0, len(command_bytes), 256):
|
||||||
|
self.serial.write(command_bytes[i:min(i + 256, len(command_bytes))])
|
||||||
|
time.sleep(0.01)
|
||||||
|
self.serial.write(b'\x04')
|
||||||
|
|
||||||
|
# check if we could exec command
|
||||||
|
data = self.serial.read(2)
|
||||||
|
if data != b'OK':
|
||||||
|
raise PyboardError('could not exec command')
|
||||||
|
|
||||||
|
def exec_raw(self, command, timeout=10, data_consumer=None):
|
||||||
|
self.exec_raw_no_follow(command)
|
||||||
|
return self.follow(timeout, data_consumer)
|
||||||
|
|
||||||
|
def eval(self, expression):
|
||||||
|
ret = self.exec_(f'print({expression})')
|
||||||
|
ret = ret.strip()
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def exec_(self, command, stream_output=False):
|
||||||
|
data_consumer = None
|
||||||
|
if stream_output:
|
||||||
|
data_consumer = stdout_write_bytes
|
||||||
|
ret, ret_err = self.exec_raw(command, data_consumer=data_consumer)
|
||||||
|
if ret_err:
|
||||||
|
raise PyboardError('exception', ret, ret_err)
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def execfile(self, filename, stream_output=False):
|
||||||
|
with open(filename, 'rb') as f:
|
||||||
|
pyfile = f.read()
|
||||||
|
return self.exec_(pyfile, stream_output=stream_output)
|
||||||
|
|
||||||
|
def get_time(self):
|
||||||
|
t = str(self.eval('pyb.RTC().datetime()'), encoding='utf8')[1:-1].split(', ')
|
||||||
|
return int(t[4]) * 3600 + int(t[5]) * 60 + int(t[6])
|
||||||
|
|
||||||
|
# in Python2 exec is a keyword so one must use "exec_"
|
||||||
|
# but for Python3 we want to provide the nicer version "exec"
|
||||||
|
setattr(Pyboard, "exec", Pyboard.exec_)
|
||||||
|
|
||||||
|
def execfile(filename, device='/dev/ttyACM0', baudrate=115200, user='micro', password='python'):
|
||||||
|
pyb = Pyboard(device, baudrate, user, password)
|
||||||
|
pyb.enter_raw_repl()
|
||||||
|
output = pyb.execfile(filename)
|
||||||
|
stdout_write_bytes(output)
|
||||||
|
pyb.exit_raw_repl()
|
||||||
|
pyb.close()
|
||||||
|
|
||||||
|
# def main():
|
||||||
|
# import argparse
|
||||||
|
# cmd_parser = argparse.ArgumentParser(description='Run scripts on the pyboard.')
|
||||||
|
# cmd_parser.add_argument('--device', default='/dev/ttyACM0', help='the serial device or the IP address of the pyboard')
|
||||||
|
# cmd_parser.add_argument('-b', '--baudrate', default=115200, help='the baud rate of the serial device')
|
||||||
|
# cmd_parser.add_argument('-u', '--user', default='micro', help='the telnet login username')
|
||||||
|
# cmd_parser.add_argument('-p', '--password', default='python', help='the telnet login password')
|
||||||
|
# cmd_parser.add_argument('-c', '--command', help='program passed in as string')
|
||||||
|
# cmd_parser.add_argument('-w', '--wait', default=0, type=int, help='seconds to wait for USB connected board to become available')
|
||||||
|
# cmd_parser.add_argument('--follow', action='store_true', help='follow the output after running the scripts [default if no scripts given]')
|
||||||
|
# cmd_parser.add_argument('files', nargs='*', help='input files')
|
||||||
|
# args = cmd_parser.parse_args()
|
||||||
|
|
||||||
|
# def execbuffer(buf):
|
||||||
|
# try:
|
||||||
|
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
|
||||||
|
# pyb.enter_raw_repl()
|
||||||
|
# ret, ret_err = pyb.exec_raw(buf, timeout=None, data_consumer=stdout_write_bytes)
|
||||||
|
# pyb.exit_raw_repl()
|
||||||
|
# pyb.close()
|
||||||
|
# except PyboardError as er:
|
||||||
|
# print(er)
|
||||||
|
# sys.exit(1)
|
||||||
|
# except KeyboardInterrupt:
|
||||||
|
# sys.exit(1)
|
||||||
|
# if ret_err:
|
||||||
|
# stdout_write_bytes(ret_err)
|
||||||
|
# sys.exit(1)
|
||||||
|
|
||||||
|
# if args.command is not None:
|
||||||
|
# execbuffer(args.command.encode('utf-8'))
|
||||||
|
|
||||||
|
# for filename in args.files:
|
||||||
|
# with open(filename, 'rb') as f:
|
||||||
|
# pyfile = f.read()
|
||||||
|
# execbuffer(pyfile)
|
||||||
|
|
||||||
|
# if args.follow or (args.command is None and len(args.files) == 0):
|
||||||
|
# try:
|
||||||
|
# pyb = Pyboard(args.device, args.baudrate, args.user, args.password, args.wait)
|
||||||
|
# ret, ret_err = pyb.follow(timeout=None, data_consumer=stdout_write_bytes)
|
||||||
|
# pyb.close()
|
||||||
|
# except PyboardError as er:
|
||||||
|
# print(er)
|
||||||
|
# sys.exit(1)
|
||||||
|
# except KeyboardInterrupt:
|
||||||
|
# sys.exit(1)
|
||||||
|
# if ret_err:
|
||||||
|
# stdout_write_bytes(ret_err)
|
||||||
|
# sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
BUFFER_SIZE = 512 # Amount of data to read or write to the serial port at a time.
|
||||||
|
# This is kept small because small chips and USB to serial
|
||||||
|
# bridges usually have very small buffers.
|
||||||
|
|
||||||
|
|
||||||
|
class DirectoryExistsError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Files(object):
|
||||||
|
"""Class to interact with a MicroPython board files over a serial connection.
|
||||||
|
Provides functions for listing, uploading, and downloading files from the
|
||||||
|
board's filesystem.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__raw_repl_on: bool = False
|
||||||
|
|
||||||
|
def __init__(self, pyboard: Pyboard):
|
||||||
|
"""Initialize the MicroPython board files class using the provided pyboard
|
||||||
|
instance. In most cases you should create a Pyboard instance (from
|
||||||
|
pyboard.py) which connects to a board over a serial connection and pass
|
||||||
|
it in, but you can pass in other objects for testing, etc.
|
||||||
|
"""
|
||||||
|
self._pyboard = pyboard
|
||||||
|
|
||||||
|
def is_raw_repl_on(self):
|
||||||
|
return self.__raw_repl_on
|
||||||
|
|
||||||
|
def send_ctrl_a(self):
|
||||||
|
self._pyboard.send_ctrl_a() # ctrl-A: enter raw REPL
|
||||||
|
|
||||||
|
def send_ctrl_b(self):
|
||||||
|
self._pyboard.send_ctrl_b() # ctrl-B: exit raw REPL
|
||||||
|
|
||||||
|
def send_ctrl_c(self):
|
||||||
|
self._pyboard.send_ctrl_c() # ctrl-C twice: interrupt any running program
|
||||||
|
|
||||||
|
def send_ctrl_d(self):
|
||||||
|
self._pyboard.send_ctrl_d() # ctrl-D: soft reset
|
||||||
|
|
||||||
|
def ls(self, directory="/", long_format=False, recursive=True):
|
||||||
|
"""List the contents of the specified directory (or root if none is
|
||||||
|
specified). Returns a list of strings with the names of files in the
|
||||||
|
specified directory. If long_format is True then a list of 2-tuples
|
||||||
|
with the name and size (in bytes) of the item is returned. Note that
|
||||||
|
it appears the size of directories is not supported by MicroPython and
|
||||||
|
will always return 0 (i.e. no recursive size computation).
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Disabling for now, see https://github.com/adafruit/ampy/issues/55.
|
||||||
|
# # Make sure directory ends in a slash.
|
||||||
|
# if not directory.endswith("/"):
|
||||||
|
# directory += "/"
|
||||||
|
|
||||||
|
# Make sure directory starts with slash, for consistency.
|
||||||
|
# if not directory.startswith("/"):
|
||||||
|
# directory = "/" + directory
|
||||||
|
|
||||||
|
command = """\
|
||||||
|
try:
|
||||||
|
import os
|
||||||
|
except ImportError:
|
||||||
|
import uos as os\n"""
|
||||||
|
|
||||||
|
if recursive:
|
||||||
|
command += """\
|
||||||
|
def listdir(directory):
|
||||||
|
result = set()
|
||||||
|
|
||||||
|
def _listdir(dir_or_file):
|
||||||
|
try:
|
||||||
|
# if its a directory, then it should provide some children.
|
||||||
|
children = os.listdir(dir_or_file)
|
||||||
|
except OSError:
|
||||||
|
# probably a file. run stat() to confirm.
|
||||||
|
os.stat(dir_or_file)
|
||||||
|
result.add(dir_or_file)
|
||||||
|
else:
|
||||||
|
# probably a directory, add to result if empty.
|
||||||
|
if children:
|
||||||
|
# queue the children to be dealt with in next iteration.
|
||||||
|
for child in children:
|
||||||
|
# create the full path.
|
||||||
|
if dir_or_file == '/':
|
||||||
|
next = dir_or_file + child
|
||||||
|
else:
|
||||||
|
next = dir_or_file + '/' + child
|
||||||
|
|
||||||
|
_listdir(next)
|
||||||
|
else:
|
||||||
|
result.add(dir_or_file)
|
||||||
|
|
||||||
|
_listdir(directory)
|
||||||
|
return sorted(result)\n"""
|
||||||
|
else:
|
||||||
|
command += """\
|
||||||
|
def listdir(directory):
|
||||||
|
if directory == '/':
|
||||||
|
return sorted([directory + f for f in os.listdir(directory)])
|
||||||
|
else:
|
||||||
|
return sorted([directory + '/' + f for f in os.listdir(directory)])\n"""
|
||||||
|
|
||||||
|
# Execute os.listdir() command on the board.
|
||||||
|
if long_format:
|
||||||
|
command += f"""
|
||||||
|
r = []
|
||||||
|
for f in listdir('{directory}'):
|
||||||
|
size = os.stat(f)[6]
|
||||||
|
r.append(f'{{f}} ({{size}}b)')
|
||||||
|
print(r)
|
||||||
|
"""
|
||||||
|
else:
|
||||||
|
command += f"""
|
||||||
|
print(listdir('{directory}'))
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
output = self._pyboard.exec_(textwrap.dedent(command))
|
||||||
|
except PyboardError as ex:
|
||||||
|
# Check if this is an OSError #2, i.e. directory doesn't exist and
|
||||||
|
# rethrow it as something more descriptive.
|
||||||
|
if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1:
|
||||||
|
raise RuntimeError(f"No such directory: {directory}")
|
||||||
|
else:
|
||||||
|
raise ex
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
# Parse the result list and return it.
|
||||||
|
return ast.literal_eval(output.decode("utf-8"))
|
||||||
|
|
||||||
|
def mkdir(self, directory, exists_okay=False):
|
||||||
|
"""Create the specified directory. Note this cannot create a recursive
|
||||||
|
hierarchy of directories, instead each one should be created separately.
|
||||||
|
"""
|
||||||
|
# Execute os.mkdir command on the board.
|
||||||
|
command = f"""
|
||||||
|
try:
|
||||||
|
import os
|
||||||
|
except ImportError:
|
||||||
|
import uos as os
|
||||||
|
os.mkdir('{directory}')
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
self._pyboard.exec_(textwrap.dedent(command))
|
||||||
|
except PyboardError as ex:
|
||||||
|
# Check if this is an OSError #17, i.e. directory already exists.
|
||||||
|
if ex.args[2].decode("utf-8").find("OSError: [Errno 17] EEXIST") != -1:
|
||||||
|
if not exists_okay:
|
||||||
|
raise DirectoryExistsError(f"Directory already exists: {directory}")
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
def put(self, filename, data):
|
||||||
|
"""Create or update the specified file with the provided data.
|
||||||
|
"""
|
||||||
|
# Open the file for writing on the board and write chunks of data.
|
||||||
|
|
||||||
|
if filename.startswith('/'):
|
||||||
|
filename = filename[1:]
|
||||||
|
|
||||||
|
print(f'↑ {filename}', end='', flush=True)
|
||||||
|
size = len(data)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
self._pyboard.exec_(f"f = open('{filename}', 'wb')")
|
||||||
|
# Loop through and write a buffer size chunk of data at a time.
|
||||||
|
for i in range(0, size, BUFFER_SIZE):
|
||||||
|
chunk_size = min(BUFFER_SIZE, size - i)
|
||||||
|
chunk = repr(data[i : i + chunk_size])
|
||||||
|
# Make sure to send explicit byte strings (handles python 2 compatibility).
|
||||||
|
if not chunk.startswith("b"):
|
||||||
|
chunk = "b" + chunk
|
||||||
|
self._pyboard.exec_(f"f.write({chunk})")
|
||||||
|
self._pyboard.exec_("f.close()")
|
||||||
|
except Exception as ex:
|
||||||
|
print(' [x]')
|
||||||
|
raise ex
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
print(f' - {size}b [✓]')
|
||||||
|
return size
|
||||||
|
|
||||||
|
def get(self, filename):
|
||||||
|
"""Retrieve the contents of the specified file and return its contents
|
||||||
|
as a byte string.
|
||||||
|
"""
|
||||||
|
# Open the file and read it a few bytes at a time and print out the
|
||||||
|
# raw bytes. Be careful not to overload the UART buffer so only write
|
||||||
|
# a few bytes at a time, and don't use print since it adds newlines and
|
||||||
|
# expects string data.
|
||||||
|
command = f"""
|
||||||
|
import sys
|
||||||
|
import ubinascii
|
||||||
|
with open('{filename}', 'rb') as infile:
|
||||||
|
while True:
|
||||||
|
result = infile.read({BUFFER_SIZE})
|
||||||
|
if result == b'':
|
||||||
|
break
|
||||||
|
len = sys.stdout.write(ubinascii.hexlify(result))
|
||||||
|
"""
|
||||||
|
print(f'↓ {filename}', end='', flush=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
output = self._pyboard.exec_(textwrap.dedent(command))
|
||||||
|
except PyboardError as ex:
|
||||||
|
print(' [x]')
|
||||||
|
# Check if this is an OSError #2, i.e. file doesn't exist and
|
||||||
|
# rethrow it as something more descriptive.
|
||||||
|
try:
|
||||||
|
if ex.args[2].decode("utf-8").find("OSError: [Errno 2] ENOENT") != -1:
|
||||||
|
raise RuntimeError(f"No such file: {filename}")
|
||||||
|
else:
|
||||||
|
raise ex
|
||||||
|
except UnicodeDecodeError:
|
||||||
|
raise ex
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
print(f' - {int(len(output) / 2)}b [✓]')
|
||||||
|
return binascii.unhexlify(output)
|
||||||
|
|
||||||
|
def rm(self, filename):
|
||||||
|
"""Remove the specified file or directory."""
|
||||||
|
command = f"""
|
||||||
|
try:
|
||||||
|
import os
|
||||||
|
except ImportError:
|
||||||
|
import uos as os
|
||||||
|
os.remove('{filename}')
|
||||||
|
"""
|
||||||
|
print(f'↶ {filename}', end='')
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
self._pyboard.exec_(textwrap.dedent(command))
|
||||||
|
except PyboardError as ex:
|
||||||
|
print(' [x]')
|
||||||
|
message = ex.args[2].decode("utf-8")
|
||||||
|
# Check if this is an OSError #2, i.e. file/directory doesn't exist
|
||||||
|
# and rethrow it as something more descriptive.
|
||||||
|
if message.find("OSError: [Errno 2] ENOENT") != -1:
|
||||||
|
raise RuntimeError(f"No such file/directory: {filename}")
|
||||||
|
# Check for OSError #13, the directory isn't empty.
|
||||||
|
if message.find("OSError: [Errno 13] EACCES") != -1:
|
||||||
|
raise RuntimeError(f"Directory is not empty: {filename}")
|
||||||
|
else:
|
||||||
|
raise ex
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
print(' [✓]')
|
||||||
|
|
||||||
|
def rmdir(self, directory, missing_okay=False):
|
||||||
|
"""Forcefully remove the specified directory and all its children."""
|
||||||
|
# Build a script to walk an entire directory structure and delete every
|
||||||
|
# file and subfolder. This is tricky because MicroPython has no os.walk
|
||||||
|
# or similar function to walk folders, so this code does it manually
|
||||||
|
# with recursion and changing directories. For each directory it lists
|
||||||
|
# the files and deletes everything it can, i.e. all the files. Then
|
||||||
|
# it lists the files again and assumes they are directories (since they
|
||||||
|
# couldn't be deleted in the first pass) and recursively clears those
|
||||||
|
# subdirectories. Finally when finished clearing all the children the
|
||||||
|
# parent directory is deleted.
|
||||||
|
command = f"""
|
||||||
|
try:
|
||||||
|
import os
|
||||||
|
except ImportError:
|
||||||
|
import uos as os
|
||||||
|
def rmdir(directory):
|
||||||
|
os.chdir(directory)
|
||||||
|
for f in os.listdir():
|
||||||
|
try:
|
||||||
|
os.remove(f)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
for f in os.listdir():
|
||||||
|
rmdir(f)
|
||||||
|
os.chdir('..')
|
||||||
|
os.rmdir(directory)
|
||||||
|
rmdir('{directory}')
|
||||||
|
"""
|
||||||
|
print(f'↶ {directory}', end='')
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
self._pyboard.exec_(textwrap.dedent(command))
|
||||||
|
except PyboardError as ex:
|
||||||
|
message = ex.args[2].decode("utf-8")
|
||||||
|
# Check if this is an OSError #2, i.e. directory doesn't exist
|
||||||
|
# and rethrow it as something more descriptive.
|
||||||
|
if message.find("OSError: [Errno 2] ENOENT") != -1:
|
||||||
|
if not missing_okay:
|
||||||
|
print(' [x]')
|
||||||
|
raise RuntimeError(f"No such directory: {directory}")
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
print(' [✓]')
|
||||||
|
|
||||||
|
def run(self, filename, wait_output=True, stream_output=True):
|
||||||
|
"""Run the provided script and return its output. If wait_output is True
|
||||||
|
(default) then wait for the script to finish and then return its output,
|
||||||
|
otherwise just run the script and don't wait for any output.
|
||||||
|
If stream_output is True(default) then return None and print outputs to
|
||||||
|
stdout without buffering.
|
||||||
|
"""
|
||||||
|
output = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
|
||||||
|
if stream_output:
|
||||||
|
self._pyboard.execfile(filename, stream_output=True)
|
||||||
|
elif wait_output:
|
||||||
|
# Run the file and wait for output to return.
|
||||||
|
output = self._pyboard.execfile(filename)
|
||||||
|
else:
|
||||||
|
# Read the file and run it using lower level pyboard functions that
|
||||||
|
# won't wait for it to finish or return output.
|
||||||
|
with open(filename, "rb") as infile:
|
||||||
|
self._pyboard.exec_raw_no_follow(infile.read())
|
||||||
|
except Exception as e:
|
||||||
|
raise e
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
|
||||||
|
def run_on_board(self, filename, wait_output=True, stream_output=True):
|
||||||
|
"""Run the provided script and return its output. If wait_output is True
|
||||||
|
(default) then wait for the script to finish and then return its output,
|
||||||
|
otherwise just run the script and don't wait for any output.
|
||||||
|
If stream_output is True(default) then return None and print outputs to
|
||||||
|
stdout without buffering.
|
||||||
|
"""
|
||||||
|
command = textwrap.dedent(f'exec(open("{filename}").read())')
|
||||||
|
output = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
|
||||||
|
if stream_output:
|
||||||
|
self._pyboard.exec_(command, stream_output=True)
|
||||||
|
elif wait_output:
|
||||||
|
# Run the file and wait for output to return.
|
||||||
|
output = self._pyboard.exec_(command)
|
||||||
|
else:
|
||||||
|
# Read the file and run it using lower level pyboard functions that
|
||||||
|
# won't wait for it to finish or return output.
|
||||||
|
self._pyboard.exec_raw_no_follow(command)
|
||||||
|
except Exception as e:
|
||||||
|
message = str(e)
|
||||||
|
|
||||||
|
if message.find('OSError: [Errno 2] ENOENT') != -1:
|
||||||
|
reason = f'"{filename}" does not exists!'
|
||||||
|
elif message.find('OSError: [Errno 13] EACCES') != -1:
|
||||||
|
reason = f'"{filename}" access denied!'
|
||||||
|
elif message.find('OSError: [Errno 21] EISDIR') != -1:
|
||||||
|
reason = f'"{filename}" is a directory!'
|
||||||
|
else:
|
||||||
|
reason = 'Traceback (most recent call last):'
|
||||||
|
traceback = message.split(reason)
|
||||||
|
|
||||||
|
if len(traceback) == 2:
|
||||||
|
for call in traceback[1][:-2].split('\\r\\n'):
|
||||||
|
reason += f'{call}\n'
|
||||||
|
|
||||||
|
raise Exception(reason.strip())
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
def exec(self, command):
|
||||||
|
try:
|
||||||
|
self.__raw_repl_on = True
|
||||||
|
self._pyboard.enter_raw_repl()
|
||||||
|
output = self._pyboard.exec_(textwrap.dedent(command), stream_output=True)
|
||||||
|
except Exception as e:
|
||||||
|
message = str(e)
|
||||||
|
reason = 'Traceback (most recent call last):'
|
||||||
|
traceback = message.split(reason)
|
||||||
|
|
||||||
|
if len(traceback) == 2:
|
||||||
|
for call in traceback[1][:-2].split('\\r\\n'):
|
||||||
|
reason += f'{call}\n'
|
||||||
|
|
||||||
|
raise Exception(reason.strip())
|
||||||
|
finally:
|
||||||
|
self._pyboard.exit_raw_repl()
|
||||||
|
self.__raw_repl_on = False
|
||||||
|
|
||||||
|
return output
|
||||||
|
|
||||||
|
|
||||||
print('Welcome to picowatch lib.')
|
print('Welcome to picowatch lib.')
|
||||||
pico = False
|
pico = False
|
||||||
|
|
||||||
@@ -168,6 +978,7 @@ watchdog = Observer()
|
|||||||
watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True)
|
watchdog.schedule(watchdog_event, path = WATCHING_DIRECTORY, recursive = True)
|
||||||
watchdog.start()
|
watchdog.start()
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -181,8 +992,6 @@ try:
|
|||||||
sys.exit()
|
sys.exit()
|
||||||
case ['reboot' | 'reset']:
|
case ['reboot' | 'reset']:
|
||||||
pico.send_ctrl_d()
|
pico.send_ctrl_d()
|
||||||
case ['autosave', value]:
|
|
||||||
sessions['autosave'] = (value == '1' or value == 'on')
|
|
||||||
case ['ls']:
|
case ['ls']:
|
||||||
ls('/')
|
ls('/')
|
||||||
case ['ls' | 'stat', source]:
|
case ['ls' | 'stat', source]:
|
||||||
@@ -195,11 +1004,8 @@ try:
|
|||||||
upload(source)
|
upload(source)
|
||||||
case ['download' | 'backup', source]:
|
case ['download' | 'backup', source]:
|
||||||
download(source)
|
download(source)
|
||||||
case ['save']:
|
case ['' | 'save']:
|
||||||
watchdog_callback()
|
watchdog_callback()
|
||||||
case ['sync', filename]:
|
|
||||||
watchdog_callback()
|
|
||||||
launch(filename)
|
|
||||||
case _:
|
case _:
|
||||||
if message.startswith('./'):
|
if message.startswith('./'):
|
||||||
launch(message[2:])
|
launch(message[2:])
|
||||||
|
|||||||
Reference in New Issue
Block a user