Save wip...

This commit is contained in:
Gino D
2023-01-02 11:12:01 +01:00
parent 9eb7c15ed6
commit 9222f80ed9

View File

@@ -27,16 +27,17 @@ import sys
import time
import json
import signal
import tempfile
import binascii
import textwrap
import subprocess
from serial import Serial
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from typing import Dict, List, Optional, Tuple, Union
BUFFER_SIZE: int = 126
CURRENT_DIRECTORY: str = os.getcwd().replace('\\', '/')
def strpad(text: str, length: int = 30) -> str:
return text + ' ' * (length - len(text))
@@ -258,14 +259,14 @@ class Pyboard(object):
for call in traceback[1][:-2].split('\\r\\n'):
reason += f'{call}\n'
raise Exception(f'-------\n{reason.strip()}')
raise Exception(f'-------------------\n{reason.strip()}')
else:
raise Exception(f'-------\n{exception}')
raise Exception(f'-------------------\n{exception}')
return data.strip()
class FileSystem(object):
class PicoTerminal(object):
_pyboard: Pyboard
def __init__(self, pyboard: Pyboard):
@@ -512,20 +513,20 @@ class FileSystem(object):
class Picowatch(object):
_fs: FileSystem
pico_terminal: PicoTerminal
def __init__(self, pyboard: Pyboard):
self._fs = FileSystem(pyboard)
self.pico_terminal = PicoTerminal(pyboard)
signal.signal(signal.SIGINT, lambda a, b: self.interupt())
def interupt(self):
self._fs._pyboard.send_ctrl_c()
self.pico_terminal._pyboard.send_ctrl_c()
def terminal(self, command: str):
self._fs.terminal(command, stream_output=True)
self.pico_terminal.terminal(command, stream_output=True)
def listing(self, remote: str = '/'):
status, output, exception = self._fs.ls(remote)
status, output, exception = self.pico_terminal.ls(remote)
if status:
print('-' * 60)
@@ -542,7 +543,7 @@ class Picowatch(object):
def contents(self, remote: str):
try:
content, _ = self._fs.get(remote)
content, _ = self.pico_terminal.get(remote)
for ln in content.decode('utf-8').split('\n'):
print(ln)
@@ -575,7 +576,7 @@ class Picowatch(object):
for filename, remote in queue:
try:
print('[↑]', strpad(filename), self._fs.upload(filename, remote))
print('[↑]', strpad(filename), self.pico_terminal.upload(filename, remote))
except Exception as e:
print('[?]', strpad(filename), str(e))
@@ -583,7 +584,7 @@ class Picowatch(object):
if filepath.startswith('.'):
filepath = filepath[1:]
status, output, exception = self._fs.ls(filepath)
status, output, exception = self.pico_terminal.ls(filepath)
if status:
print('-' * 60)
@@ -599,14 +600,14 @@ class Picowatch(object):
if not size == -1:
try:
print('[↓]', strpad(local), self._fs.download(remote, local))
print('[↓]', strpad(local), self.pico_terminal.download(remote, local))
except Exception as e:
print('[?]', strpad(local), str(e))
else:
print('[?]', strpad(filepath), exception)
def delete(self, filepath: str):
status, output, exception = self._fs.rm(filepath)
status, output, exception = self.pico_terminal.rm(filepath)
if status:
print('-' * 60)
@@ -623,8 +624,26 @@ class Picowatch(object):
else:
print('[?]', strpad(filepath), exception)
def compare(self, source: str):
content, _ = self.pico_terminal.get(source)
fh, filename = tempfile.mkstemp()
try:
with os.fdopen(fh, 'wb') as tmp:
tmp.write(content)
subprocess.Popen(f'code --diff "./src/{source}" "{filename}"', stdout=subprocess.PIPE, shell=True).communicate()
print(filename)
finally:
input('<<< ')
os.remove(filename)
def status(self):
status_out = subprocess.check_output(['git', 'status', '-s'])
print(status_out)
def launch(self, filepath: str):
self._fs.launch(filepath)
self.pico_terminal.launch(filepath)
def watch(self, filename: str):
if filename.startswith('/'):
@@ -634,7 +653,7 @@ class Picowatch(object):
filename = './' + filename
with open(filename, 'r') as fh:
self._fs.terminal(fh.read(), stream_output=True)
self.pico_terminal.terminal(fh.read(), stream_output=True)
@@ -656,89 +675,33 @@ print('Welcome to Picowatch Terminal')
picowatch = Picowatch(Pyboard('COM5'))
picowatch.interupt()
# sessions = {'deleted': set(), 'modified': set()}
# def on_modified_callback(event):
# if event.is_directory == True:
# return
# source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/')
# sessions['modified'].add(source)
# def on_deleted_callback(event):
# source = event.src_path.replace(WATCHING_DIRECTORY, '').replace('\\', '/')
# if event.is_directory == True and not source.endswith('/'):
# source += '/'
# elif len(source.split('.')) == 1:
# source += '/'
# sessions['deleted'].add(source)
# def watchdog_callback():
# for source in sessions['deleted']:
# delete(source, is_directory=source.endswith('/'))
# for source in sessions['modified']:
# upload(source)
# sessions['deleted'] = set()
# sessions['modified'] = set()
# watchdog_event = PatternMatchingEventHandler(
# patterns = ['*'],
# ignore_patterns = None,
# ignore_directories = False,
# case_sensitive = True
# )
# watchdog_event.on_modified = on_modified_callback
# watchdog_event.on_deleted = on_deleted_callback
# watchdog = Observer()
# watchdog.schedule(watchdog_event, path = './', recursive = True)
# watchdog.start()
# try:
while True:
try:
unmessage = input('>>> ').strip()
for message in unmessage.split('&'):
match message.strip().split(' '):
case ['0' | 'exit']:
case ['exit']:
sys.exit()
case ['reboot' | 'reset']:
case ['reboot']:
picowatch.terminal('help()')
case ['ls' | 'stat', *source]:
case ['ls', *source]:
picowatch.listing(source[0] if source else '/')
case ['cat' | 'open' | 'contents', source]:
case ['cat', source]:
picowatch.contents(source)
case ['del' | 'rm' | 'delete' | 'remove', source]:
case ['rm', source]:
picowatch.delete(source)
case ['format']:
picowatch.delete('/')
case ['upl' | 'upload' | 'update', source]:
case ['put', source]:
picowatch.upload(source)
case ['restore']:
picowatch.upload('/')
case ['download' | 'transfer', source]:
case ['get', source]:
picowatch.download(source)
case ['backup']:
picowatch.download('/')
# case ['' | 'save' | 'commit']:
# watchdog_callback()
# case ['status' | 'staged']:
# for filename in sessions['deleted']:
# print('-', filename)
# for filename in sessions['modified']:
# print('+', filename)
# case ['cancel' | 'unstaged']:
# sessions['deleted'] = set()
# sessions['modified'] = set()
case ['!', 'watch' | 'test', filename]:
case ['diff', filename]:
picowatch.compare(filename)
case ['status']:
picowatch.status()
case ['push']:
pass
case ['test', filename]:
picowatch.watch(filename)
case ['!']:
picowatch.watch('main.py')
@@ -751,7 +714,34 @@ while True:
print(f'"{message}" is not recognized.')
except Exception as e:
print(str(e))
# except KeyboardInterrupt:
# watchdog.stop()
# watchdog.join()
# def git_status():
# '''Get the status of the local git repo'''
# global modified, deleted, added
# # A Dictionary with status-list pairs
# delta = {'M' : modified, 'D' : deleted, '?' : added, 'A' : added}
# # Get the output of the 'git status -s' CMD command as a string
# status_out = subprocess.check_output(['git', 'status', '-s'])
# # Split recived output to lines
# files = status_out.split('\n')
# # Sort the filenames to their appropriate list
# for filename in files:
# # Avoid errors (last line of output might be '')
# if len(filename) < 1:
# continue
# # Remove all spaces in filename
# filename = filename.replace(' ', '')
# # Get the status and remove it from file name
# file_status = filename[0]
# filename = filename[1:]
# # Untracked files are prefixed by '??' so we need to remove one '?'
# if file_status == '?':
# filename = filename[1:]
# # If status unfamilier, ignore the file
# # TODO: add 'other' list
# # TODO: Dispose of unwanted extansions here
# if file_status not in delta.keys():
# continue
# delta[file_status].append(filename)
# stage_file(filename)