From b7023d57e6f1f34cd0aceaf5faa4a37595bc137e Mon Sep 17 00:00:00 2001 From: Gino D Date: Sat, 31 Dec 2022 01:56:48 +0100 Subject: [PATCH] Refactoring in progress... --- picowatch_d/picowatch.py | 311 ++++++++++++++++++++++++++++----------- 1 file changed, 226 insertions(+), 85 deletions(-) diff --git a/picowatch_d/picowatch.py b/picowatch_d/picowatch.py index 9303c10..450a4ca 100644 --- a/picowatch_d/picowatch.py +++ b/picowatch_d/picowatch.py @@ -1,3 +1,27 @@ +#!/usr/bin/env python + +# Primadiag SAS - Paris +# Author: Gino D. +# Copyright (c) 2023 Primadiag +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE + import os import sys import time @@ -6,12 +30,78 @@ import serial import binascii import textwrap +from watchdog.observers import Observer +from watchdog.events import PatternMatchingEventHandler from typing import Dict, List, Optional, Tuple -class Pyboard: +BUFFER_SIZE: int = 256 - PYBOARD_BUFFER_SIZE: int = 256 + +# class TelnetToSerial: +# def __init__(self, ip, user, password, read_timeout=None): +# import telnetlib +# self.tn = telnetlib.Telnet(ip, timeout=15) +# self.read_timeout = read_timeout +# if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout): +# self.tn.write(bytes(user, 'ascii') + b"\r\n") + +# if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout): +# # needed because of internal implementation details of the telnet server +# time.sleep(0.2) +# self.tn.write(bytes(password, 'ascii') + b"\r\n") + +# if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout): +# # login succesful +# from collections import deque +# self.fifo = deque() +# return + +# raise Exception('Failed to establish a telnet connection with the board') + +# def __del__(self): +# self.close() + +# def close(self): +# try: +# self.tn.close() +# except: +# # the telnet object might not exist yet, so ignore this one +# pass + +# def read(self, size=1): +# while len(self.fifo) < size: +# timeout_count = 0 +# data = self.tn.read_eager() +# if len(data): +# self.fifo.extend(data) +# timeout_count = 0 +# else: +# time.sleep(0.25) +# if self.read_timeout is not None and timeout_count > 4 * self.read_timeout: +# break +# timeout_count += 1 + +# data = b'' +# while len(data) < size and len(self.fifo) > 0: +# data += bytes([self.fifo.popleft()]) +# return data + +# def write(self, data): +# self.tn.write(data) +# return len(data) + +# def in_waiting(self): +# n_waiting = len(self.fifo) +# if not n_waiting: +# data = self.tn.read_eager() +# self.fifo.extend(data) +# return len(data) +# else: +# return n_waiting + + +class Pyboard(object): serial: serial.Serial def __init__(self, device: str, baudrate: int = 115200): @@ -117,8 +207,8 @@ class Pyboard: if not self.read_until(b'>'): raise Exception('Terminal: prompt has been lost') - for i in range(0, len(command), Pyboard.PYBOARD_BUFFER_SIZE): - self.serial.write(command[i: min(i + Pyboard.PYBOARD_BUFFER_SIZE, len(command))]) + for i in range(0, len(command), BUFFER_SIZE): + self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))]) time.sleep(0.001) if not self.read_until(self.send_ok()): @@ -148,24 +238,20 @@ class Pyboard: for call in traceback[1][:-2].split('\\r\\n'): reason += f'{call}\n' - raise Exception(reason.strip()) + raise Exception(f'-------\n{reason.strip()}') else: - raise Exception(exception) + raise Exception(f'-------\n{exception}') return data.strip() -class Picowatch(object): +class FileSystem(object): _pyboard: Pyboard - @property - def pyboard(self) -> Optional[Pyboard]: - return self._pyboard - def __init__(self, pyboard: Pyboard): self._pyboard = pyboard - def checksum(self, source: str, data: str = '') -> bool: + def checksum(self, source: str, data: str) -> str: output = self.terminal(f""" def checksum(data): v = 21 @@ -184,7 +270,7 @@ class Picowatch(object): for c in data: v ^= ord(c) - return str(v) == output + return f'{v}:{output}' def get(self, filename: str) -> Tuple[bytes, bool]: if not filename.startswith('/'): @@ -195,7 +281,7 @@ class Picowatch(object): import ubinascii with open('{filename}', 'rb') as infile: while True: - result = infile.read({Pyboard.PYBOARD_BUFFER_SIZE}) + result = infile.read({BUFFER_SIZE}) if result == b'': break sys.stdout.write(ubinascii.hexlify(result)) @@ -204,13 +290,13 @@ class Picowatch(object): return (output, self.checksum(filename, output)) - def save(self, source: str, destination: str) -> Tuple[str, bool]: + def download(self, source: str, destination: str) -> str: output, checksum = self.get(source) with open(destination, 'wb') as fh: fh.write(output) - return (destination, checksum) + return checksum def put(self, filename: str, data: bytes) -> Tuple[str, bool]: if not filename.startswith('/'): @@ -227,8 +313,8 @@ class Picowatch(object): size = len(data) terminal(f"""fh = open("{filename}", "wb")""") - for i in range(0, size, Pyboard.PYBOARD_BUFFER_SIZE): - chunk_size = min(Pyboard.PYBOARD_BUFFER_SIZE, size - i) + for i in range(0, size, BUFFER_SIZE): + chunk_size = min(BUFFER_SIZE, size - i) chunk = repr(data[i : i + chunk_size]) terminal(f"""fh.write({chunk})""") @@ -238,17 +324,16 @@ class Picowatch(object): return (filename, self.checksum(filename, data)) - def copy(self, source: str, destination: str) -> Tuple[str, int, bool]: + def upload(self, source: str, destination: str) -> str: with open(source, 'rb') as fh: - return self.put(destination, fh.read()) + _, checksum = self.put(destination, fh.read()) - def ls(self, dirname: str = '/') -> List[Dict[str, int]]: + return checksum + + def ls(self, dirname: str = '/') -> Tuple[int, List, str]: if not dirname.startswith('/'): dirname = '/' + dirname - if not dirname.endswith('/'): - dirname += '/' - output = self.terminal(f""" try: import os @@ -258,26 +343,36 @@ class Picowatch(object): import ujson as json def ls(dirname): - d = [] - f = [] - if not dirname.endswith("/"): - dirname += "/" - for t in os.ilistdir(dirname): - if t[1] == 0x4000: - d.append(dirname + t[0] + '/') - zd, zf = ls(dirname + t[0] + '/') - d.extend(zd) - f.extend(zf) - else: - f.append((dirname + t[0], os.stat(dirname + t[0])[6])) - return d, f + e = [] + s = os.stat(dirname) + if s[0] == 0x4000: + if not dirname.endswith("/"): + dirname += "/" + for t in os.ilistdir(dirname): + if t[1] == 0x4000: + e.append((dirname + t[0] + '/', -1)) + e.extend(ls(dirname + t[0] + '/')) + else: + e.append((dirname + t[0], os.stat(dirname + t[0])[6])) + else: + e.append((dirname, s[6])) + return e + + try: + s = 1 + r = ls("{dirname}") + x = '' + except Exception as e: + s = 0 + r = [("{dirname}", -2)] + x = str(e) - print(json.dumps(ls("{dirname}"))) + print(json.dumps([s, r, x])) """) return json.loads(output) - def rm(self, filename: str) -> List: + def rm(self, filename: str) -> Tuple[int, List, str]: if not filename.startswith('/'): filename = '/' + filename @@ -290,53 +385,54 @@ class Picowatch(object): import ujson as json def ls(dirname): - d = [] - f = [] + e = [] if not dirname.endswith("/"): dirname += "/" for t in os.ilistdir(dirname): if t[1] == 0x4000: - d.append(dirname + t[0] + '/') - zd, zf = ls(dirname + t[0] + '/') - d.extend(zd) - f.extend(zf) + e.append((dirname + t[0] + '/', -1)) + e.extend(ls(dirname + t[0] + '/')) else: - f.append(dirname + t[0]) - return d, f + e.append((dirname + t[0], os.stat(dirname + t[0])[6])) + return e def rm(filename): r = [] if os.stat(filename)[0] == 0x4000: - d, f = ls(filename) - for zf in f: - try: - os.remove(zf) - r.append((zf, 1)) - except Exception as e: - r.append((zf, 0, str(e))) - for zd in d: - try: - os.rmdir(zd) - r.append((zd, 1)) - except Exception as e: - r.append((zd, 0, str(e))) - try: - os.rmdir(filename) - r.append((filename, 1)) - except Exception as e: - r.append((filename, 0, str(e))) + e = ls(filename) + e.append((filename, -1)) + for f, s in e: + if not s == -1: + try: + os.remove(f) + r.append((f, 1, '')) + except Exception as e: + r.append((f, 0, str(e))) + for f, s in e: + if s == -1: + try: + os.rmdir(f) + r.append((f, 1, '')) + except Exception as e: + r.append((f, 0, str(e))) else: try: os.remove(filename) - r.append((filename, 1)) + r.append((filename, 1, '')) except Exception as e: r.append((filename, 0, str(e))) return r try: - print(json.dumps(rm("{filename}"))) + s = 1 + r = rm("{filename}") + x = '' except Exception as e: - print(json.dumps([("{filename}", 0, str(e))])) + s = 0 + r = [("{filename}", 0, str(e))] + x = str(e) + + print(json.dumps([s, r, x])) """) return json.loads(output) @@ -387,14 +483,6 @@ class Picowatch(object): except Exception as e: print(str(e)) - def devmode(self, filename: str): - with self._pyboard as terminal: - try: - with open(filename, 'r') as fh: - terminal(fh.read(), stream_output=True) - except Exception as e: - print(str(e)) - def terminal(self, command: str, stream_output: bool = False) -> str: with self._pyboard as terminal: try: @@ -402,14 +490,67 @@ class Picowatch(object): except Exception as e: raise e + +PRINT_PREFIX_EXCEPTION = '\t =>' + + +class Picowatch(object): + _fs: FileSystem + + def __init__(self, pyboard: Pyboard): + self._fs = FileSystem(pyboard) + + def listing(self, remote: str = '/'): + status, output, exception = self._fs.ls(remote) + + if status: + for name, size in output: + if size == -1: + print('[/]', name[1:]) + else: + print('[·]', name[1:], f'({size}b)') + else: + print('[?]', remote, PRINT_PREFIX_EXCEPTION, exception) + + def upload(self, local: str, remote: str = ''): + pass + + def download(self, filepath: str): + status, output, exception = self._fs.ls(filepath) + + if status: + for remote, size in output: + if size == -1: + os.makedirs(remote, 777, exist_ok=True) + + for remote, size in output: + if not size == -1: + try: + print('[↓]', remote, PRINT_PREFIX_EXCEPTION, self._fs.download(remote, f'.{remote}')) + except Exception as e: + print('[?]', remote, PRINT_PREFIX_EXCEPTION, str(e)) + else: + print('[?]', filepath, PRINT_PREFIX_EXCEPTION, exception) + + def delete(self, filepath: str): + status, output, exception = self._fs.rm(filepath) + + if status: + for remote, checked, message in output: + if checked: + print('[-]', remote) + else: + print('[?]', remote, PRINT_PREFIX_EXCEPTION, message) + else: + print('[?]', filepath, PRINT_PREFIX_EXCEPTION, exception) + + def execute(self, filepath: str): + pass + + def watch(self, local: str): + pass + + # picowatch = Picowatch(Pyboard('COM5')) -# with open('./setenv.py', 'rb') as fh: -# print(picowatch.put('setenv.py', fh.read())) - -# print(picowatch.get('setenv.py')) - -# print(picowatch.save('main.py', 'cpmain.py')) -# print(picowatch.copy('cpmain.py', 'main.py')) - -# picowatch.devmode('main.py') \ No newline at end of file +# picowatch.listing('/lib/') \ No newline at end of file