From 967b5b0f93f2883d44e35840beb95970188066b9 Mon Sep 17 00:00:00 2001 From: Gino D Date: Wed, 4 Jan 2023 15:40:22 +0100 Subject: [PATCH] Release v1 --- picowatch | 3 - picowatch_d/picowatch.py => picowatch.py | 265 +++++++++++++---------- 2 files changed, 152 insertions(+), 116 deletions(-) delete mode 100644 picowatch rename picowatch_d/picowatch.py => picowatch.py (73%) diff --git a/picowatch b/picowatch deleted file mode 100644 index 03f5839..0000000 --- a/picowatch +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -python "$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)/picowatch_d/picowatch.py" \ No newline at end of file diff --git a/picowatch_d/picowatch.py b/picowatch.py similarity index 73% rename from picowatch_d/picowatch.py rename to picowatch.py index eb1f6b7..b21441e 100644 --- a/picowatch_d/picowatch.py +++ b/picowatch.py @@ -33,11 +33,10 @@ import textwrap import subprocess from serial import Serial -from typing import Dict, List, Optional, Tuple, Union +from typing import List, Optional, Tuple, Union BUFFER_SIZE: int = 126 -CURRENT_DIRECTORY: str = os.getcwd().replace('\\', '/') class Tab(): @@ -51,6 +50,8 @@ class Tab(): def print(self, *texts: str): def coltext(text: str, length: int = 20) -> str: + text = str(text) + if len(text) >= length: text = text[:(length - 4) if length > 5 else length] + '...' @@ -67,6 +68,7 @@ class Tab(): print(line) def labels(self, *texts: str, blank_text: str = '-'): + self.blank(blank_text) self.print(*texts) self.blank(blank_text) @@ -294,7 +296,7 @@ class Pyboard(object): return data.strip() -class PyboardTerminal(object): +class FileSystem(object): pyboard: Pyboard def __init__(self, pyboard: Pyboard): @@ -544,131 +546,126 @@ class PyboardTerminal(object): class Picowatch(object): - terminal: PyboardTerminal - def __init__(self, pyboard: Pyboard): - self.terminal = PyboardTerminal(pyboard) + def __init__(self, pyboard: Pyboard, project_name: str = 'src'): + self.filesystem = FileSystem(pyboard) + + project_name = project_name.strip('./').strip('/') + + if project_name == '/': + raise Exception('Project name is incorrect!') + + self.project_name = project_name.replace(os.sep, '/') + self.project_dirname = os.path.join(os.getcwd(), self.project_name).replace(os.sep, '/') signal.signal(signal.SIGINT, lambda a, b: self.interupt()) def interupt(self): - self.terminal.pyboard.send_ctrl_c() + self.filesystem.pyboard.send_ctrl_c() def terminal(self, command: str): - self.terminal.terminal(command, stream_output=True) + self.filesystem.terminal(command, stream_output=True) - def listing(self, remote: str = '/'): - status, output, exception = self.terminal.ls(remote) + def listing(self, filepath: str = '/'): + filepath = filepath.strip('./') + tab = Tab(4, 30, 15, 30) + tab.labels('[ ]', 'Filename', 'Size (kb)', 'Exception') + status, output, exception = self.filesystem.ls(filepath) if status: - tab = Tab(4, 30, 15, 30) - tab.labels('[ ]', 'Filename', 'Size (kb)', 'Exception') - - for name, size in output: + for filename, size in output: if size == -1: - tab.print('[*]', name[1:], '-') + tab.print('[*]', filename, '-') else: - tab.print('[*]', name[1:], f'{size}b') + tab.print('[*]', filename, f'{size}b') else: - tab.print('[?]', remote, '', str(exception)) + tab.print('[?]', filepath, '', str(exception)) - def contents(self, remote: str): - try: - content, _ = self.terminal.get(remote) + def contents(self, filename: str): + filename = filename.strip('./').strip('/') + content, _ = self.filesystem.get(filename) + print('-' * 50) - for ln in content.decode('utf-8').split('\n'): - print(ln) - except Exception as e: - print('[?]', remote, f'\n{str(e)}') + for ln in content.decode('utf-8').split('\n'): + print(ln) def upload(self, filepath: str): - local = filepath - - if local.startswith('/'): - local = '.' + local - - if not local.startswith('./'): - local = './' + local - + filepath = filepath.strip('./').strip('/') queue = [] + source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') - if os.path.isdir(local): - for root, _, files in os.walk(local, followlinks=True): + if os.path.isdir(source): + for root, _, files in os.walk(source, followlinks=True): for filename in files: - filename = os.path.join(root, filename).replace('\\', '/') - queue.append((filename, filename[1:])) - - elif os.path.exists(local): - queue.append((local, filepath)) + filename = os.path.join(root, filename).replace(os.sep, '/') + queue.append((filename, filename.replace(self.project_dirname, ''))) + elif os.path.exists(source): + queue.append((source, filepath)) tab = Tab(4, 30, 15, 30) tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') - for filename, remote in queue: + for source, destination in queue: + destination = destination.strip('/') + try: - tab.print('[↑]', filename, self.terminal.upload(filename, remote)) + tab.print('[↑]', destination, self.filesystem.upload(source, destination)) except Exception as e: - tab.print('[?]', filename, '-', str(e)) + tab.print('[?]', destination, '', str(e)) def download(self, filepath: str): - if filepath.startswith('.'): - filepath = filepath[1:] - - status, output, exception = self.terminal.ls(filepath) + filepath = filepath.strip('./').strip('/') + tab = Tab(4, 30, 15, 30) + tab.labels('', 'Filename', 'Checksum', 'Exception') + status, output, exception = self.filesystem.ls(filepath) if status: - tab = Tab(4, 30, 15, 30) - tab.labels('', 'Filename', 'Checksum', 'Exception') - for remote, size in output: if size == -1: - os.makedirs(f'.{remote}', 777, exist_ok=True) + os.makedirs(os.path.join(self.project_dirname, remote), 777, exist_ok=True) for remote, size in output: - local = f'.{remote}' - if not size == -1: try: - tab.print('[↓]', local, self.terminal.download(remote, local)) + tab.print('[↓]', remote, self.filesystem.download(remote, os.path.join(self.project_dirname, remote))) except Exception as e: - tab.print('[?]', local, '', str(e)) + tab.print('[?]', remote, '', str(e)) else: tab.print('[?]', filepath, '', exception) def delete(self, filepath: str): - status, output, exception = self.terminal.rm(filepath) + filepath = filepath.strip('./') + tab = Tab(4, 30, 30) + tab.labels('[ ]', 'Filename', 'Exception') + status, output, exception = self.filesystem.rm(filepath) if status: - tab = Tab(4, 30, 30) - tab.labels('[ ]', 'Filename', 'Exception') - - for remote, checked, message in output: - local = f'.{remote}' - + for filename, checked, exception in output: if checked: - tab.print('[-]', local) + tab.print('[-]', filename) else: - tab.print('[?]', local, message) + tab.print('[?]', filename, exception) else: tab.print('[?]', filepath, exception) - def compare(self, source: str): - content, _ = self.terminal.get(source) - fh, filename = tempfile.mkstemp() + def compare(self, filepath: str): + filepath = filepath.strip('./').strip('/') + content, _ = self.filesystem.get(filepath) + fh, tempname = tempfile.mkstemp() try: with os.fdopen(fh, 'wb') as tmp: tmp.write(content) - subprocess.Popen(f'code --diff "{filename}" "./src/{source}"', stdout=subprocess.PIPE, shell=True).communicate() + subprocess.Popen(f'code --diff "{tempname}" "{os.path.join(self.project_dirname, filepath)}"', stdout=subprocess.PIPE, shell=True).communicate() finally: input('Press Enter to delete temp file.') - os.remove(filename) + os.remove(tempname) - def status(self): + def status(self, return_output: bool = False): changes = [] try: - output = subprocess.check_output(['git', 'status', '-s']) + output = subprocess.check_output(['git', 'status', '-s'], stderr=subprocess.STDOUT) for filename in [f.strip() for f in output.decode('utf-8').split('\n')]: if not filename: @@ -676,51 +673,89 @@ class Picowatch(object): status, filename = filename.split(' ') - if filename.startswith('src/'): + if filename.startswith(self.project_name): if status in ['A', 'M', '??']: - changes.append((1, filename)) + changes.append((1, filename.replace(self.project_name + '/', ''))) elif status == 'D': - changes.append((0, filename)) - except: - pass - - tab = Tab(4, 40) - tab.labels('[ ]', 'Filename') + changes.append((-1, filename.replace(self.project_name + '/', ''))) + except Exception as e: + print(e.output.decode('utf-8').strip()) + finally: + if return_output: + return changes - for status, filename in changes: - tab.print('[+]' if status == 1 else '[-]', filename) + tab = Tab(4, 40) + tab.labels('[ ]', 'Filename') + + for status, filename in changes: + tab.print('[+]' if status == 1 else '[-]', filename) + + def push(self): + tab = Tab(4, 30, 15, 30) + tab.labels('[ ]', 'Filename', 'Checksum', 'Exception') + changes = self.status(return_output=True) + + for filepath in [filename for status, filename in changes if status == -1]: + filepath = filepath.strip('/') + status, output, exception = self.filesystem.rm(filepath) + + if status: + for filename, checked, exception in output: + if checked: + tab.print('[-]', filename) + else: + tab.print('[?]', filename, '', exception) + else: + tab.print('[?]', filepath, '', exception) + + queue = [] + + for filepath in [filename for status, filename in changes if status == 1]: + filepath = filepath.strip('/') + source = os.path.join(self.project_dirname, filepath).replace(os.sep, '/') + + if os.path.isdir(source): + for root, _, files in os.walk(source, followlinks=True): + for file in files: + file = os.path.join(root, file).replace(os.sep, '/') + queue.append((file, file.replace(self.project_dirname, ''))) + elif os.path.exists(source): + queue.append((source, filepath)) + + for source, destination in queue: + destination = destination.strip('/') + + try: + tab.print('[↑]', destination, self.filesystem.upload(source, destination)) + except Exception as e: + tab.print('[?]', destination, '', str(e)) + + print('Pico board up to date.') + + def test(self, filename: str): + with open(os.path.join(self.project_dirname, filename), 'r') as fh: + self.filesystem.terminal(fh.read(), stream_output=True) def launch(self, filepath: str): - self.terminal.launch(filepath) - - def watch(self, filename: str): - if filename.startswith('/'): - filename = '.' + filename - - if not filename.startswith('./'): - filename = './' + filename - - with open(filename, 'r') as fh: - self.terminal.terminal(fh.read(), stream_output=True) - + self.filesystem.launch(filepath) print('Welcome to Picowatch Terminal') -# picowatch = False +picowatch = False -# while not picowatch: -# print('-' * 30) -# device = input('Port: ').strip() -# baudrate = input('Baudrate (115200): ').strip() or 115200 +while not picowatch: + print('-' * 30) + device = input('Port: ').strip() + baudrate = input('Baudrate (115200): ').strip() or 115200 + project_name = input('Project name (src): ').strip() or 'src' -# try: -# picowatch = Picowatch(Pyboard(device=device, baudrate=baudrate)) -# print(f'Connected to device: {device} at a baudrate of: {baudrate}') -# print('-' * 30) -# except Exception as e: -# print(str(e)) + try: + picowatch = Picowatch(Pyboard(device, baudrate), project_name) + print(f'Connected to device: {device} at a baudrate of: {baudrate}. Listening to project: {project_name}') + print('-' * 30) + except Exception as e: + print(str(e)) -picowatch = Picowatch(Pyboard('COM5')) picowatch.interupt() while True: @@ -733,26 +768,30 @@ while True: sys.exit() case ['reboot']: picowatch.terminal('help()') - case ['ls', *source]: + case ['ls' | 'list', *source]: picowatch.listing(source[0] if source else '/') - case ['cat', source]: + case ['cat' | 'code', source]: picowatch.contents(source) - case ['rm', source]: + case ['rm' | 'delete', source]: picowatch.delete(source) - case ['put', source]: + case ['put' | 'upload', source]: picowatch.upload(source) - case ['get', source]: + case ['get' | 'download', source]: picowatch.download(source) - case ['diff', filename]: + case ['diff' | 'compare', filename]: picowatch.compare(filename) case ['status']: - picowatch.status() + picowatch.status(return_output=False) case ['push']: + picowatch.push() + case ['compile', filename]: + pass + case ['install', package_name]: pass case ['test', filename]: - picowatch.watch(filename) + picowatch.test(filename) case ['!']: - picowatch.watch('main.py') + picowatch.test('main.py') case ['!!']: picowatch.launch('main.py') case _: