Refactoring in progress...

This commit is contained in:
Gino D
2022-12-31 01:56:48 +01:00
parent a621509fc5
commit b7023d57e6

View File

@@ -1,3 +1,27 @@
#!/usr/bin/env python
# Primadiag SAS - Paris
# Author: Gino D.
# Copyright (c) 2023 Primadiag
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE
import os import os
import sys import sys
import time import time
@@ -6,12 +30,78 @@ import serial
import binascii import binascii
import textwrap import textwrap
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
class Pyboard: BUFFER_SIZE: int = 256
PYBOARD_BUFFER_SIZE: int = 256
# class TelnetToSerial:
# def __init__(self, ip, user, password, read_timeout=None):
# import telnetlib
# self.tn = telnetlib.Telnet(ip, timeout=15)
# self.read_timeout = read_timeout
# if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
# self.tn.write(bytes(user, 'ascii') + b"\r\n")
# if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
# # needed because of internal implementation details of the telnet server
# time.sleep(0.2)
# self.tn.write(bytes(password, 'ascii') + b"\r\n")
# if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
# # login succesful
# from collections import deque
# self.fifo = deque()
# return
# raise Exception('Failed to establish a telnet connection with the board')
# def __del__(self):
# self.close()
# def close(self):
# try:
# self.tn.close()
# except:
# # the telnet object might not exist yet, so ignore this one
# pass
# def read(self, size=1):
# while len(self.fifo) < size:
# timeout_count = 0
# data = self.tn.read_eager()
# if len(data):
# self.fifo.extend(data)
# timeout_count = 0
# else:
# time.sleep(0.25)
# if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
# break
# timeout_count += 1
# data = b''
# while len(data) < size and len(self.fifo) > 0:
# data += bytes([self.fifo.popleft()])
# return data
# def write(self, data):
# self.tn.write(data)
# return len(data)
# def in_waiting(self):
# n_waiting = len(self.fifo)
# if not n_waiting:
# data = self.tn.read_eager()
# self.fifo.extend(data)
# return len(data)
# else:
# return n_waiting
class Pyboard(object):
serial: serial.Serial serial: serial.Serial
def __init__(self, device: str, baudrate: int = 115200): def __init__(self, device: str, baudrate: int = 115200):
@@ -117,8 +207,8 @@ class Pyboard:
if not self.read_until(b'>'): if not self.read_until(b'>'):
raise Exception('Terminal: prompt has been lost') raise Exception('Terminal: prompt has been lost')
for i in range(0, len(command), Pyboard.PYBOARD_BUFFER_SIZE): for i in range(0, len(command), BUFFER_SIZE):
self.serial.write(command[i: min(i + Pyboard.PYBOARD_BUFFER_SIZE, len(command))]) self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))])
time.sleep(0.001) time.sleep(0.001)
if not self.read_until(self.send_ok()): if not self.read_until(self.send_ok()):
@@ -148,24 +238,20 @@ class Pyboard:
for call in traceback[1][:-2].split('\\r\\n'): for call in traceback[1][:-2].split('\\r\\n'):
reason += f'{call}\n' reason += f'{call}\n'
raise Exception(reason.strip()) raise Exception(f'-------\n{reason.strip()}')
else: else:
raise Exception(exception) raise Exception(f'-------\n{exception}')
return data.strip() return data.strip()
class Picowatch(object): class FileSystem(object):
_pyboard: Pyboard _pyboard: Pyboard
@property
def pyboard(self) -> Optional[Pyboard]:
return self._pyboard
def __init__(self, pyboard: Pyboard): def __init__(self, pyboard: Pyboard):
self._pyboard = pyboard self._pyboard = pyboard
def checksum(self, source: str, data: str = '') -> bool: def checksum(self, source: str, data: str) -> str:
output = self.terminal(f""" output = self.terminal(f"""
def checksum(data): def checksum(data):
v = 21 v = 21
@@ -184,7 +270,7 @@ class Picowatch(object):
for c in data: for c in data:
v ^= ord(c) v ^= ord(c)
return str(v) == output return f'{v}:{output}'
def get(self, filename: str) -> Tuple[bytes, bool]: def get(self, filename: str) -> Tuple[bytes, bool]:
if not filename.startswith('/'): if not filename.startswith('/'):
@@ -195,7 +281,7 @@ class Picowatch(object):
import ubinascii import ubinascii
with open('{filename}', 'rb') as infile: with open('{filename}', 'rb') as infile:
while True: while True:
result = infile.read({Pyboard.PYBOARD_BUFFER_SIZE}) result = infile.read({BUFFER_SIZE})
if result == b'': if result == b'':
break break
sys.stdout.write(ubinascii.hexlify(result)) sys.stdout.write(ubinascii.hexlify(result))
@@ -204,13 +290,13 @@ class Picowatch(object):
return (output, self.checksum(filename, output)) return (output, self.checksum(filename, output))
def save(self, source: str, destination: str) -> Tuple[str, bool]: def download(self, source: str, destination: str) -> str:
output, checksum = self.get(source) output, checksum = self.get(source)
with open(destination, 'wb') as fh: with open(destination, 'wb') as fh:
fh.write(output) fh.write(output)
return (destination, checksum) return checksum
def put(self, filename: str, data: bytes) -> Tuple[str, bool]: def put(self, filename: str, data: bytes) -> Tuple[str, bool]:
if not filename.startswith('/'): if not filename.startswith('/'):
@@ -227,8 +313,8 @@ class Picowatch(object):
size = len(data) size = len(data)
terminal(f"""fh = open("{filename}", "wb")""") terminal(f"""fh = open("{filename}", "wb")""")
for i in range(0, size, Pyboard.PYBOARD_BUFFER_SIZE): for i in range(0, size, BUFFER_SIZE):
chunk_size = min(Pyboard.PYBOARD_BUFFER_SIZE, size - i) chunk_size = min(BUFFER_SIZE, size - i)
chunk = repr(data[i : i + chunk_size]) chunk = repr(data[i : i + chunk_size])
terminal(f"""fh.write({chunk})""") terminal(f"""fh.write({chunk})""")
@@ -238,17 +324,16 @@ class Picowatch(object):
return (filename, self.checksum(filename, data)) return (filename, self.checksum(filename, data))
def copy(self, source: str, destination: str) -> Tuple[str, int, bool]: def upload(self, source: str, destination: str) -> str:
with open(source, 'rb') as fh: with open(source, 'rb') as fh:
return self.put(destination, fh.read()) _, checksum = self.put(destination, fh.read())
def ls(self, dirname: str = '/') -> List[Dict[str, int]]: return checksum
def ls(self, dirname: str = '/') -> Tuple[int, List, str]:
if not dirname.startswith('/'): if not dirname.startswith('/'):
dirname = '/' + dirname dirname = '/' + dirname
if not dirname.endswith('/'):
dirname += '/'
output = self.terminal(f""" output = self.terminal(f"""
try: try:
import os import os
@@ -258,26 +343,36 @@ class Picowatch(object):
import ujson as json import ujson as json
def ls(dirname): def ls(dirname):
d = [] e = []
f = [] s = os.stat(dirname)
if not dirname.endswith("/"): if s[0] == 0x4000:
dirname += "/" if not dirname.endswith("/"):
for t in os.ilistdir(dirname): dirname += "/"
if t[1] == 0x4000: for t in os.ilistdir(dirname):
d.append(dirname + t[0] + '/') if t[1] == 0x4000:
zd, zf = ls(dirname + t[0] + '/') e.append((dirname + t[0] + '/', -1))
d.extend(zd) e.extend(ls(dirname + t[0] + '/'))
f.extend(zf) else:
else: e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
f.append((dirname + t[0], os.stat(dirname + t[0])[6])) else:
return d, f e.append((dirname, s[6]))
return e
print(json.dumps(ls("{dirname}"))) try:
s = 1
r = ls("{dirname}")
x = ''
except Exception as e:
s = 0
r = [("{dirname}", -2)]
x = str(e)
print(json.dumps([s, r, x]))
""") """)
return json.loads(output) return json.loads(output)
def rm(self, filename: str) -> List: def rm(self, filename: str) -> Tuple[int, List, str]:
if not filename.startswith('/'): if not filename.startswith('/'):
filename = '/' + filename filename = '/' + filename
@@ -290,53 +385,54 @@ class Picowatch(object):
import ujson as json import ujson as json
def ls(dirname): def ls(dirname):
d = [] e = []
f = []
if not dirname.endswith("/"): if not dirname.endswith("/"):
dirname += "/" dirname += "/"
for t in os.ilistdir(dirname): for t in os.ilistdir(dirname):
if t[1] == 0x4000: if t[1] == 0x4000:
d.append(dirname + t[0] + '/') e.append((dirname + t[0] + '/', -1))
zd, zf = ls(dirname + t[0] + '/') e.extend(ls(dirname + t[0] + '/'))
d.extend(zd)
f.extend(zf)
else: else:
f.append(dirname + t[0]) e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
return d, f return e
def rm(filename): def rm(filename):
r = [] r = []
if os.stat(filename)[0] == 0x4000: if os.stat(filename)[0] == 0x4000:
d, f = ls(filename) e = ls(filename)
for zf in f: e.append((filename, -1))
try: for f, s in e:
os.remove(zf) if not s == -1:
r.append((zf, 1)) try:
except Exception as e: os.remove(f)
r.append((zf, 0, str(e))) r.append((f, 1, ''))
for zd in d: except Exception as e:
try: r.append((f, 0, str(e)))
os.rmdir(zd) for f, s in e:
r.append((zd, 1)) if s == -1:
except Exception as e: try:
r.append((zd, 0, str(e))) os.rmdir(f)
try: r.append((f, 1, ''))
os.rmdir(filename) except Exception as e:
r.append((filename, 1)) r.append((f, 0, str(e)))
except Exception as e:
r.append((filename, 0, str(e)))
else: else:
try: try:
os.remove(filename) os.remove(filename)
r.append((filename, 1)) r.append((filename, 1, ''))
except Exception as e: except Exception as e:
r.append((filename, 0, str(e))) r.append((filename, 0, str(e)))
return r return r
try: try:
print(json.dumps(rm("{filename}"))) s = 1
r = rm("{filename}")
x = ''
except Exception as e: except Exception as e:
print(json.dumps([("{filename}", 0, str(e))])) s = 0
r = [("{filename}", 0, str(e))]
x = str(e)
print(json.dumps([s, r, x]))
""") """)
return json.loads(output) return json.loads(output)
@@ -387,14 +483,6 @@ class Picowatch(object):
except Exception as e: except Exception as e:
print(str(e)) print(str(e))
def devmode(self, filename: str):
with self._pyboard as terminal:
try:
with open(filename, 'r') as fh:
terminal(fh.read(), stream_output=True)
except Exception as e:
print(str(e))
def terminal(self, command: str, stream_output: bool = False) -> str: def terminal(self, command: str, stream_output: bool = False) -> str:
with self._pyboard as terminal: with self._pyboard as terminal:
try: try:
@@ -402,14 +490,67 @@ class Picowatch(object):
except Exception as e: except Exception as e:
raise e raise e
PRINT_PREFIX_EXCEPTION = '\t =>'
class Picowatch(object):
_fs: FileSystem
def __init__(self, pyboard: Pyboard):
self._fs = FileSystem(pyboard)
def listing(self, remote: str = '/'):
status, output, exception = self._fs.ls(remote)
if status:
for name, size in output:
if size == -1:
print('[/]', name[1:])
else:
print('[·]', name[1:], f'({size}b)')
else:
print('[?]', remote, PRINT_PREFIX_EXCEPTION, exception)
def upload(self, local: str, remote: str = ''):
pass
def download(self, filepath: str):
status, output, exception = self._fs.ls(filepath)
if status:
for remote, size in output:
if size == -1:
os.makedirs(remote, 777, exist_ok=True)
for remote, size in output:
if not size == -1:
try:
print('[↓]', remote, PRINT_PREFIX_EXCEPTION, self._fs.download(remote, f'.{remote}'))
except Exception as e:
print('[?]', remote, PRINT_PREFIX_EXCEPTION, str(e))
else:
print('[?]', filepath, PRINT_PREFIX_EXCEPTION, exception)
def delete(self, filepath: str):
status, output, exception = self._fs.rm(filepath)
if status:
for remote, checked, message in output:
if checked:
print('[-]', remote)
else:
print('[?]', remote, PRINT_PREFIX_EXCEPTION, message)
else:
print('[?]', filepath, PRINT_PREFIX_EXCEPTION, exception)
def execute(self, filepath: str):
pass
def watch(self, local: str):
pass
# picowatch = Picowatch(Pyboard('COM5')) # picowatch = Picowatch(Pyboard('COM5'))
# with open('./setenv.py', 'rb') as fh: # picowatch.listing('/lib/')
# print(picowatch.put('setenv.py', fh.read()))
# print(picowatch.get('setenv.py'))
# print(picowatch.save('main.py', 'cpmain.py'))
# print(picowatch.copy('cpmain.py', 'main.py'))
# picowatch.devmode('main.py')