2022-12-31 01:56:48 +01:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
|
|
|
|
# Primadiag SAS - Paris
|
|
|
|
|
# Author: Gino D.
|
|
|
|
|
# Copyright (c) 2023 Primadiag
|
|
|
|
|
#
|
|
|
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
|
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
|
|
|
# in the Software without restriction, including without limitation the rights
|
|
|
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
|
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
|
|
|
# furnished to do so, subject to the following conditions:
|
|
|
|
|
#
|
|
|
|
|
# The above copyright notice and this permission notice shall be included in all
|
|
|
|
|
# copies or substantial portions of the Software.
|
|
|
|
|
#
|
|
|
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
|
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
|
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
|
|
|
# SOFTWARE.
|
|
|
|
|
|
2022-12-30 23:33:59 +01:00
|
|
|
import os
|
2022-12-29 16:37:55 +01:00
|
|
|
import sys
|
|
|
|
|
import time
|
2022-12-30 23:33:59 +01:00
|
|
|
import json
|
2022-12-31 03:04:11 +01:00
|
|
|
import signal
|
2022-12-29 17:20:41 +01:00
|
|
|
import binascii
|
2022-12-30 23:33:59 +01:00
|
|
|
import textwrap
|
2022-12-29 17:20:41 +01:00
|
|
|
|
2022-12-31 03:36:55 +01:00
|
|
|
from serial import Serial
|
2022-12-31 01:56:48 +01:00
|
|
|
from watchdog.observers import Observer
|
|
|
|
|
from watchdog.events import PatternMatchingEventHandler
|
2022-12-31 03:36:55 +01:00
|
|
|
from typing import Dict, List, Optional, Tuple, Union
|
2022-12-29 17:20:41 +01:00
|
|
|
|
|
|
|
|
|
2022-12-31 01:56:48 +01:00
|
|
|
# Chunk size, in bytes, used when sending commands and file data to the board
# and when reading files back from it.
BUFFER_SIZE: int = 256
# Separator printed between a path and its status/checksum in console output.
PRINT_STRPAD: str = '\t\t =>'
|
2022-12-31 01:56:48 +01:00
|
|
|
|
|
|
|
|
|
2022-12-31 03:36:55 +01:00
|
|
|
class Telnet:
    """Serial-like adapter around a Telnet connection to the board.

    Provides the ``read``/``write``/``inWaiting``/``close`` calls that
    ``Pyboard`` uses on its ``serial`` attribute. Bytes received from the
    connection are queued in ``self.fifo`` until they are read.
    """

    def __init__(self, IP: str, login: str, password: str):
        """Open the connection to ``IP`` and log in.

        Args:
            IP: address of the board.
            login: user name sent after the ``Login as:`` prompt.
            password: password sent after the ``Password:`` prompt.

        Raises:
            Exception: if one of the expected prompts is not received.
        """
        # NOTE: telnetlib was removed from the standard library in
        # Python 3.13 (PEP 594); this class needs an older interpreter.
        import telnetlib
        from collections import deque

        timeout = 15

        self.tn = telnetlib.Telnet(IP, timeout=timeout)
        self.fifo = deque()

        # Every prompt wait is bounded so an unresponsive peer ends in the
        # exception below instead of blocking forever.
        if b'Login as:' in self.tn.read_until(b'Login as:', timeout):
            self.tn.write(bytes(login, 'ascii') + b'\r\n')

            if b'Password:' in self.tn.read_until(b'Password:', timeout):
                time.sleep(0.2)
                self.tn.write(bytes(password, 'ascii') + b'\r\n')

                if b'for more information.' in self.tn.read_until(
                    b'Type "help()" for more information.', timeout
                ):
                    return

        # Do not leave a half-open connection behind on failure.
        self.close()
        raise Exception('Failed to establish a Telnet connection with the board')

    def __del__(self):
        self.close()

    def close(self):
        """Close the connection; errors while closing are ignored."""
        try:
            self.tn.close()
        except Exception:
            # Best effort: also covers an instance whose __init__ failed
            # before ``self.tn`` was assigned.
            pass

    def read(self, size: int = 1) -> bytes:
        """Return up to ``size`` bytes from the connection.

        Polls the connection until ``size`` bytes are queued or 8 consecutive
        polls (0.25 s apart) returned nothing, then returns what is available
        (possibly fewer than ``size`` bytes, possibly ``b''``).
        """
        idle_polls = 0

        # The counter must only be reset when data arrives; resetting it on
        # every pass would disable the timeout entirely.
        while len(self.fifo) < size and idle_polls < 8:
            data = self.tn.read_eager()

            if data:
                self.fifo.extend(data)
                idle_polls = 0
            else:
                idle_polls += 1
                time.sleep(0.25)

        count = min(size, len(self.fifo))

        return bytes(self.fifo.popleft() for _ in range(count))

    def write(self, data: bytes):
        """Send ``data`` and return the number of bytes passed in."""
        self.tn.write(data)

        return len(data)

    def inWaiting(self) -> int:
        """Return the number of bytes that can be read without waiting.

        When the queue is empty the connection is polled once and whatever
        arrived is queued and counted.
        """
        n_waiting = len(self.fifo)

        if not n_waiting:
            data = self.tn.read_eager()
            self.fifo.extend(data)
            n_waiting = len(data)

        return n_waiting
