Refactoring in progress...
This commit is contained in:
@@ -27,6 +27,7 @@ import sys
|
||||
import time
|
||||
import json
|
||||
import serial
|
||||
import signal
|
||||
import binascii
|
||||
import textwrap
|
||||
|
||||
@@ -147,6 +148,11 @@ class Pyboard(object):
|
||||
self.serial.write(b'\x04')
|
||||
return b'soft reboot\r\n'
|
||||
|
||||
def interupt(self, max_times: int = 2):
|
||||
for _ in range(0, max_times):
|
||||
self.serial.write(b'\r\x03')
|
||||
time.sleep(0.05)
|
||||
|
||||
def read_until(self, delimiter: bytes, stream_output: bool = False) -> Optional[bytes]:
|
||||
data = self.serial.read(1)
|
||||
|
||||
@@ -400,7 +406,8 @@ class FileSystem(object):
|
||||
r = []
|
||||
if os.stat(filename)[0] == 0x4000:
|
||||
e = ls(filename)
|
||||
e.append((filename, -1))
|
||||
if filename != '/':
|
||||
e.append((filename, -1))
|
||||
for f, s in e:
|
||||
if not s == -1:
|
||||
try:
|
||||
@@ -459,7 +466,6 @@ class FileSystem(object):
|
||||
continue
|
||||
d.append(zd)
|
||||
zd = "/".join(d)
|
||||
|
||||
try:
|
||||
os.mkdir(zd)
|
||||
r.append(("/" + zd, 1))
|
||||
@@ -491,14 +497,14 @@ class FileSystem(object):
|
||||
raise e
|
||||
|
||||
|
||||
PRINT_PREFIX_EXCEPTION = '\t =>'
|
||||
|
||||
PRINT_PREFIX_EXCEPTION = '\t\t =>'
|
||||
|
||||
class Picowatch(object):
|
||||
_fs: FileSystem
|
||||
|
||||
def __init__(self, pyboard: Pyboard):
|
||||
self._fs = FileSystem(pyboard)
|
||||
signal.signal(signal.SIGINT, lambda a, b: pyboard.interupt())
|
||||
|
||||
def listing(self, remote: str = '/'):
|
||||
status, output, exception = self._fs.ls(remote)
|
||||
@@ -506,29 +512,66 @@ class Picowatch(object):
|
||||
if status:
|
||||
for name, size in output:
|
||||
if size == -1:
|
||||
print('[/]', name[1:])
|
||||
print('[.]', name[1:])
|
||||
else:
|
||||
print('[·]', name[1:], f'({size}b)')
|
||||
print('[:]', name[1:], f'({size}b)')
|
||||
else:
|
||||
print('[?]', remote, PRINT_PREFIX_EXCEPTION, exception)
|
||||
|
||||
def upload(self, local: str, remote: str = ''):
|
||||
pass
|
||||
def contents(self, remote: str):
|
||||
try:
|
||||
content, checksum = self._fs.get(remote)
|
||||
|
||||
for ln in content.decode('utf-8').split('\n'):
|
||||
print(ln)
|
||||
except Exception as e:
|
||||
print('[?]', remote, f'\n{str(e)}')
|
||||
|
||||
def upload(self, filepath: str):
|
||||
local = filepath
|
||||
|
||||
if not local.startswith('./'):
|
||||
local = './' + local
|
||||
|
||||
if not local.startswith('.'):
|
||||
local = '.' + local
|
||||
|
||||
queue = []
|
||||
|
||||
if os.path.isdir(local):
|
||||
for root, _, files in os.walk(local, followlinks=True):
|
||||
for filename in files:
|
||||
filename = os.path.join(root, filename).replace('\\', '/')
|
||||
queue.append((filename, filename[1:]))
|
||||
|
||||
elif os.path.exists(local):
|
||||
queue.append((local, filepath))
|
||||
|
||||
for filename, remote in queue:
|
||||
try:
|
||||
print('[↑]', filename, PRINT_PREFIX_EXCEPTION, self._fs.upload(filename, remote))
|
||||
except Exception as e:
|
||||
print('[?]', filename, PRINT_PREFIX_EXCEPTION, str(e))
|
||||
|
||||
def download(self, filepath: str):
|
||||
if filepath.startswith('.'):
|
||||
filepath = filepath[1:]
|
||||
|
||||
status, output, exception = self._fs.ls(filepath)
|
||||
|
||||
if status:
|
||||
for remote, size in output:
|
||||
if size == -1:
|
||||
os.makedirs(remote, 777, exist_ok=True)
|
||||
os.makedirs(f'.{remote}', 777, exist_ok=True)
|
||||
|
||||
for remote, size in output:
|
||||
local = f'.{remote}'
|
||||
|
||||
if not size == -1:
|
||||
try:
|
||||
print('[↓]', remote, PRINT_PREFIX_EXCEPTION, self._fs.download(remote, f'.{remote}'))
|
||||
print('[↓]', local, PRINT_PREFIX_EXCEPTION, self._fs.download(remote, local))
|
||||
except Exception as e:
|
||||
print('[?]', remote, PRINT_PREFIX_EXCEPTION, str(e))
|
||||
print('[?]', local, PRINT_PREFIX_EXCEPTION, str(e))
|
||||
else:
|
||||
print('[?]', filepath, PRINT_PREFIX_EXCEPTION, exception)
|
||||
|
||||
@@ -537,20 +580,17 @@ class Picowatch(object):
|
||||
|
||||
if status:
|
||||
for remote, checked, message in output:
|
||||
local = f'.{remote}'
|
||||
|
||||
if checked:
|
||||
print('[-]', remote)
|
||||
print('[-]', local)
|
||||
else:
|
||||
print('[?]', remote, PRINT_PREFIX_EXCEPTION, message)
|
||||
print('[?]', local, PRINT_PREFIX_EXCEPTION, message)
|
||||
else:
|
||||
print('[?]', filepath, PRINT_PREFIX_EXCEPTION, exception)
|
||||
|
||||
def execute(self, filepath: str):
|
||||
def launch(self, filepath: str):
|
||||
self._fs.launch(filepath)
|
||||
|
||||
def watch(self, filepath: str):
|
||||
pass
|
||||
|
||||
def watch(self, local: str):
|
||||
pass
|
||||
|
||||
|
||||
# picowatch = Picowatch(Pyboard('COM5'))
|
||||
|
||||
# picowatch.listing('/lib/')
|
||||
Reference in New Issue
Block a user