Fixed bug and update version
This commit is contained in:
863
src/picowatch.py
Normal file
863
src/picowatch.py
Normal file
@@ -0,0 +1,863 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Primadiag SAS - Paris
|
||||
# Author: Gino D.
|
||||
# Copyright (c) 2023 Primadiag
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to deal
|
||||
# in the Software without restriction, including without limitation the rights
|
||||
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
# copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import json
|
||||
import signal
|
||||
import tempfile
|
||||
import binascii
|
||||
import textwrap
|
||||
import mpy_cross
|
||||
import subprocess
|
||||
|
||||
from serial import Serial
|
||||
from dotenv import dotenv_values
|
||||
from typing import List, Optional, Tuple, Union
|
||||
|
||||
|
||||
BUFFER_SIZE: int = 256
|
||||
|
||||
|
||||
class Tab():
|
||||
colsize: Tuple = ()
|
||||
|
||||
def __init__(self, *length: int):
|
||||
self.colsize = length
|
||||
|
||||
def blank(self, text: str = '-'):
|
||||
print(text * sum(self.colsize))
|
||||
|
||||
def print(self, *texts: str, truncate: bool = False):
|
||||
def coltext(text: str, padding_length: int = 0, max_length: int = 20, truncate: bool = False) -> str:
|
||||
text = str(text)
|
||||
|
||||
if len(text) >= max_length:
|
||||
if truncate:
|
||||
text = text[:(max_length - 4) if max_length > 5 else max_length] + '...'
|
||||
else:
|
||||
words = []
|
||||
lines = []
|
||||
|
||||
for word in text.split(' '):
|
||||
if sum([len(w) + 1 for w in words]) >= max_length:
|
||||
if len(lines) == 0:
|
||||
lines.append(' '.join(words) + '\n')
|
||||
else:
|
||||
lines.append((' ' * (padding_length - 1)) + ' '.join(words) + '\n')
|
||||
|
||||
words = []
|
||||
|
||||
words.append(word)
|
||||
|
||||
lines.append((' ' * (padding_length - 1)) + ' '.join(words))
|
||||
text = ' '.join(lines)
|
||||
|
||||
return text + ' ' * (max_length - len(text))
|
||||
|
||||
line = ''
|
||||
padding_length = 0
|
||||
|
||||
for i, text in enumerate(texts[:len(self.colsize)]):
|
||||
line += coltext(text, padding_length, self.colsize[i], truncate)
|
||||
padding_length += self.colsize[i]
|
||||
|
||||
print(line)
|
||||
|
||||
def labels(self, *texts: str, blank_text: str = '-'):
|
||||
self.blank(blank_text)
|
||||
self.print(*texts)
|
||||
self.blank(blank_text)
|
||||
|
||||
|
||||
# tab = Tab(4, 11, 60)
|
||||
# tab.print('[*]', 'Notice how even though the input was a set and a tuple', 'Notice how even though the input was a set and a tuple, the output is a list because sorted() returns a new list by definition')
|
||||
# tab.print('[*]', 'abcdefghij', 'The returned object can be cast to a new type if it needs to match the input type. Be careful if attempting to cast the resulting list back to a set, as a set by definition is unordered')
|
||||
# exit()
|
||||
|
||||
|
||||
class Telnet:
|
||||
|
||||
def __init__(self, IP: str, login: str, password: str):
|
||||
import telnetlib
|
||||
from collections import deque
|
||||
|
||||
self.tn = telnetlib.Telnet(IP, timeout=15)
|
||||
self.fifo = deque()
|
||||
|
||||
if b'Login as:' in self.tn.read_until(b'Login as:'):
|
||||
self.tn.write(bytes(login, 'ascii') + b'\r\n')
|
||||
|
||||
if b'Password:' in self.tn.read_until(b'Password:'):
|
||||
time.sleep(0.2)
|
||||
self.tn.write(bytes(password, 'ascii') + b'\r\n')
|
||||
|
||||
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.'):
|
||||
return
|
||||
|
||||
raise Exception('Failed to establish a Telnet connection with the board')
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self.tn.close()
|
||||
except:
|
||||
pass
|
||||
|
||||
def read(self, size: int = 1) -> bytes:
|
||||
timeout = 0
|
||||
|
||||
while len(self.fifo) < size and not timeout >= 8:
|
||||
timeout = 0
|
||||
data = self.tn.read_eager()
|
||||
|
||||
if len(data):
|
||||
self.fifo.extend(data)
|
||||
timeout = 0
|
||||
else:
|
||||
timeout += 1
|
||||
time.sleep(0.25)
|
||||
|
||||
data = b''
|
||||
|
||||
while len(data) < size and len(self.fifo) > 0:
|
||||
data += bytes([self.fifo.popleft()])
|
||||
|
||||
return data
|
||||
|
||||
def write(self, data: bytes):
|
||||
self.tn.write(data)
|
||||
return len(data)
|
||||
|
||||
def inWaiting(self) -> int:
|
||||
n_waiting = len(self.fifo)
|
||||
|
||||
if not n_waiting:
|
||||
data = self.tn.read_eager()
|
||||
self.fifo.extend(data)
|
||||
n_waiting = len(data)
|
||||
|
||||
return n_waiting
|
||||
|
||||
|
||||
class Pyboard(object):
|
||||
i: int = 0
|
||||
serial: Union[Serial, Telnet]
|
||||
|
||||
def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''):
|
||||
is_telnet = device and device.count('.') == 3
|
||||
|
||||
for _ in range(0, 3):
|
||||
try:
|
||||
if is_telnet:
|
||||
self.serial = Telnet(device, login, password)
|
||||
else:
|
||||
self.serial = Serial(device, baudrate=baudrate, interCharTimeout=1)
|
||||
break
|
||||
except:
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise Exception(f'Failed to access device: {device}')
|
||||
|
||||
def close(self):
|
||||
self.serial.close()
|
||||
|
||||
def transfer_status(self) -> int:
|
||||
arrows = ['◜', '◝', '◞', '◟']
|
||||
sys.stdout.write(f'[∘] Transfering... {arrows[self.i]}\r')
|
||||
sys.stdout.flush()
|
||||
self.i = (self.i + 1) % 4
|
||||
|
||||
def stdout_write_bytes(self, data: str):
|
||||
sys.stdout.buffer.write(data.replace(b'\x04', b''))
|
||||
sys.stdout.buffer.flush()
|
||||
|
||||
def send_ok(self) -> bytes:
|
||||
self.serial.write(b'\x04')
|
||||
return b'OK'
|
||||
|
||||
def send_ctrl_a(self) -> bytes:
|
||||
# ctrl-A: enter raw REPL
|
||||
self.serial.write(b'\x01')
|
||||
return b'raw REPL; CTRL-B to exit\r\n>'
|
||||
|
||||
def send_ctrl_b(self):
|
||||
# ctrl-B: exit raw REPL
|
||||
self.serial.write(b'\x02')
|
||||
|
||||
def send_ctrl_c(self) -> bytes:
|
||||
# ctrl-C twice: interrupt any running program
|
||||
for _ in range(0, 2):
|
||||
self.serial.write(b'\x03')
|
||||
|
||||
return b'raw REPL; CTRL-B to exit\r\n'
|
||||
|
||||
def send_ctrl_d(self) -> bytes:
|
||||
# ctrl-D: soft reset
|
||||
self.serial.write(b'\x04')
|
||||
return b'soft reboot\r\n'
|
||||
|
||||
def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]:
|
||||
data = self.serial.read(1)
|
||||
|
||||
if stream_output:
|
||||
self.stdout_write_bytes(data)
|
||||
|
||||
timeout = 0
|
||||
max_len = len(delimiter)
|
||||
|
||||
while not timeout >= 1000:
|
||||
if data.endswith(delimiter):
|
||||
return data
|
||||
elif self.serial.inWaiting() > 0:
|
||||
timeout = 0
|
||||
stream_data = self.serial.read(1)
|
||||
data += stream_data
|
||||
|
||||
if stream_output:
|
||||
self.stdout_write_bytes(stream_data)
|
||||
data = data[-max_len:]
|
||||
elif show_status:
|
||||
self.transfer_status()
|
||||
else:
|
||||
timeout += 1
|
||||
time.sleep(0.0001)
|
||||
|
||||
def __enter__(self):
|
||||
self.send_ctrl_c()
|
||||
n = self.serial.inWaiting()
|
||||
|
||||
while n > 0:
|
||||
self.serial.read(n)
|
||||
n = self.serial.inWaiting()
|
||||
|
||||
for _ in range(0, 5):
|
||||
if self.read_until(self.send_ctrl_a()):
|
||||
break
|
||||
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise Exception('REPL: could not enter')
|
||||
|
||||
if not self.read_until(self.send_ctrl_d()):
|
||||
raise Exception('REPL: could not soft reboot')
|
||||
|
||||
if not self.read_until(self.send_ctrl_c()):
|
||||
raise Exception('REPL: could not interrupt after soft reboot')
|
||||
|
||||
return self.__terminal
|
||||
|
||||
def __exit__(self, a, b, c):
|
||||
self.send_ctrl_b()
|
||||
|
||||
def __terminal(self, command: str, stream_output: bool = False) -> Optional[str]:
|
||||
command = textwrap.dedent(command)
|
||||
# send input
|
||||
if not isinstance(command, bytes):
|
||||
command = bytes(command, encoding='utf8')
|
||||
|
||||
if not self.read_until(b'>'):
|
||||
raise Exception('Terminal: prompt has been lost')
|
||||
|
||||
for i in range(0, len(command), BUFFER_SIZE):
|
||||
if not stream_output:
|
||||
self.transfer_status()
|
||||
|
||||
self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))])
|
||||
time.sleep(0.0001)
|
||||
|
||||
if not self.read_until(self.send_ok()):
|
||||
raise Exception('Terminal: could not execute command')
|
||||
|
||||
# catch output
|
||||
data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output)
|
||||
|
||||
if not data:
|
||||
raise Exception('Terminal: timeout waiting for first EOF reception')
|
||||
|
||||
exception = self.read_until(b'\x04')
|
||||
|
||||
if not exception:
|
||||
raise Exception('Terminal: timeout waiting for second EOF reception')
|
||||
|
||||
data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8'))
|
||||
|
||||
if exception:
|
||||
reason = 'Traceback (most recent call last):'
|
||||
traceback = exception.split(reason)
|
||||
|
||||
if len(traceback) == 2:
|
||||
for call in traceback[1][:-2].split('\\r\\n'):
|
||||
reason += f'{call}\n'
|
||||
|
||||
raise Exception(f'{"-"* 45}\n{reason.strip()}')
|
||||
else:
|
||||
raise Exception(f'{"-" * 45}\n{exception}')
|
||||
|
||||
return data.strip()
|
||||
|
||||
|
||||
class FileSystem(object):
|
||||
pyboard: Pyboard
|
||||
|
||||
def __init__(self, pyboard: Pyboard):
|
||||
self.pyboard = pyboard
|
||||
|
||||
def checksum(self, source: str, data: str) -> str:
|
||||
output = self.terminal(f"""
|
||||
def checksum(data):
|
||||
v = 21
|
||||
for c in data.decode('utf-8'):
|
||||
v ^= ord(c)
|
||||
return v
|
||||
|
||||
with open('{source}', 'rb') as fh:
|
||||
print(checksum(fh.read()))
|
||||
""")
|
||||
|
||||
if isinstance(data, bytes):
|
||||
data = data.decode('utf-8')
|
||||
|
||||
v = 21
|
||||
for c in data:
|
||||
v ^= ord(c)
|
||||
|
||||
if int(v) == int(output):
|
||||
return 'OK'
|
||||
|
||||
return f'{v} != {output}'
|
||||
|
||||
def get(self, filename: str) -> Tuple[bytes, bool]:
|
||||
if not filename.startswith('/'):
|
||||
filename = '/' + filename
|
||||
|
||||
output = self.terminal(f"""
|
||||
import sys
|
||||
import ubinascii
|
||||
with open('{filename}', 'rb') as infile:
|
||||
while True:
|
||||
result = infile.read({BUFFER_SIZE})
|
||||
if result == b'':
|
||||
break
|
||||
sys.stdout.write(ubinascii.hexlify(result))
|
||||
""")
|
||||
output = binascii.unhexlify(output)
|
||||
|
||||
if filename.endswith('.mpy'):
|
||||
return (output, '???')
|
||||
|
||||
return (output, self.checksum(filename, output))
|
||||
|
||||
def download(self, source: str, destination: str) -> str:
|
||||
output, checksum = self.get(source)
|
||||
|
||||
with open(destination, 'wb') as fh:
|
||||
fh.write(output)
|
||||
|
||||
return checksum
|
||||
|
||||
def put(self, filename: str, data: bytes) -> Tuple[str, bool]:
|
||||
if not filename.startswith('/'):
|
||||
filename = '/' + filename
|
||||
|
||||
if not isinstance(data, bytes):
|
||||
data = bytes(data, encoding='utf8')
|
||||
|
||||
try:
|
||||
if os.path.dirname(filename):
|
||||
self.mkdir(os.path.dirname(filename))
|
||||
|
||||
with self.pyboard as terminal:
|
||||
size = len(data)
|
||||
terminal(f"""fh = open('{filename}', 'wb')""")
|
||||
|
||||
for i in range(0, size, BUFFER_SIZE):
|
||||
chunk_size = min(BUFFER_SIZE, size - i)
|
||||
chunk = repr(data[i : i + chunk_size])
|
||||
terminal(f"""fh.write({chunk})""")
|
||||
|
||||
terminal("""fh.close()""")
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
if filename.endswith('.mpy'):
|
||||
return (filename, '???')
|
||||
|
||||
return (filename, self.checksum(filename, data))
|
||||
|
||||
def upload(self, source: str, destination: str) -> str:
|
||||
with open(source, 'rb') as fh:
|
||||
_, checksum = self.put(destination, fh.read())
|
||||
|
||||
return checksum
|
||||
|
||||
def ls(self, dirname: str = '/') -> Tuple[int, List, str]:
|
||||
dirname = dirname.strip('./').strip('/')
|
||||
|
||||
output = self.terminal(f"""
|
||||
try:
|
||||
import os
|
||||
import json
|
||||
except ImportError:
|
||||
import uos as os
|
||||
import ujson as json
|
||||
|
||||
def ls(dirname):
|
||||
e = []
|
||||
s = os.stat(dirname)
|
||||
if s[0] == 0x4000:
|
||||
if not dirname.endswith('/'):
|
||||
dirname += '/'
|
||||
e.append((dirname, -1))
|
||||
for t in os.ilistdir(dirname):
|
||||
if dirname.startswith('/'):
|
||||
dirname = dirname[1:]
|
||||
if t[1] == 0x4000:
|
||||
e.extend(ls(dirname + t[0] + '/'))
|
||||
else:
|
||||
e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
|
||||
else:
|
||||
e.append((dirname, s[6]))
|
||||
return sorted(e)
|
||||
|
||||
try:
|
||||
s = 1
|
||||
r = ls('{dirname}')
|
||||
x = ''
|
||||
except Exception as e:
|
||||
s = 0
|
||||
r = [('{dirname}', -2)]
|
||||
x = str(e)
|
||||
|
||||
print(json.dumps([s, r, x]))
|
||||
""")
|
||||
|
||||
return json.loads(output)
|
||||
|
||||
def rm(self, filename: str) -> Tuple[int, List, str]:
|
||||
if not filename.startswith('/'):
|
||||
filename = '/' + filename
|
||||
|
||||
output = self.terminal(f"""
|
||||
try:
|
||||
import os
|
||||
import json
|
||||
except ImportError:
|
||||
import uos as os
|
||||
import ujson as json
|
||||
|
||||
def ls(dirname):
|
||||
e = []
|
||||
if not dirname.endswith('/'):
|
||||
dirname += '/'
|
||||
for t in os.ilistdir(dirname):
|
||||
if dirname.startswith('/'):
|
||||
dirname = dirname[1:]
|
||||
if t[1] == 0x4000:
|
||||
e.append((dirname + t[0] + '/', -1))
|
||||
e.extend(ls(dirname + t[0] + '/'))
|
||||
else:
|
||||
e.append((dirname + t[0], os.stat(dirname + t[0])[6]))
|
||||
return sorted(e, reverse=True)
|
||||
|
||||
def rm(filename):
|
||||
r = []
|
||||
if os.stat(filename)[0] == 0x4000:
|
||||
e = ls(filename)
|
||||
if filename != '/':
|
||||
e.append((filename.strip('/'), -1))
|
||||
for f, s in e:
|
||||
if not s == -1:
|
||||
try:
|
||||
os.remove(f)
|
||||
r.append((f, 1, ''))
|
||||
except Exception as e:
|
||||
r.append((f, 0, str(e)))
|
||||
for f, s in e:
|
||||
if s == -1:
|
||||
try:
|
||||
os.rmdir(f)
|
||||
r.append((f, 1, ''))
|
||||
except Exception as e:
|
||||
r.append((f, 0, str(e)))
|
||||
else:
|
||||
try:
|
||||
os.remove(filename)
|
||||
r.append((filename, 1, ''))
|
||||
except Exception as e:
|
||||
r.append((filename, 0, str(e)))
|
||||
return r
|
||||
|
||||
try:
|
||||
s = 1
|
||||
r = rm('{filename}')
|
||||
x = ''
|
||||
except Exception as e:
|
||||
s = 0
|
||||
r = [('{filename}', 0, str(e))]
|
||||
x = str(e)
|
||||
|
||||
print(json.dumps([s, r, x]))
|
||||
""")
|
||||
|
||||
return json.loads(output)
|
||||
|
||||
def mkdir(self, dirname: str) -> List:
|
||||
if not dirname.startswith('/'):
|
||||
dirname = '/' + dirname
|
||||
|
||||
if dirname.endswith('/'):
|
||||
dirname = dirname[:-1]
|
||||
|
||||
output = self.terminal(f"""
|
||||
try:
|
||||
import os
|
||||
import json
|
||||
except ImportError:
|
||||
import uos as os
|
||||
import ujson as json
|
||||
|
||||
r = []
|
||||
d = []
|
||||
for zd in str('{dirname}').split('/'):
|
||||
if not zd:
|
||||
continue
|
||||
d.append(zd)
|
||||
zd = '/'.join(d)
|
||||
try:
|
||||
os.mkdir(zd)
|
||||
r.append(('/' + zd, 1))
|
||||
except Exception as e:
|
||||
if str(e).find('EEXIST'):
|
||||
r.append(('/' + zd, 1))
|
||||
else:
|
||||
r.append(('/' + zd, 0, str(e)))
|
||||
|
||||
print(json.dumps(r))
|
||||
""")
|
||||
|
||||
return json.loads(output)
|
||||
|
||||
def launch(self, filename: str):
|
||||
try:
|
||||
self.terminal(f"""
|
||||
with open('{filename}', 'r') as fh:
|
||||
exec(fh.read())
|
||||
""", stream_output=True)
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
|
||||
def terminal(self, command: str, stream_output: bool = False) -> str:
|
||||
with self.pyboard as terminal:
|
||||
try:
|
||||
return terminal(command, stream_output=stream_output)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
class Picowatch(object):
|
||||
|
||||
def __init__(self, pyboard: Pyboard):
|
||||
self.filesystem = FileSystem(pyboard)
|
||||
signal.signal(signal.SIGINT, lambda a, b: self.interupt())
|
||||
|
||||
def interupt(self):
|
||||
self.filesystem.pyboard.send_ctrl_c()
|
||||
|
||||
def terminal(self, command: str):
|
||||
self.filesystem.terminal(command, stream_output=True)
|
||||
|
||||
def internal_ls(self, filepath: str):
|
||||
queue = []
|
||||
|
||||
if filepath == '/':
|
||||
filepath = os.getcwd().replace(os.sep, '/')
|
||||
else:
|
||||
filepath = os.path.join(os.getcwd(), filepath.strip('./').strip('/')).replace(os.sep, '/')
|
||||
|
||||
if os.path.isfile(filepath):
|
||||
queue = [(filepath, os.stat(filepath)[6])]
|
||||
elif os.path.isdir(filepath):
|
||||
def ls(dirname: str):
|
||||
e = []
|
||||
|
||||
if os.path.isdir(dirname):
|
||||
if not dirname.endswith('/'):
|
||||
dirname += '/'
|
||||
|
||||
for filename in os.listdir(dirname):
|
||||
if filename.startswith('.'):
|
||||
continue
|
||||
|
||||
filename = dirname + filename
|
||||
|
||||
if os.path.isdir(filename):
|
||||
e.extend(ls(filename))
|
||||
else:
|
||||
e.append((filename, os.stat(filename)[6]))
|
||||
return e
|
||||
|
||||
queue = ls(filepath)
|
||||
|
||||
return queue
|
||||
|
||||
def listing(self, filepath: str = '/'):
|
||||
filepath = filepath.strip('./')
|
||||
tab = Tab(4, 30, 15, 100)
|
||||
tab.labels('[ ]', 'Filename', 'Size (kb)', 'Exception')
|
||||
status, output, exception = self.filesystem.ls(filepath)
|
||||
|
||||
if status:
|
||||
for filename, size in output:
|
||||
if size == -1:
|
||||
tab.print('[*]', filename, '-')
|
||||
else:
|
||||
tab.print('[*]', filename, f'{round(size / 1024, 2)} kb')
|
||||
else:
|
||||
tab.print('[?]', filepath, '', str(exception))
|
||||
|
||||
def contents(self, filename: str):
|
||||
filename = filename.strip('./').strip('/')
|
||||
content, _ = self.filesystem.get(filename)
|
||||
print('-' * 50)
|
||||
|
||||
for ln in content.decode('utf-8').split('\n'):
|
||||
print(ln)
|
||||
|
||||
def upload(self, filepath: str):
|
||||
tab = Tab(4, 30, 15, 15, 100)
|
||||
tab.labels('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception')
|
||||
|
||||
for source, size in self.internal_ls(filepath):
|
||||
destination = source.replace(os.getcwd().replace(os.sep, '/'), '').strip('/')
|
||||
|
||||
try:
|
||||
tab.print('[↑]', destination, f'{round(size / 1024, 2)} kb', self.filesystem.upload(source, destination))
|
||||
except Exception as e:
|
||||
tab.print('[?]', destination, '', '', str(e))
|
||||
|
||||
def download(self, filepath: str):
|
||||
tab = Tab(4, 30, 15, 100)
|
||||
tab.labels('[ ]', 'Filename', 'Checksum', 'Exception')
|
||||
status, output, exception = self.filesystem.ls(filepath.strip('./').strip('/'))
|
||||
|
||||
if status:
|
||||
for remote, size in output:
|
||||
if size == -1:
|
||||
os.makedirs(os.path.join(os.getcwd(), remote), 777, exist_ok=True)
|
||||
|
||||
for remote, size in output:
|
||||
destination = os.path.join(os.getcwd(), remote).replace(os.sep, '/')
|
||||
|
||||
if not size == -1:
|
||||
try:
|
||||
tab.print('[↓]', remote, self.filesystem.download(remote, destination))
|
||||
except Exception as e:
|
||||
tab.print('[?]', remote, '', str(e))
|
||||
else:
|
||||
tab.print('[?]', filepath, '', exception)
|
||||
|
||||
def delete(self, filepath: str):
|
||||
tab = Tab(4, 30, 100)
|
||||
tab.labels('[ ]', 'Filename', 'Exception')
|
||||
status, output, exception = self.filesystem.rm(filepath.strip('./'))
|
||||
|
||||
if status:
|
||||
for filename, checked, exception in output:
|
||||
if checked:
|
||||
tab.print('[-]', filename)
|
||||
else:
|
||||
tab.print('[?]', filename, exception)
|
||||
else:
|
||||
tab.print('[?]', filepath, exception)
|
||||
|
||||
def compare(self, filepath: str):
|
||||
content, _ = self.filesystem.get(filepath.strip('./').strip('/'))
|
||||
fh, tempname = tempfile.mkstemp()
|
||||
|
||||
try:
|
||||
with os.fdopen(fh, 'wb') as tmp:
|
||||
tmp.write(content)
|
||||
|
||||
subprocess.Popen(f'code --diff "{tempname}" "{os.path.join(os.getcwd(), filepath)}"', stdout=subprocess.PIPE, shell=True).communicate()
|
||||
finally:
|
||||
input('Press Enter to delete temp file.')
|
||||
os.remove(tempname)
|
||||
|
||||
def status(self, return_output: bool = False):
|
||||
changes = []
|
||||
|
||||
try:
|
||||
output = subprocess.check_output(['git', 'status', '-s'], stderr=subprocess.STDOUT)
|
||||
|
||||
for filename in [f.strip() for f in output.decode('utf-8').split('\n')]:
|
||||
if not filename:
|
||||
continue
|
||||
|
||||
status, filename = filename.split(' ')
|
||||
|
||||
if status in ['A', 'M', '??']:
|
||||
changes.append((1, filename))
|
||||
elif status == 'D':
|
||||
changes.append((-1, filename))
|
||||
except Exception as e:
|
||||
try:
|
||||
print(e.output.decode('utf-8').strip())
|
||||
except:
|
||||
print(e.decode('utf-8').strip())
|
||||
finally:
|
||||
if return_output:
|
||||
return changes
|
||||
|
||||
tab = Tab(4, 40)
|
||||
tab.labels('[ ]', 'Filename')
|
||||
|
||||
for status, filename in changes:
|
||||
tab.print('[+]' if status == 1 else '[-]', filename)
|
||||
|
||||
def push(self):
|
||||
tab = Tab(4, 30, 15, 100)
|
||||
tab.labels('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception')
|
||||
changes = self.status(return_output=True)
|
||||
|
||||
for filepath in [filename for status, filename in changes if status == -1]:
|
||||
filepath = filepath.strip('/')
|
||||
status, output, exception = self.filesystem.rm(filepath)
|
||||
|
||||
if status:
|
||||
for filename, checked, exception in output:
|
||||
if checked:
|
||||
tab.print('[-]', filename, '', 'DELETED')
|
||||
else:
|
||||
tab.print('[?]', filename, '', '', exception)
|
||||
else:
|
||||
tab.print('[?]', filepath, '', exception)
|
||||
|
||||
queue = []
|
||||
|
||||
for filepath in [filename for status, filename in changes if status == 1]:
|
||||
queue.extend(self.internal_ls(filepath))
|
||||
|
||||
for source, size in queue:
|
||||
destination = source.replace(os.getcwd().replace(os.sep, '/'), '').strip('/')
|
||||
|
||||
try:
|
||||
tab.print('[↑]', destination, f'{round(size / 1024, 2)} kb', self.filesystem.upload(source, destination))
|
||||
except Exception as e:
|
||||
tab.print('[?]', destination, '', '', str(e))
|
||||
|
||||
print('Pico board up to date.')
|
||||
|
||||
def compile(self, filename: str):
|
||||
_, error = mpy_cross.run(filename, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True).communicate()
|
||||
|
||||
if error:
|
||||
print(error.decode('utf-8'))
|
||||
else:
|
||||
print(f'MicroPython File from "{filename}" created!')
|
||||
|
||||
def test(self, filename: str):
|
||||
with open(os.path.join(os.getcwd(), filename), 'r') as fh:
|
||||
self.filesystem.terminal(fh.read(), stream_output=True)
|
||||
|
||||
def launch(self, filename: str):
|
||||
self.filesystem.launch(filename)
|
||||
|
||||
|
||||
print('Welcome to Picowatch Terminal')
|
||||
print(f'Listening to project: {os.getcwd().replace(os.sep, "/")}/')
|
||||
picowatch = False
|
||||
|
||||
try:
|
||||
env = dotenv_values('.picowatch')
|
||||
picowatch = Picowatch(Pyboard(env["DEVICE"], env["BAUDRATE"]))
|
||||
print(f'Connected automatically to device: {env["DEVICE"]} at a baudrate of: {env["BAUDRATE"]}')
|
||||
except:
|
||||
while not picowatch:
|
||||
print('-' * 50)
|
||||
device = input('Port: ').strip()
|
||||
baudrate = input('Baudrate (115200): ').strip() or 115200
|
||||
|
||||
try:
|
||||
picowatch = Picowatch(Pyboard(device, baudrate))
|
||||
print('-' * 50)
|
||||
print(f'Connected to device: {device} and baudrate: {baudrate}')
|
||||
|
||||
with open(os.path.join(os.getcwd(), '.picowatch'), 'w+') as fh:
|
||||
fh.write(f'DEVICE = "{device}"\n')
|
||||
fh.write(f'BAUDRATE = {baudrate}\n')
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
|
||||
print('-' * 50)
|
||||
picowatch.interupt()
|
||||
|
||||
while True:
|
||||
try:
|
||||
unmessage = input('>>> ').strip()
|
||||
|
||||
for message in unmessage.split('&'):
|
||||
match message.strip().split(' '):
|
||||
case ['exit']:
|
||||
sys.exit()
|
||||
case ['help']:
|
||||
print('TODO')
|
||||
case ['reboot']:
|
||||
picowatch.terminal('help()')
|
||||
case ['ls' | 'list', *source]:
|
||||
picowatch.listing(source[0] if source else '/')
|
||||
case ['cat' | 'code', source]:
|
||||
picowatch.contents(source)
|
||||
case ['rm' | 'delete', source]:
|
||||
picowatch.delete(source)
|
||||
case ['put' | 'upload', source]:
|
||||
picowatch.upload(source)
|
||||
case ['get' | 'download', source]:
|
||||
picowatch.download(source)
|
||||
case ['diff' | 'compare', filename]:
|
||||
picowatch.compare(filename)
|
||||
case ['status']:
|
||||
picowatch.status(return_output=False)
|
||||
case ['push']:
|
||||
picowatch.push()
|
||||
case ['mpy' | 'compile', filename]:
|
||||
picowatch.compile(filename)
|
||||
case ['install', package_name]:
|
||||
pass
|
||||
case ['test', filename]:
|
||||
picowatch.test(filename)
|
||||
case ['!']:
|
||||
picowatch.test('main.py')
|
||||
case ['!!']:
|
||||
picowatch.launch('main.py')
|
||||
case _:
|
||||
if message.startswith('./'):
|
||||
picowatch.launch(message[2:])
|
||||
elif message:
|
||||
print(f'"{message}" is not recognized.')
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
Reference in New Issue
Block a user