|
2022-12-31 01:56:48 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class Pyboard(object):
    """Raw-REPL driver for a board reached over a serial port or Telnet.

    Intended to be used as a context manager::

        with pyboard as terminal:
            output = terminal('print(1)')

    Entering interrupts the board, switches it to the raw REPL and soft
    reboots it; the returned callable runs one command. Leaving sends CTRL-B
    to exit the raw REPL.
    """

    # Quoted so the annotation is not evaluated when the class is created.
    serial: 'Union[Serial, Telnet]'

    def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''):
        """Open ``device``, retrying up to three times one second apart.

        Args:
            device: serial port name; a value containing exactly three dots
                is treated as an address and opened through ``Telnet``.
            baudrate: serial baud rate (unused for Telnet).
            login: Telnet user name.
            password: Telnet password.

        Raises:
            Exception: if all attempts fail; the last error is chained as
                the cause.
        """
        is_telnet = bool(device) and device.count('.') == 3
        last_error = None

        for _ in range(3):
            try:
                if is_telnet:
                    self.serial = Telnet(device, login, password)
                else:
                    self.serial = Serial(device, baudrate=baudrate, interCharTimeout=1)

                break
            except Exception as e:
                last_error = e
                time.sleep(1)
        else:
            raise Exception(f'Failed to access {device}') from last_error

    def close(self):
        """Close the underlying connection."""
        self.serial.close()

    def stdout_write_bytes(self, data: bytes):
        """Write ``data`` to stdout with the EOF markers (0x04) removed."""
        sys.stdout.buffer.write(data.replace(b'\x04', b''))
        sys.stdout.buffer.flush()

    def send_ok(self) -> bytes:
        """Send CTRL-D to run the buffered command; return the expected reply."""
        self.serial.write(b'\x04')

        return b'OK'

    def send_ctrl_a(self) -> bytes:
        """Send CTRL-A (enter raw REPL); return the expected banner."""
        self.serial.write(b'\x01')

        return b'raw REPL; CTRL-B to exit\r\n>'

    def send_ctrl_b(self):
        """Send CTRL-B (exit raw REPL)."""
        self.serial.write(b'\x02')

    def send_ctrl_c(self) -> bytes:
        """Send CTRL-C twice to interrupt any running program.

        Returns the banner expected when this is sent after a soft reboot.
        """
        for _ in range(2):
            self.serial.write(b'\x03')

        return b'raw REPL; CTRL-B to exit\r\n'

    def send_ctrl_d(self) -> bytes:
        """Send CTRL-D (soft reset); return the expected reply."""
        self.serial.write(b'\x04')

        return b'soft reboot\r\n'

    def read_until(self, delimiter: bytes, stream_output: bool = False) -> Optional[bytes]:
        """Read byte by byte until the received data ends with ``delimiter``.

        Args:
            delimiter: byte sequence that ends the read.
            stream_output: when true, every byte is echoed to stdout as it
                arrives and only the last ``len(delimiter)`` bytes are kept.

        Returns:
            The received data (ending with ``delimiter``), or ``None`` after
            1000 consecutive polls (1 ms apart) without incoming data.
        """
        # bytearray: appending to bytes would copy the whole buffer each time.
        buffer = bytearray(self.serial.read(1))

        if stream_output:
            self.stdout_write_bytes(bytes(buffer))

        idle_polls = 0
        max_len = len(delimiter)

        while idle_polls < 1000:
            if buffer.endswith(delimiter):
                return bytes(buffer)

            if self.serial.inWaiting() > 0:
                idle_polls = 0
                chunk = self.serial.read(1)
                buffer += chunk

                if stream_output:
                    self.stdout_write_bytes(chunk)
                    # Already printed: keep just enough to match the delimiter.
                    buffer = buffer[-max_len:]
            else:
                idle_polls += 1
                time.sleep(0.001)

        return None

    def __enter__(self):
        """Prepare the board for commands and return the command runner.

        Raises:
            Exception: if the raw REPL cannot be entered, or the soft reboot
                or the following interrupt is not acknowledged.
        """
        self.send_ctrl_c()

        # Discard whatever the board sent so far.
        n = self.serial.inWaiting()

        while n > 0:
            self.serial.read(n)
            n = self.serial.inWaiting()

        for _ in range(5):
            if self.read_until(self.send_ctrl_a()):
                break

            time.sleep(0.01)
        else:
            raise Exception('REPL: could not enter')

        if not self.read_until(self.send_ctrl_d()):
            raise Exception('REPL: could not soft reboot')

        if not self.read_until(self.send_ctrl_c()):
            raise Exception('REPL: could not interrupt after soft reboot')

        return self.__terminal

    def __exit__(self, a, b, c):
        self.send_ctrl_b()

    @staticmethod
    def _format_remote_error(text: str) -> str:
        """Build the exception message for the board's error output ``text``."""
        marker = 'Traceback (most recent call last):'
        parts = text.split(marker)

        if len(parts) != 2:
            return f'-------\n{text}'

        # The board separates lines with CRLF; re-join them with plain LF.
        body = '\n'.join(parts[1].splitlines())

        return f'-------\n{(marker + body).strip()}'

    def __terminal(self, command: str, stream_output: bool = False, catch_output: bool = True) -> Optional[str]:
        """Run ``command`` in the raw REPL.

        Args:
            command: source code to execute; common leading indentation is
                removed first.
            stream_output: echo the command's output to stdout while it runs.
            catch_output: when false, return right after the command was
                accepted without waiting for its output.

        Returns:
            The stripped output of the command, or ``None`` when
            ``catch_output`` is false.

        Raises:
            Exception: on a lost prompt, a rejected command, a timeout, or
                when the board reports an error.
        """
        # Dedent only text: textwrap.dedent does not accept bytes.
        if not isinstance(command, bytes):
            command = bytes(textwrap.dedent(command), encoding='utf8')

        if not self.read_until(b'>'):
            raise Exception('Terminal: prompt has been lost')

        for i in range(0, len(command), BUFFER_SIZE):
            self.serial.write(command[i:i + BUFFER_SIZE])
            time.sleep(0.001)

        if not self.read_until(self.send_ok()):
            raise Exception('Terminal: could not execute command')

        if not catch_output:
            return None

        # The reply is "<output>\x04<error>\x04".
        data = self.read_until(b'\x04', stream_output=stream_output)

        if not data:
            raise Exception('Terminal: timeout waiting for first EOF reception')

        exception = self.read_until(b'\x04')

        if not exception:
            raise Exception('Terminal: timeout waiting for second EOF reception')

        output = data[:-1].decode('utf-8')
        error = exception[:-1].decode('utf-8')

        if error:
            raise Exception(self._format_remote_error(error))

        return output.strip()
