From 4833c58cc43637fa1489e30108a68d33d91a43be Mon Sep 17 00:00:00 2001 From: Gino D Date: Thu, 5 Jan 2023 16:56:52 +0100 Subject: [PATCH] Fixed bug and update version --- .picowatch | 3 - picowatch | 3 + picowatch.py => src/picowatch.py | 248 ++++++++++++++++++------------- 3 files changed, 147 insertions(+), 107 deletions(-) delete mode 100644 .picowatch create mode 100644 picowatch rename picowatch.py => src/picowatch.py (76%) diff --git a/.picowatch b/.picowatch deleted file mode 100644 index 9fb1748..0000000 --- a/.picowatch +++ /dev/null @@ -1,3 +0,0 @@ -DEVICE = 'COM5' -BAUDRATE = 115200 -PROJECT_NAME = 'dist' \ No newline at end of file diff --git a/picowatch b/picowatch new file mode 100644 index 0000000..eb50cb8 --- /dev/null +++ b/picowatch @@ -0,0 +1,3 @@ +#!/bin/bash + +python "$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)/src/picowatch.py" $(pwd) \ No newline at end of file diff --git a/picowatch.py b/src/picowatch.py similarity index 76% rename from picowatch.py rename to src/picowatch.py index 930ae19..8d92578 100644 --- a/picowatch.py +++ b/src/picowatch.py @@ -30,6 +30,7 @@ import signal import tempfile import binascii import textwrap +import mpy_cross import subprocess from serial import Serial @@ -49,22 +50,39 @@ class Tab(): def blank(self, text: str = '-'): print(text * sum(self.colsize)) - def print(self, *texts: str): - def coltext(text: str, length: int = 20) -> str: + def print(self, *texts: str, truncate: bool = False): + def coltext(text: str, padding_length: int = 0, max_length: int = 20, truncate: bool = False) -> str: text = str(text) - if len(text) >= length: - text = text[:(length - 4) if length > 5 else length] + '...' + if len(text) >= max_length: + if truncate: + text = text[:(max_length - 4) if max_length > 5 else max_length] + '...' + else: + words = [] + lines = [] - return text + ' ' * (length - len(text)) + for word in text.split(' '): + if sum([len(w) + 1 for w in words]) >= max_length: + if len(lines) == 0: + lines.append(' '.join(words) + '\n') + else: + lines.append((' ' * (padding_length - 1)) + ' '.join(words) + '\n') + + words = [] + + words.append(word) + + lines.append((' ' * (padding_length - 1)) + ' '.join(words)) + text = ' '.join(lines) + + return text + ' ' * (max_length - len(text)) line = '' + padding_length = 0 for i, text in enumerate(texts[:len(self.colsize)]): - line += coltext(text, self.colsize[i]) - - for text in texts[len(self.colsize):]: - line += coltext(text) + line += coltext(text, padding_length, self.colsize[i], truncate) + padding_length += self.colsize[i] print(line) @@ -74,6 +92,12 @@ class Tab(): self.blank(blank_text) +# tab = Tab(4, 11, 60) +# tab.print('[*]', 'Notice how even though the input was a set and a tuple', 'Notice how even though the input was a set and a tuple, the output is a list because sorted() returns a new list by definition') +# tab.print('[*]', 'abcdefghij', 'The returned object can be cast to a new type if it needs to match the input type. Be careful if attempting to cast the resulting list back to a set, as a set by definition is unordered') +# exit() + + class Telnet: def __init__(self, IP: str, login: str, password: str): @@ -307,11 +331,11 @@ class FileSystem(object): output = self.terminal(f""" def checksum(data): v = 21 - for c in data.decode("utf-8"): + for c in data.decode('utf-8'): v ^= ord(c) return v - with open("{source}", "rb") as fh: + with open('{source}', 'rb') as fh: print(checksum(fh.read())) """) @@ -343,6 +367,9 @@ class FileSystem(object): """) output = binascii.unhexlify(output) + if filename.endswith('.mpy'): + return (output, '???') + return (output, self.checksum(filename, output)) def download(self, source: str, destination: str) -> str: @@ -366,7 +393,7 @@ class FileSystem(object): with self.pyboard as terminal: size = len(data) - terminal(f"""fh = open("{filename}", "wb")""") + terminal(f"""fh = open('{filename}', 'wb')""") for i in range(0, size, BUFFER_SIZE): chunk_size = min(BUFFER_SIZE, size - i) @@ -377,6 +404,9 @@ class FileSystem(object): except Exception as e: raise e + if filename.endswith('.mpy'): + return (filename, '???') + return (filename, self.checksum(filename, data)) def upload(self, source: str, destination: str) -> str: @@ -400,27 +430,27 @@ class FileSystem(object): e = [] s = os.stat(dirname) if s[0] == 0x4000: - if not dirname.endswith("/"): - dirname += "/" + if not dirname.endswith('/'): + dirname += '/' + e.append((dirname, -1)) for t in os.ilistdir(dirname): if dirname.startswith('/'): dirname = dirname[1:] if t[1] == 0x4000: - e.append((dirname + t[0] + '/', -1)) e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) else: e.append((dirname, s[6])) - return e + return sorted(e) try: s = 1 - r = ls("{dirname}") + r = ls('{dirname}') x = '' except Exception as e: s = 0 - r = [("{dirname}", -2)] + r = [('{dirname}', -2)] x = str(e) print(json.dumps([s, r, x])) @@ -442,8 +472,8 @@ class FileSystem(object): def ls(dirname): e = [] - if not dirname.endswith("/"): - dirname += "/" + if not dirname.endswith('/'): + dirname += '/' for t in os.ilistdir(dirname): if dirname.startswith('/'): dirname = dirname[1:] @@ -452,14 +482,14 @@ class FileSystem(object): e.extend(ls(dirname + t[0] + '/')) else: e.append((dirname + t[0], os.stat(dirname + t[0])[6])) - return e + return sorted(e, reverse=True) def rm(filename): r = [] if os.stat(filename)[0] == 0x4000: e = ls(filename) if filename != '/': - e.append((filename, -1)) + e.append((filename.strip('/'), -1)) for f, s in e: if not s == -1: try: @@ -484,11 +514,11 @@ class FileSystem(object): try: s = 1 - r = rm("{filename}") + r = rm('{filename}') x = '' except Exception as e: s = 0 - r = [("{filename}", 0, str(e))] + r = [('{filename}', 0, str(e))] x = str(e) print(json.dumps([s, r, x])) @@ -513,19 +543,19 @@ class FileSystem(object): r = [] d = [] - for zd in str("{dirname}").split("/"): + for zd in str('{dirname}').split('/'): if not zd: continue d.append(zd) - zd = "/".join(d) + zd = '/'.join(d) try: os.mkdir(zd) - r.append(("/" + zd, 1)) + r.append(('/' + zd, 1)) except Exception as e: if str(e).find('EEXIST'): - r.append(("/" + zd, 1)) + r.append(('/' + zd, 1)) else: - r.append(("/" + zd, 0, str(e))) + r.append(('/' + zd, 0, str(e))) print(json.dumps(r)) """) @@ -535,7 +565,7 @@ class FileSystem(object): def launch(self, filename: str): try: self.terminal(f""" - with open("{filename}", "r") as fh: + with open('{filename}', 'r') as fh: exec(fh.read()) """, stream_output=True) except Exception as e: @@ -551,19 +581,8 @@ class FileSystem(object): class Picowatch(object): - def __init__(self, pyboard: Pyboard, project_name: str = 'src'): + def __init__(self, pyboard: Pyboard): self.filesystem = FileSystem(pyboard) - project_name = project_name.strip('./').strip('/') - - if not project_name or project_name == '/': - raise Exception('Project name is incorrect, can not be empty or only a forward slash "/"!') - - self.project_name = project_name.replace(os.sep, '/') - self.project_dirname = os.path.join(os.getcwd(), self.project_name).replace(os.sep, '/') - - if not os.path.isdir(self.project_dirname): - raise Exception('Project is not a directory!') - signal.signal(signal.SIGINT, lambda a, b: self.interupt()) def interupt(self): @@ -572,6 +591,40 @@ class Picowatch(object): def terminal(self, command: str): self.filesystem.terminal(command, stream_output=True) + def internal_ls(self, filepath: str): + queue = [] + + if filepath == '/': + filepath = os.getcwd().replace(os.sep, '/') + else: + filepath = os.path.join(os.getcwd(), filepath.strip('./').strip('/')).replace(os.sep, '/') + + if os.path.isfile(filepath): + queue = [(filepath, os.stat(filepath)[6])] + elif os.path.isdir(filepath): + def ls(dirname: str): + e = [] + + if os.path.isdir(dirname): + if not dirname.endswith('/'): + dirname += '/' + + for filename in os.listdir(dirname): + if filename.startswith('.'): + continue + + filename = dirname + filename + + if os.path.isdir(filename): + e.extend(ls(filename)) + else: + e.append((filename, os.stat(filename)[6])) + return e + + queue = ls(filepath) + + return queue + def listing(self, filepath: str = '/'): filepath = filepath.strip('./') tab = Tab(4, 30, 15, 100) @@ -596,42 +649,29 @@ class Picowatch(object): print(ln) def upload(self, filepath: str): - filepath = filepath.strip('./').strip('/') - queue = [] - source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') + tab = Tab(4, 30, 15, 15, 100) + tab.labels('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception') - if os.path.isdir(source): - for root, _, files in os.walk(source, followlinks=True): - for filename in files: - filename = os.path.join(root, filename).replace(os.sep, '/') - queue.append((filename, filename.replace(self.project_dirname, ''))) - elif os.path.exists(source): - queue.append((source, filepath)) - - tab = Tab(4, 30, 15, 100) - tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') - - for source, destination in queue: - destination = destination.strip('/') + for source, size in self.internal_ls(filepath): + destination = source.replace(os.getcwd().replace(os.sep, '/'), '').strip('/') try: - tab.print('[↑]', destination, self.filesystem.upload(source, destination)) + tab.print('[↑]', destination, f'{round(size / 1024, 2)} kb', self.filesystem.upload(source, destination)) except Exception as e: - tab.print('[?]', destination, '', str(e)) + tab.print('[?]', destination, '', '', str(e)) def download(self, filepath: str): - filepath = filepath.strip('./').strip('/') tab = Tab(4, 30, 15, 100) tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') - status, output, exception = self.filesystem.ls(filepath) + status, output, exception = self.filesystem.ls(filepath.strip('./').strip('/')) if status: for remote, size in output: if size == -1: - os.makedirs(os.path.join(self.project_dirname, remote), 777, exist_ok=True) + os.makedirs(os.path.join(os.getcwd(), remote), 777, exist_ok=True) for remote, size in output: - destination = os.path.join(self.project_dirname, remote).replace(os.sep, '/') + destination = os.path.join(os.getcwd(), remote).replace(os.sep, '/') if not size == -1: try: @@ -642,10 +682,9 @@ class Picowatch(object): tab.print('[?]', filepath, '', exception) def delete(self, filepath: str): - filepath = filepath.strip('./') tab = Tab(4, 30, 100) tab.labels('[ ]', 'Filename', 'Exception') - status, output, exception = self.filesystem.rm(filepath) + status, output, exception = self.filesystem.rm(filepath.strip('./')) if status: for filename, checked, exception in output: @@ -657,15 +696,14 @@ class Picowatch(object): tab.print('[?]', filepath, exception) def compare(self, filepath: str): - filepath = filepath.strip('./').strip('/') - content, _ = self.filesystem.get(filepath) + content, _ = self.filesystem.get(filepath.strip('./').strip('/')) fh, tempname = tempfile.mkstemp() try: with os.fdopen(fh, 'wb') as tmp: tmp.write(content) - subprocess.Popen(f'code --diff "{tempname}" "{os.path.join(self.project_dirname, filepath)}"', stdout=subprocess.PIPE, shell=True).communicate() + subprocess.Popen(f'code --diff "{tempname}" "{os.path.join(os.getcwd(), filepath)}"', stdout=subprocess.PIPE, shell=True).communicate() finally: input('Press Enter to delete temp file.') os.remove(tempname) @@ -682,13 +720,15 @@ class Picowatch(object): status, filename = filename.split(' ') - if filename.startswith(self.project_name): - if status in ['A', 'M', '??']: - changes.append((1, filename.replace(self.project_name + '/', ''))) - elif status == 'D': - changes.append((-1, filename.replace(self.project_name + '/', ''))) + if status in ['A', 'M', '??']: + changes.append((1, filename)) + elif status == 'D': + changes.append((-1, filename)) except Exception as e: - print(e.output.decode('utf-8').strip()) + try: + print(e.output.decode('utf-8').strip()) + except: + print(e.decode('utf-8').strip()) finally: if return_output: return changes @@ -701,7 +741,7 @@ class Picowatch(object): def push(self): tab = Tab(4, 30, 15, 100) - tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') + tab.labels('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception') changes = self.status(return_output=True) for filepath in [filename for status, filename in changes if status == -1]: @@ -711,64 +751,65 @@ class Picowatch(object): if status: for filename, checked, exception in output: if checked: - tab.print('[-]', filename) + tab.print('[-]', filename, '', 'DELETED') else: - tab.print('[?]', filename, '', exception) + tab.print('[?]', filename, '', '', exception) else: tab.print('[?]', filepath, '', exception) queue = [] for filepath in [filename for status, filename in changes if status == 1]: - filepath = filepath.strip('/') - source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') + queue.extend(self.internal_ls(filepath)) - if os.path.isdir(source): - for root, _, files in os.walk(source, followlinks=True): - for file in files: - file = os.path.join(root, file).replace(os.sep, '/') - queue.append((file, file.replace(self.project_dirname, ''))) - elif os.path.exists(source): - queue.append((source, filepath)) - - for source, destination in queue: - destination = destination.strip('/') + for source, size in queue: + destination = source.replace(os.getcwd().replace(os.sep, '/'), '').strip('/') try: - tab.print('[↑]', destination, self.filesystem.upload(source, destination)) + tab.print('[↑]', destination, f'{round(size / 1024, 2)} kb', self.filesystem.upload(source, destination)) except Exception as e: - tab.print('[?]', destination, '', str(e)) + tab.print('[?]', destination, '', '', str(e)) print('Pico board up to date.') + def compile(self, filename: str): + _, error = mpy_cross.run(filename, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True).communicate() + + if error: + print(error.decode('utf-8')) + else: + print(f'MicroPython File from "{filename}" created!') + def test(self, filename: str): - with open(os.path.join(self.project_dirname, filename), 'r') as fh: + with open(os.path.join(os.getcwd(), filename), 'r') as fh: self.filesystem.terminal(fh.read(), stream_output=True) - def launch(self, filepath: str): - self.filesystem.launch(filepath) + def launch(self, filename: str): + self.filesystem.launch(filename) print('Welcome to Picowatch Terminal') +print(f'Listening to project: {os.getcwd().replace(os.sep, "/")}/') picowatch = False try: env = dotenv_values('.picowatch') - picowatch = Picowatch(Pyboard(env["DEVICE"], env["BAUDRATE"]), env["PROJECT_NAME"]) + picowatch = Picowatch(Pyboard(env["DEVICE"], env["BAUDRATE"])) print(f'Connected automatically to device: {env["DEVICE"]} at a baudrate of: {env["BAUDRATE"]}') - print(f'Listening to project: ./{env["PROJECT_NAME"]}') except: while not picowatch: print('-' * 50) device = input('Port: ').strip() baudrate = input('Baudrate (115200): ').strip() or 115200 - project_name = input('Project name (src): ').strip() or 'src' try: - picowatch = Picowatch(Pyboard(device, baudrate), project_name) + picowatch = Picowatch(Pyboard(device, baudrate)) print('-' * 50) - print(f'Connected to device: {picowatch} at a baudrate of: {baudrate}') - print(f'Listening to project: ./{env["PROJECT_NAME"]}') + print(f'Connected to device: {device} and baudrate: {baudrate}') + + with open(os.path.join(os.getcwd(), '.picowatch'), 'w+') as fh: + fh.write(f'DEVICE = "{device}"\n') + fh.write(f'BAUDRATE = {baudrate}\n') except Exception as e: print(str(e)) @@ -803,9 +844,8 @@ while True: picowatch.status(return_output=False) case ['push']: picowatch.push() - case ['compile', filename]: - # https://pypi.org/project/mpy-cross/ - pass + case ['mpy' | 'compile', filename]: + picowatch.compile(filename) case ['install', package_name]: pass case ['test', filename]: