Files
primadiag-pybash/picowatch_d/picowatch.py
2023-01-04 00:54:46 +01:00

765 lines
23 KiB
Python

#!/usr/bin/env python
# Primadiag SAS - Paris
# Author: Gino D.
# Copyright (c) 2023 Primadiag
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE
import os
import sys
import time
import json
import signal
import tempfile
import binascii
import textwrap
import subprocess
from serial import Serial
from typing import Dict, List, Optional, Tuple, Union
BUFFER_SIZE: int = 126
CURRENT_DIRECTORY: str = os.getcwd().replace('\\', '/')
class Tab():
colsize: Tuple = ()
def __init__(self, *length: int):
self.colsize = length
def blank(self, text: str = '-'):
print(text * sum(self.colsize))
def print(self, *texts: str):
def coltext(text: str, length: int = 20) -> str:
if len(text) >= length:
text = text[:(length - 4) if length > 5 else length] + '...'
return text + ' ' * (length - len(text))
line = ''
for i, text in enumerate(texts[:len(self.colsize)]):
line += coltext(text, self.colsize[i])
for text in texts[len(self.colsize):]:
line += coltext(text)
print(line)
def labels(self, *texts: str, blank_text: str = '-'):
self.print(*texts)
self.blank(blank_text)
class Telnet:
def __init__(self, IP: str, login: str, password: str):
import telnetlib
from collections import deque
self.tn = telnetlib.Telnet(IP, timeout=15)
self.fifo = deque()
if b'Login as:' in self.tn.read_until(b'Login as:'):
self.tn.write(bytes(login, 'ascii') + b'\r\n')
if b'Password:' in self.tn.read_until(b'Password:'):
time.sleep(0.2)
self.tn.write(bytes(password, 'ascii') + b'\r\n')
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.'):
return
raise Exception('Failed to establish a Telnet connection with the board')
def __del__(self):
self.close()
def close(self):
try:
self.tn.close()
except:
pass
def read(self, size: int = 1) -> bytes:
timeout = 0
while len(self.fifo) < size and not timeout >= 8:
timeout = 0
data = self.tn.read_eager()
if len(data):
self.fifo.extend(data)
timeout = 0
else:
timeout += 1
time.sleep(0.25)
data = b''
while len(data) < size and len(self.fifo) > 0:
data += bytes([self.fifo.popleft()])
return data
def write(self, data: bytes):
self.tn.write(data)
return len(data)
def inWaiting(self) -> int:
n_waiting = len(self.fifo)
if not n_waiting:
data = self.tn.read_eager()
self.fifo.extend(data)
n_waiting = len(data)
return n_waiting
class Pyboard(object):
i: int = 0
serial: Union[Serial, Telnet]
def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''):
is_telnet = device and device.count('.') == 3
for _ in range(0, 3):
try:
if is_telnet:
self.serial = Telnet(device, login, password)
else:
self.serial = Serial(device, baudrate=baudrate, interCharTimeout=1)
break
except:
time.sleep(1)
else:
raise Exception(f'Failed to access {device}')
def close(self):
self.serial.close()
def transfer_status(self) -> int:
arrows = ['', '', '', '']
sys.stdout.write(f'[∘] Transfering... {arrows[self.i]}\r')
sys.stdout.flush()
self.i = (self.i + 1) % 4
def stdout_write_bytes(self, data: str):
sys.stdout.buffer.write(data.replace(b'\x04', b''))
sys.stdout.buffer.flush()
def send_ok(self) -> bytes:
self.serial.write(b'\x04')
return b'OK'
def send_ctrl_a(self) -> bytes:
# ctrl-A: enter raw REPL
self.serial.write(b'\x01')
return b'raw REPL; CTRL-B to exit\r\n>'
def send_ctrl_b(self):
# ctrl-B: exit raw REPL
self.serial.write(b'\x02')
def send_ctrl_c(self) -> bytes:
# ctrl-C twice: interrupt any running program
for _ in range(0, 2):
self.serial.write(b'\x03')
return b'raw REPL; CTRL-B to exit\r\n'
def send_ctrl_d(self) -> bytes:
# ctrl-D: soft reset
self.serial.write(b'\x04')
return b'soft reboot\r\n'
def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]:
data = self.serial.read(1)
if stream_output:
self.stdout_write_bytes(data)
timeout = 0
max_len = len(delimiter)
while not timeout >= 1000:
if data.endswith(delimiter):
return data
elif self.serial.inWaiting() > 0:
timeout = 0
stream_data = self.serial.read(1)
data += stream_data
if stream_output:
self.stdout_write_bytes(stream_data)
data = data[-max_len:]
elif show_status:
self.transfer_status()
else:
timeout += 1
time.sleep(0.0001)
def __enter__(self):
self.send_ctrl_c()
n = self.serial.inWaiting()
while n > 0:
self.serial.read(n)
n = self.serial.inWaiting()
for _ in range(0, 5):
if self.read_until(self.send_ctrl_a()):
break
time.sleep(0.01)
else:
raise Exception('REPL: could not enter')
if not self.read_until(self.send_ctrl_d()):
raise Exception('REPL: could not soft reboot')
if not self.read_until(self.send_ctrl_c()):
raise Exception('REPL: could not interrupt after soft reboot')
return self.__terminal
def __exit__(self, a, b, c):
self.send_ctrl_b()
def __terminal(self, command: str, stream_output: bool = False) -> Optional[str]:
command = textwrap.dedent(command)
# send input
if not isinstance(command, bytes):
command = bytes(command, encoding='utf8')
if not self.read_until(b'>'):
raise Exception('Terminal: prompt has been lost')
for i in range(0, len(command), BUFFER_SIZE):
if not stream_output:
self.transfer_status()
self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))])
time.sleep(0.0001)
if not self.read_until(self.send_ok()):
raise Exception('Terminal: could not execute command')
# catch output
data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output)
if not data:
raise Exception('Terminal: timeout waiting for first EOF reception')
exception = self.read_until(b'\x04')
if not exception:
raise Exception('Terminal: timeout waiting for second EOF reception')
data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8'))
if exception:
reason = 'Traceback (most recent call last):'
traceback = exception.split(reason)
if len(traceback) == 2:
for call in traceback[1][:-2].split('\\r\\n'):
reason += f'{call}\n'
raise Exception(f'{"-"* 45}\n{reason.strip()}')
else:
raise Exception(f'{"-" * 45}\n{exception}')
return data.strip()
class PyboardTerminal(object):
pyboard: Pyboard
def __init__(self, pyboard: Pyboard):
self.pyboard = pyboard
def checksum(self, source: str, data: str) -> str:
output = self.terminal(f"""
def checksum(data):
v = 21
for c in data.decode("utf-8"):
v ^= ord(c)
return v
with open("{source}", "rb") as fh:
print(checksum(fh.read()))
""")
if isinstance(data, bytes):
data = data.decode('utf-8')
v = 21
for c in data:
v ^= ord(c)
if int(v) == int(output):
return 'OK'
return f'{v} != {output}'
def get(self, filename: str) -> Tuple[bytes, bool]:
if not filename.startswith('/'):
filename = '/' + filename
output = self.terminal(f"""
import sys
import ubinascii
with open('{filename}', 'rb') as infile:
while True:
result = infile.read({BUFFER_SIZE})
if result == b'':
break
sys.stdout.write(ubinascii.hexlify(result))
""")
output = binascii.unhexlify(output)
return (output, self.checksum(filename, output))
def download(self, source: str, destination: str) -> str:
output, checksum = self.get(source)
with open(destination, 'wb') as fh:
fh.write(output)
return checksum
def put(self, filename: str, data: bytes) -> Tuple[str, bool]:
if not filename.startswith('/'):
filename = '/' + filename
if not isinstance(data, bytes):
data = bytes(data, encoding='utf8')
try:
if os.path.dirname(filename):
self.mkdir(os.path.dirname(filename))
with self.pyboard as terminal:
size = len(data)
terminal(f"""fh = open("{filename}", "wb")""")
for i in range(0, size, BUFFER_SIZE):
chunk_size = min(BUFFER_SIZE, size - i)
chunk = repr(data[i : i + chunk_size])
terminal(f"""fh.write({chunk})""")
terminal("""fh.close()""")
except Exception as e:
raise e
return (filename, self.checksum(filename, data))
def upload(self, source: str, destination: str) -> str:
with open(source, 'rb') as fh:
_, checksum = self.put(destination, fh.read())
return checksum
def ls(self, dirname: str = '/') -> Tuple[int, List, str]:
if not dirname.startswith('/'):
dirname = '/' + dirname
output = self.terminal(f"""
try:
import os
import json
except ImportError:
import uos as os
import ujson as json
def ls(dirname):
e = []
s = os.stat(dirname)
if s[0] == 0x4000:
if not dirname.endswith("/"):
dirname += "/"
for t in os.ilistdir(dirname):
if t[1] == 0x4000:
e.append((dirname + t[0] + '/', -1))
e.extend(ls(dirname + t[0] + '/'))
else:
e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
else:
e.append((dirname, s[6]))
return e
try:
s = 1
r = ls("{dirname}")
x = ''
except Exception as e:
s = 0
r = [("{dirname}", -2)]
x = str(e)
print(json.dumps([s, r, x]))
""")
return json.loads(output)
def rm(self, filename: str) -> Tuple[int, List, str]:
if not filename.startswith('/'):
filename = '/' + filename
output = self.terminal(f"""
try:
import os
import json
except ImportError:
import uos as os
import ujson as json
def ls(dirname):
e = []
if not dirname.endswith("/"):
dirname += "/"
for t in os.ilistdir(dirname):
if t[1] == 0x4000:
e.append((dirname + t[0] + '/', -1))
e.extend(ls(dirname + t[0] + '/'))
else:
e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
return e
def rm(filename):
r = []
if os.stat(filename)[0] == 0x4000:
e = ls(filename)
if filename != '/':
e.append((filename, -1))
for f, s in e:
if not s == -1:
try:
os.remove(f)
r.append((f, 1, ''))
except Exception as e:
r.append((f, 0, str(e)))
for f, s in e:
if s == -1:
try:
os.rmdir(f)
r.append((f, 1, ''))
except Exception as e:
r.append((f, 0, str(e)))
else:
try:
os.remove(filename)
r.append((filename, 1, ''))
except Exception as e:
r.append((filename, 0, str(e)))
return r
try:
s = 1
r = rm("{filename}")
x = ''
except Exception as e:
s = 0
r = [("{filename}", 0, str(e))]
x = str(e)
print(json.dumps([s, r, x]))
""")
return json.loads(output)
def mkdir(self, dirname: str) -> List:
if not dirname.startswith('/'):
dirname = '/' + dirname
if dirname.endswith('/'):
dirname = dirname[:-1]
output = self.terminal(f"""
try:
import os
import json
except ImportError:
import uos as os
import ujson as json
r = []
d = []
for zd in str("{dirname}").split("/"):
if not zd:
continue
d.append(zd)
zd = "/".join(d)
try:
os.mkdir(zd)
r.append(("/" + zd, 1))
except Exception as e:
if str(e).find('EEXIST'):
r.append(("/" + zd, 1))
else:
r.append(("/" + zd, 0, str(e)))
print(json.dumps(r))
""")
return json.loads(output)
def launch(self, filename: str):
try:
self.terminal(f"""
with open("{filename}", "r") as fh:
exec(fh.read())
""", stream_output=True)
except Exception as e:
print(str(e))
def terminal(self, command: str, stream_output: bool = False) -> str:
with self.pyboard as terminal:
try:
return terminal(command, stream_output=stream_output)
except Exception as e:
raise e
class Picowatch(object):
terminal: PyboardTerminal
def __init__(self, pyboard: Pyboard):
self.terminal = PyboardTerminal(pyboard)
signal.signal(signal.SIGINT, lambda a, b: self.interupt())
def interupt(self):
self.terminal.pyboard.send_ctrl_c()
def terminal(self, command: str):
self.terminal.terminal(command, stream_output=True)
def listing(self, remote: str = '/'):
status, output, exception = self.terminal.ls(remote)
if status:
tab = Tab(4, 30, 15, 30)
tab.labels('[ ]', 'Filename', 'Size (kb)', 'Exception')
for name, size in output:
if size == -1:
tab.print('[*]', name[1:], '-')
else:
tab.print('[*]', name[1:], f'{size}b')
else:
tab.print('[?]', remote, '', str(exception))
def contents(self, remote: str):
try:
content, _ = self.terminal.get(remote)
for ln in content.decode('utf-8').split('\n'):
print(ln)
except Exception as e:
print('[?]', remote, f'\n{str(e)}')
def upload(self, filepath: str):
local = filepath
if local.startswith('/'):
local = '.' + local
if not local.startswith('./'):
local = './' + local
queue = []
if os.path.isdir(local):
for root, _, files in os.walk(local, followlinks=True):
for filename in files:
filename = os.path.join(root, filename).replace('\\', '/')
queue.append((filename, filename[1:]))
elif os.path.exists(local):
queue.append((local, filepath))
tab = Tab(4, 30, 15, 30)
tab.labels('[ ]', 'Filename', 'Checksum', 'Exception')
for filename, remote in queue:
try:
tab.print('[↑]', filename, self.terminal.upload(filename, remote))
except Exception as e:
tab.print('[?]', filename, '-', str(e))
def download(self, filepath: str):
if filepath.startswith('.'):
filepath = filepath[1:]
status, output, exception = self.terminal.ls(filepath)
if status:
tab = Tab(4, 30, 15, 30)
tab.labels('', 'Filename', 'Checksum', 'Exception')
for remote, size in output:
if size == -1:
os.makedirs(f'.{remote}', 777, exist_ok=True)
for remote, size in output:
local = f'.{remote}'
if not size == -1:
try:
tab.print('[↓]', local, self.terminal.download(remote, local))
except Exception as e:
tab.print('[?]', local, '', str(e))
else:
tab.print('[?]', filepath, '', exception)
def delete(self, filepath: str):
status, output, exception = self.terminal.rm(filepath)
if status:
tab = Tab(4, 30, 30)
tab.labels('[ ]', 'Filename', 'Exception')
for remote, checked, message in output:
local = f'.{remote}'
if checked:
tab.print('[-]', local)
else:
tab.print('[?]', local, message)
else:
tab.print('[?]', filepath, exception)
def compare(self, source: str):
content, _ = self.terminal.get(source)
fh, filename = tempfile.mkstemp()
try:
with os.fdopen(fh, 'wb') as tmp:
tmp.write(content)
subprocess.Popen(f'code --diff "{filename}" "./src/{source}"', stdout=subprocess.PIPE, shell=True).communicate()
finally:
input('Press Enter to delete temp file.')
os.remove(filename)
def status(self):
changes = []
try:
output = subprocess.check_output(['git', 'status', '-s'])
for filename in [f.strip() for f in output.decode('utf-8').split('\n')]:
if not filename:
continue
status, filename = filename.split(' ')
if filename.startswith('src/'):
if status in ['A', 'M', '??']:
changes.append((1, filename))
elif status == 'D':
changes.append((0, filename))
except:
pass
tab = Tab(4, 40)
tab.labels('[ ]', 'Filename')
for status, filename in changes:
tab.print('[+]' if status == 1 else '[-]', filename)
def launch(self, filepath: str):
self.terminal.launch(filepath)
def watch(self, filename: str):
if filename.startswith('/'):
filename = '.' + filename
if not filename.startswith('./'):
filename = './' + filename
with open(filename, 'r') as fh:
self.terminal.terminal(fh.read(), stream_output=True)
print('Welcome to Picowatch Terminal')
# picowatch = False
# while not picowatch:
# print('-' * 30)
# device = input('Port: ').strip()
# baudrate = input('Baudrate (115200): ').strip() or 115200
# try:
# picowatch = Picowatch(Pyboard(device=device, baudrate=baudrate))
# print(f'Connected to device: {device} at a baudrate of: {baudrate}')
# print('-' * 30)
# except Exception as e:
# print(str(e))
picowatch = Picowatch(Pyboard('COM5'))
picowatch.interupt()
while True:
try:
unmessage = input('>>> ').strip()
for message in unmessage.split('&'):
match message.strip().split(' '):
case ['exit']:
sys.exit()
case ['reboot']:
picowatch.terminal('help()')
case ['ls', *source]:
picowatch.listing(source[0] if source else '/')
case ['cat', source]:
picowatch.contents(source)
case ['rm', source]:
picowatch.delete(source)
case ['put', source]:
picowatch.upload(source)
case ['get', source]:
picowatch.download(source)
case ['diff', filename]:
picowatch.compare(filename)
case ['status']:
picowatch.status()
case ['push']:
pass
case ['test', filename]:
picowatch.watch(filename)
case ['!']:
picowatch.watch('main.py')
case ['!!']:
picowatch.launch('main.py')
case _:
if message.startswith('./'):
picowatch.launch(message[2:])
elif message:
print(f'"{message}" is not recognized.')
except Exception as e:
print(str(e))