|
2022-12-29 17:20:41 +01:00
|
|
|
|
|
|
|
|
|
2022-12-31 01:56:48 +01:00
|
|
|
class FileSystem(object):
    """File operations on the board, implemented by running code on it.

    Every operation sends a small script through the ``Pyboard`` terminal and
    parses what the script prints. Paths are embedded in those scripts with
    ``repr`` so quotes and backslashes in a name cannot break the script.
    """

    _pyboard: 'Pyboard'

    def __init__(self, pyboard: 'Pyboard'):
        self._pyboard = pyboard

    @staticmethod
    def _absolute(path: str) -> str:
        """Return ``path`` with a leading ``/``."""
        return path if path.startswith('/') else '/' + path

    def checksum(self, source: str, data: Union[str, bytes]) -> str:
        """Return ``'<local>:<remote>'`` XOR checksums for a file.

        Args:
            source: path of the file on the board.
            data: local content to compare with; text is encoded as UTF-8.

        Returns:
            The checksum of ``data`` and the one computed on the board for
            ``source``, separated by a colon.
        """
        # Both sides XOR raw bytes (seed 21) so binary files are supported.
        output = self.terminal(f"""
            v = 21
            with open({source!r}, "rb") as fh:
                for b in fh.read():
                    v ^= b
            print(v)
        """)

        if isinstance(data, str):
            data = data.encode('utf-8')

        v = 21

        for byte in data:
            v ^= byte

        return f'{v}:{output}'

    def get(self, filename: str) -> Tuple[bytes, str]:
        """Read ``filename`` from the board.

        Returns:
            A ``(content, checksum)`` tuple; see ``checksum`` for the format.
        """
        filename = self._absolute(filename)

        # The content travels hex-encoded so it survives the text channel.
        output = self.terminal(f"""
            import sys
            import ubinascii
            with open({filename!r}, 'rb') as infile:
                while True:
                    result = infile.read({BUFFER_SIZE})
                    if result == b'':
                        break
                    sys.stdout.write(ubinascii.hexlify(result))
        """)
        output = binascii.unhexlify(output)

        return (output, self.checksum(filename, output))

    def download(self, source: str, destination: str) -> str:
        """Copy the board file ``source`` to the local path ``destination``.

        Returns:
            The checksum string produced by ``get``.
        """
        output, checksum = self.get(source)

        with open(destination, 'wb') as fh:
            fh.write(output)

        return checksum

    def put(self, filename: str, data: bytes) -> Tuple[str, str]:
        """Write ``data`` to ``filename`` on the board.

        Missing parent directories are created first. Text is encoded as
        UTF-8.

        Returns:
            A ``(filename, checksum)`` tuple with the absolute board path.
        """
        filename = self._absolute(filename)

        if not isinstance(data, bytes):
            data = bytes(data, encoding='utf8')

        dirname = os.path.dirname(filename)

        # Nothing to create for files that live in the root directory.
        if dirname not in ('', '/'):
            self.mkdir(dirname)

        with self._pyboard as terminal:
            terminal(f'fh = open({filename!r}, "wb")')

            for i in range(0, len(data), BUFFER_SIZE):
                terminal(f'fh.write({data[i:i + BUFFER_SIZE]!r})')

            terminal('fh.close()')

        return (filename, self.checksum(filename, data))

    def upload(self, source: str, destination: str) -> str:
        """Copy the local file ``source`` to ``destination`` on the board.

        Returns:
            The checksum string produced by ``put``.
        """
        with open(source, 'rb') as fh:
            _, checksum = self.put(destination, fh.read())

        return checksum

    def ls(self, dirname: str = '/') -> Tuple[int, List, str]:
        """List ``dirname`` recursively.

        Returns:
            ``[status, entries, error]`` as decoded from the board's JSON
            reply. On success ``status`` is 1 and ``entries`` holds
            ``[path, size]`` pairs, with size -1 for directories. On failure
            ``status`` is 0, ``entries`` is ``[[dirname, -2]]`` and ``error``
            is the error text.
        """
        dirname = self._absolute(dirname)

        output = self.terminal(f"""
            try:
                import os
                import json
            except ImportError:
                import uos as os
                import ujson as json

            def ls(dirname):
                e = []
                s = os.stat(dirname)
                if s[0] == 0x4000:
                    if not dirname.endswith("/"):
                        dirname += "/"
                    for t in os.ilistdir(dirname):
                        if t[1] == 0x4000:
                            e.append((dirname + t[0] + '/', -1))
                            e.extend(ls(dirname + t[0] + '/'))
                        else:
                            e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
                else:
                    e.append((dirname, s[6]))
                return e

            try:
                s = 1
                r = ls({dirname!r})
                x = ''
            except Exception as e:
                s = 0
                r = [({dirname!r}, -2)]
                x = str(e)

            print(json.dumps([s, r, x]))
        """)

        return json.loads(output)

    def rm(self, filename: str) -> Tuple[int, List, str]:
        """Delete ``filename`` (a file, or a directory with its content).

        Returns:
            ``[status, results, error]`` as decoded from the board's JSON
            reply; ``results`` holds one ``[path, ok, message]`` item per
            removed (or failed) file and directory.
        """
        filename = self._absolute(filename)

        # Device-side notes:
        #  * the caught exception is named ``err`` so it cannot clobber the
        #    entry list;
        #  * ``ls`` lists a directory before its content, so directories are
        #    removed in reverse order (deepest first) and the target itself
        #    last; the root directory is never removed.
        output = self.terminal(f"""
            try:
                import os
                import json
            except ImportError:
                import uos as os
                import ujson as json

            def ls(dirname):
                e = []
                if not dirname.endswith("/"):
                    dirname += "/"
                for t in os.ilistdir(dirname):
                    if t[1] == 0x4000:
                        e.append((dirname + t[0] + '/', -1))
                        e.extend(ls(dirname + t[0] + '/'))
                    else:
                        e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
                return e

            def rm(filename):
                r = []
                if os.stat(filename)[0] == 0x4000:
                    entries = ls(filename)
                    for f, s in entries:
                        if s != -1:
                            try:
                                os.remove(f)
                                r.append((f, 1, ''))
                            except Exception as err:
                                r.append((f, 0, str(err)))
                    dirs = [f for f, s in entries if s == -1]
                    dirs.reverse()
                    if filename != '/':
                        dirs.append(filename)
                    for f in dirs:
                        try:
                            os.rmdir(f)
                            r.append((f, 1, ''))
                        except Exception as err:
                            r.append((f, 0, str(err)))
                else:
                    try:
                        os.remove(filename)
                        r.append((filename, 1, ''))
                    except Exception as err:
                        r.append((filename, 0, str(err)))
                return r

            try:
                s = 1
                r = rm({filename!r})
                x = ''
            except Exception as e:
                s = 0
                x = str(e)
                r = [({filename!r}, 0, x)]

            print(json.dumps([s, r, x]))
        """)

        return json.loads(output)

    def mkdir(self, dirname: str) -> List:
        """Create ``dirname`` and any missing parent directories.

        Returns:
            One item per path component as decoded from the board's JSON
            reply: ``[path, 1]`` when the directory was created or already
            existed, ``[path, 0, message]`` otherwise.
        """
        dirname = self._absolute(dirname)

        if dirname.endswith('/'):
            dirname = dirname[:-1]

        # An existing directory counts as success (errno 17 is EEXIST);
        # any other error is reported with its message.
        output = self.terminal(f"""
            try:
                import os
                import json
            except ImportError:
                import uos as os
                import ujson as json

            r = []
            d = []
            for zd in {dirname!r}.split("/"):
                if not zd:
                    continue
                d.append(zd)
                zd = "/" + "/".join(d)
                try:
                    os.mkdir(zd)
                    r.append((zd, 1))
                except Exception as e:
                    if 'EEXIST' in str(e) or (e.args and e.args[0] == 17):
                        r.append((zd, 1))
                    else:
                        r.append((zd, 0, str(e)))

            print(json.dumps(r))
        """)

        return json.loads(output)

    def launch(self, filename: str):
        """Execute the board file ``filename``, streaming its output.

        Errors are printed instead of raised.
        """
        try:
            self.terminal(f"""
                with open({filename!r}, "r") as fh:
                    exec(fh.read())
            """, stream_output=True)
        except Exception as e:
            print(str(e))

    def terminal(self, command: str, stream_output: bool = False) -> str:
        """Run ``command`` on the board and return its output.

        Args:
            command: source code to execute.
            stream_output: echo the output to stdout while the command runs.
        """
        with self._pyboard as terminal:
            return terminal(command, stream_output=stream_output)
|
2022-12-29 16:37:55 +01:00
|
|
|
|
|
|
|
|
|
2022-12-31 01:56:48 +01:00
|
|
|
class Picowatch(object):
    """Console front end: runs file-system commands and prints the results.

    Output prefixes: ``[.]`` directory, ``[:]`` file, ``[↑]`` uploaded,
    ``[↓]`` downloaded, ``[-]`` deleted, ``[?]`` error.
    """

    _fs: 'FileSystem'

    def __init__(self, pyboard: 'Pyboard'):
        """Wrap ``pyboard`` and route SIGINT (CTRL-C) to the board."""
        self._fs = FileSystem(pyboard)
        signal.signal(signal.SIGINT, lambda a, b: self.interupt())

    def interupt(self):
        """Interrupt whatever is running on the board."""
        self._fs._pyboard.send_ctrl_c()

    def terminal(self, command: str):
        """Run ``command`` on the board, streaming its output to stdout."""
        self._fs.terminal(command, stream_output=True)

    def listing(self, remote: str = '/'):
        """Print the recursive content of the board path ``remote``."""
        status, output, exception = self._fs.ls(remote)

        if not status:
            print('[?]', remote, PRINT_STRPAD, exception)
            return

        for name, size in output:
            # name[1:] drops the leading '/'.
            if size == -1:
                print('[.]', name[1:])
            else:
                print('[:]', name[1:], f'({size}b)')

    def contents(self, remote: str):
        """Print the board file ``remote`` decoded as UTF-8."""
        try:
            content, _ = self._fs.get(remote)
            print(content.decode('utf-8'))
        except Exception as e:
            print('[?]', remote, f'\n{str(e)}')

    def upload(self, filepath: str):
        """Upload a local file, or a directory tree, to the same board path.

        ``filepath`` is resolved relative to the current directory; a leading
        ``/`` is treated as the current directory.
        """
        local = filepath

        if local.startswith('/'):
            local = '.' + local

        if not local.startswith('./'):
            local = './' + local

        queue = []

        if os.path.isdir(local):
            for root, _, files in os.walk(local, followlinks=True):
                for filename in files:
                    filename = os.path.join(root, filename).replace('\\', '/')
                    # Board path = local path without the leading '.'.
                    queue.append((filename, filename[1:]))
        elif os.path.exists(local):
            queue.append((local, local[1:]))
        else:
            print('[?]', filepath, PRINT_STRPAD, 'No such file or directory')

        for filename, remote in queue:
            try:
                print('[↑]', filename, PRINT_STRPAD, self._fs.upload(filename, remote))
            except Exception as e:
                print('[?]', filename, PRINT_STRPAD, str(e))

    def download(self, filepath: str):
        """Download a board file, or a directory tree, below the current directory."""
        # Only strip the '.' of a './' prefix, not the dot of a hidden file.
        if filepath == '.' or filepath.startswith('./'):
            filepath = filepath[1:]

        status, output, exception = self._fs.ls(filepath)

        if not status:
            print('[?]', filepath, PRINT_STRPAD, exception)
            return

        for remote, size in output:
            if size == -1:
                os.makedirs(f'.{remote}', 0o777, exist_ok=True)

        for remote, size in output:
            if size == -1:
                continue

            local = f'.{remote}'

            try:
                # A single file may be requested from a directory that was
                # not part of the listing.
                parent = os.path.dirname(local)

                if parent:
                    os.makedirs(parent, exist_ok=True)

                print('[↓]', local, PRINT_STRPAD, self._fs.download(remote, local))
            except Exception as e:
                print('[?]', local, PRINT_STRPAD, str(e))

    def delete(self, filepath: str):
        """Delete ``filepath`` on the board and print one line per item."""
        status, output, exception = self._fs.rm(filepath)

        if not status:
            print('[?]', filepath, PRINT_STRPAD, exception)
            return

        for remote, checked, message in output:
            local = f'.{remote}'

            if checked:
                print('[-]', local)
            else:
                print('[?]', local, PRINT_STRPAD, message)

    def launch(self, filepath: str):
        """Execute the board file ``filepath``."""
        self._fs.launch(filepath)

    def watch(self, filename: str):
        """Run the local file ``filename`` on the board without uploading it."""
        if filename.startswith('/'):
            filename = '.' + filename

        if not filename.startswith('./'):
            filename = './' + filename

        with open(filename, 'r', encoding='utf-8') as fh:
            self._fs.terminal(fh.read(), stream_output=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print('Welcome to picowatch lib.')

# Interactive connection prompt (currently disabled):
#
# picowatch = False
#
# while not picowatch:
#     print('-' * 30)
#     device = input('Port: ').strip()
#     baudrate = input('Baudrate (115200): ').strip() or 115200
#
#     try:
#         picowatch = Picowatch(Pyboard(device=device, baudrate=baudrate))
#         print(f'Connected to device: {device} at a baudrate of: {baudrate}')
#         print('-' * 30)
#     except Exception as e:
#         print(str(e))

# The device (serial port, or board address for Telnet) may be given as the
# first command-line argument; 'COM5' stays the default.
picowatch = Picowatch(Pyboard(sys.argv[1] if len(sys.argv) > 1 else 'COM5'))
# Stop anything still running on the board before accepting commands.
picowatch.interupt()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# sessions = {'deleted': set(), 'modified': set()}
|
|
|
|
|
|
|
|
|
|
# def on_modified_callback(event):
|
|
|
|
|
# if event.is_directory == True:
|
|
|
|
|
# return
|
|
|
|
|
|
|
|
|
|
# source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/')
|
|
|
|
|
# sessions['modified'].add(source)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# def on_deleted_callback(event):
|
|
|
|
|
# source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/')
|
|
|
|
|
|
|
|
|
|
# if event.is_directory == True and not source.endswith('/'):
|
|
|
|
|
# source += '/'
|
|
|
|
|
# elif len(source.split('.')) == 1:
|
|
|
|
|
# source += '/'
|
|
|
|
|
|
|
|
|
|
# sessions['deleted'].add(source)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# def watchdog_callback():
|
|
|
|
|
# for source in sessions['deleted']:
|
|
|
|
|
# delete(source, is_directory=source.endswith('/'))
|
|
|
|
|
|
|
|
|
|
# for source in sessions['modified']:
|
|
|
|
|
# upload(source)
|
|
|
|
|
|
|
|
|
|
# sessions['deleted'] = set()
|
|
|
|
|
# sessions['modified'] = set()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# watchdog_event = PatternMatchingEventHandler(
|
|
|
|
|
# patterns = ['*'],
|
|
|
|
|
# ignore_patterns = None,
|
|
|
|
|
# ignore_directories = False,
|
|
|
|
|
# case_sensitive = True
|
|
|
|
|
# )
|
|
|
|
|
# watchdog_event.on_modified = on_modified_callback
|
|
|
|
|
# watchdog_event.on_deleted = on_deleted_callback
|
|
|
|
|
|
|
|
|
|
# watchdog = Observer()
|
|
|
|
|
# watchdog.schedule(watchdog_event, path = './', recursive = True)
|
|
|
|
|
# watchdog.start()
|
|
|
|
|
|
|
|
|
|
# try:
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
message = input('>>> ').strip()
|
|
|
|
|
|
|
|
|
|
match message.split(' '):
|
|
|
|
|
case ['0' | 'exit']:
|
|
|
|
|
sys.exit()
|
|
|
|
|
case ['reboot' | 'reset']:
|
|
|
|
|
picowatch.terminal('help()')
|
|
|
|
|
case ['ls' | 'stat', *source]:
|
|
|
|
|
picowatch.listing(source[0] if source else '/')
|
|
|
|
|
case ['cat' | 'open' | 'contents', source]:
|
|
|
|
|
picowatch.contents(source)
|
|
|
|
|
case ['del' | 'rm' | 'delete' | 'remove', source]:
|
|
|
|
|
picowatch.delete(source)
|
|
|
|
|
case ['format']:
|
|
|
|
|
picowatch.delete('/')
|
|
|
|
|
case ['upl' | 'upload' | 'update', source]:
|
|
|
|
|
picowatch.upload(source)
|
|
|
|
|
case ['restore']:
|
|
|
|
|
picowatch.upload('/')
|
|
|
|
|
case ['download' | 'transfer', source]:
|
|
|
|
|
picowatch.download(source)
|
|
|
|
|
case ['backup']:
|
|
|
|
|
picowatch.download('/')
|
|
|
|
|
# case ['' | 'save' | 'commit']:
|
|
|
|
|
# watchdog_callback()
|
|
|
|
|
# case ['status' | 'staged']:
|
|
|
|
|
# for filename in sessions['deleted']:
|
|
|
|
|
# print('-', filename)
|
|
|
|
|
# for filename in sessions['modified']:
|
|
|
|
|
# print('+', filename)
|
|
|
|
|
# case ['cancel' | 'unstaged']:
|
|
|
|
|
# sessions['deleted'] = set()
|
|
|
|
|
# sessions['modified'] = set()
|
|
|
|
|
case ['watch' | 'test', filename]:
|
|
|
|
|
picowatch.watch(filename)
|
|
|
|
|
case _:
|
|
|
|
|
if message.startswith('./'):
|
|
|
|
|
picowatch.launch(message[2:])
|
|
|
|
|
elif message:
|
|
|
|
|
print(f'"{message}" is not recognized.')
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(str(e))
|
|
|
|
|
# except KeyboardInterrupt:
|
|
|
|
|
# watchdog.stop()
|
|
|
|
|
|
|
|
|
|
# watchdog.join()
|