Compare commits

...

10 Commits

Author SHA1 Message Date
b51deb6529 Initial commit 2026-03-31 13:26:46 +02:00
Gino D
9f2f6ee462 readline in try/except 2023-01-16 10:44:48 +01:00
Gino D
a78b28d980 Fixed tab witdth 2023-01-12 18:01:04 +01:00
Gino D
b817538199 Remove traceback 2023-01-11 18:10:18 +01:00
Gino D
7647edfee9 Fixed bug 2023-01-11 12:42:00 +01:00
Gino D
ca3e33d854 Bug fixed 2023-01-08 16:00:35 +01:00
Gino D
749bf7431e Optimized boot mode 2023-01-08 15:42:12 +01:00
Gino D
0d49259fcd Options added, fixed wordings and bugs 2023-01-08 14:33:04 +01:00
Gino D
cb88815dd9 Fixed wording... 2023-01-08 03:06:07 +01:00
Gino D
f7a533affd Fixed wording 2023-01-08 03:04:18 +01:00
3 changed files with 1041 additions and 1092 deletions

165
.gitignore vendored
View File

@@ -1,137 +1,50 @@
# Byte-compiled / optimized / DLL files # ── Python ──────────────────────────────────────────────────────────────────
__pycache__/ __pycache__/
*.py[cod] *.py[cod]
*$py.class *.pyo
*.pyd
*.pyc
# C extensions # ── Environnements virtuels ──────────────────────────────────────────────────
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/ venv/
.venv/
env/
ENV/ ENV/
env.bak/ Pipfile.lock
venv.bak/
# Spyder project settings # ── Secrets ───────────────────────────────────────────────────────────────────
.spyderproject .env
.spyproject .env.*
secret.env
secrets.py
*.secret
credentials.json
token.json
# Rope project settings # ── Logs ─────────────────────────────────────────────────────────────────────
.ropeproject *.log
logs/
# mkdocs documentation # ── IDE ───────────────────────────────────────────────────────────────────────
/site .vscode/
.idea/
*.swp
.DS_Store
Thumbs.db
# mypy # ── Tests / Coverage ─────────────────────────────────────────────────────────
.mypy_cache/ .coverage
.dmypy.json htmlcov/
dmypy.json .pytest_cache/
# Pyre type checker # ── Build ─────────────────────────────────────────────────────────────────────
.pyre/ dist/
build/
*.egg-info/
# pytype static type analyzer # ── Modèles IA (trop lourds pour git) ───────────────────────────────────────
.pytype/ *.gguf
*.bin
# Cython debug symbols *.safetensors
cython_debug/ models/
weights/

View File

@@ -1,47 +1,30 @@
#!/usr/bin/env python #!/usr/bin/env python
# Primadiag SAS - Paris
# Author: Gino D.
# Copyright (c) 2023 Primadiag
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE
import os import os
import sys import sys
import time import time
import json import json
import signal import signal
import readline
import tempfile import tempfile
import binascii import binascii
import textwrap import textwrap
import mpy_cross import mpy_cross
import subprocess import subprocess
try:
import readline
except:
pass
from serial import Serial from serial import Serial
from dotenv import dotenv_values from dotenv import dotenv_values
from typing import List, Optional, Tuple, Union from typing import List, Optional, Tuple, Union
BUFFER_SIZE: int = 256 BUFFER_SIZE: int = 512
LISTENING_TO: str = os.getcwd() LISTENING_TO: str = os.getcwd()
class Tab(): class Tab():
bordered: bool = False bordered: bool = False
colsize: List = [] colsize: List = []
@@ -62,18 +45,18 @@ class Tab():
if isinstance(align_to_right, (list, tuple)): if isinstance(align_to_right, (list, tuple)):
self.align_to_right(*align_to_right) self.align_to_right(*align_to_right)
def head(self, *texts: str): def head(self, *labels: str, border_style: str = '='):
self.border('=') self.border(border_style)
self.line(*texts, bordered=False) self.line(*labels, bordered=False)
self.border('=') self.border(border_style)
def border(self, text: str = '-'): def border(self, style: str = '-'):
sys.stdout.write(text * sum(self.colsize) + '\n') print(style * sum(self.colsize) + '\r')
def align_to_right(self, *column_num: int): def align_to_right(self, *column_num: int):
self.text_to_right = [i - 1 for i in column_num] self.text_to_right = [i - 1 for i in column_num]
def line(self, *texts: str, bordered: bool = None): def line(self, *texts: str, bordered: bool = None, border_style: str = '-'):
lines = {} lines = {}
max_lines = 0 max_lines = 0
@@ -114,13 +97,13 @@ class Tab():
else: else:
output += ' ' * width output += ' ' * width
sys.stdout.write(output + '\n') print(output + '\r')
if bordered == False: if bordered == False:
return return
if bordered or self.bordered: if bordered or self.bordered:
self.border('-') self.border(border_style)
class Telnet: class Telnet:
@@ -191,6 +174,7 @@ class Telnet:
class Pyboard(object): class Pyboard(object):
i: int = 0 i: int = 0
boot_status: bool = False
serial: Union[Serial, Telnet] serial: Union[Serial, Telnet]
def __init__(self, device: str, baudrate: int = 115200, login: str = 'micro', password: str = 'python'): def __init__(self, device: str, baudrate: int = 115200, login: str = 'micro', password: str = 'python'):
@@ -231,6 +215,7 @@ class Pyboard(object):
self.serial.write(b'\x02') self.serial.write(b'\x02')
def send_ctrl_c(self) -> bytes: def send_ctrl_c(self) -> bytes:
self.boot_status = False
# Ctrl-C cancels any input, or interrupts the currently running code. # Ctrl-C cancels any input, or interrupts the currently running code.
for _ in range(0, 2): for _ in range(0, 2):
self.serial.write(b'\x03') self.serial.write(b'\x03')
@@ -251,14 +236,14 @@ class Pyboard(object):
def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]: def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]:
data = b'' data = b''
timeout = 0 timeout = 300
max_len = len(delimiter) max_len = len(delimiter)
while not timeout >= 1000: while timeout > 0:
if data.endswith(delimiter): if data.endswith(delimiter):
return data return data
elif self.serial.inWaiting() > 0: elif self.serial.inWaiting() > 0:
timeout = 0 timeout = 300
data += self.serial.read(1) data += self.serial.read(1)
if stream_output: if stream_output:
@@ -268,8 +253,8 @@ class Pyboard(object):
elif show_status: elif show_status:
self.transfer_status() self.transfer_status()
else: else:
timeout += 1 timeout -= 1
time.sleep(0.0001) time.sleep(0.01)
def boot(self): def boot(self):
data = b'' data = b''
@@ -280,13 +265,21 @@ class Pyboard(object):
if not self.read_until(self.send_ctrl_d()): if not self.read_until(self.send_ctrl_d()):
raise Exception('REPL: could not soft reboot') raise Exception('REPL: could not soft reboot')
while not data.endswith(b'\r\nMicroPython v') and not data.endswith(b'.\r\n>>>'): output_status = True
if self.serial.inWaiting(): self.boot_status = True
data = data[-15:] + self.serial.read(1)
sys.stdout.buffer.write(data[-1:])
sys.stdout.buffer.flush()
sys.stdout.buffer.write(b'\r' + b' ' * 20 + b'\r') while self.boot_status and output_status:
if self.serial.inWaiting():
sys.stdout.buffer.write((data := data[-15:] + self.serial.read(1))[-1:])
sys.stdout.buffer.flush()
output_status = not data.endswith(b'\r\nMicroPython v') and not data.endswith(b'.\r\n>>>')
if not output_status:
sys.stdout.buffer.write(b'\r<<< Program terminated\r\n')
elif not self.boot_status:
sys.stdout.buffer.write(b'\r\n<<< Program terminated with <KeyboardInterrupt Exception>\r\n')
self.boot_status = False
def __enter__(self): def __enter__(self):
self.send_ctrl_c() self.send_ctrl_c()
@@ -316,6 +309,7 @@ class Pyboard(object):
self.transfer_status() self.transfer_status()
self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))]) self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))])
# time.sleep(0.0001)
if not self.read_until(self.send_ok()): if not self.read_until(self.send_ok()):
raise Exception('Terminal: could not execute command') raise Exception('Terminal: could not execute command')
@@ -323,11 +317,13 @@ class Pyboard(object):
data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output) data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output)
if not data: if not data:
self.send_ctrl_d()
raise Exception('Terminal: timeout waiting for first EOF reception') raise Exception('Terminal: timeout waiting for first EOF reception')
exception = self.read_until(b'\x04') exception = self.read_until(b'\x04')
if not exception: if not exception:
self.send_ctrl_d()
raise Exception('Terminal: timeout waiting for second EOF reception') raise Exception('Terminal: timeout waiting for second EOF reception')
data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8')) data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8'))
@@ -340,9 +336,9 @@ class Pyboard(object):
for call in traceback[1][:-2].split('\\r\\n'): for call in traceback[1][:-2].split('\\r\\n'):
reason += f'{call}\n' reason += f'{call}\n'
raise Exception('' + reason.strip()) raise Exception('\r' + reason.strip())
else: else:
raise Exception(exception) raise Exception('\r' + exception)
return data.strip() return data.strip()
@@ -609,11 +605,34 @@ class Picowatch(object):
def __init__(self, pyboard: Pyboard): def __init__(self, pyboard: Pyboard):
self.filesystem = FileSystem(pyboard) self.filesystem = FileSystem(pyboard)
signal.signal(signal.SIGINT, lambda sig, frame: self.interrupt()) signal.signal(signal.SIGINT, lambda signum, frame: self.interrupt())
def boot(self): def boot(self):
self.filesystem.pyboard.boot() self.filesystem.pyboard.boot()
def system(self):
self.terminal("""
import os
import gc
print('Board:', os.uname().machine, os.uname().version)
print('Free memory:', round(gc.mem_free() / 1024, 2), 'kb.')
""")
def reset(self):
self.terminal("""
import machine
machine.soft_reset()
""")
self.interrupt()
def flash(self):
self.terminal("""
import machine
machine.bootloader()
""")
self.interrupt()
def interrupt(self): def interrupt(self):
self.filesystem.pyboard.send_ctrl_c() self.filesystem.pyboard.send_ctrl_c()
self.filesystem.pyboard.until_nothing_in_waiting() self.filesystem.pyboard.until_nothing_in_waiting()
@@ -657,7 +676,7 @@ class Picowatch(object):
def listing(self, filepath: str = '/'): def listing(self, filepath: str = '/'):
filepath = filepath.strip('./') filepath = filepath.strip('./')
tab = Tab(4, 30, 15, nb_columns=4) tab = Tab(4, 50, 15, nb_columns=4)
tab.head('[ ]', 'Filename', 'Size (kb)', 'Exception') tab.head('[ ]', 'Filename', 'Size (kb)', 'Exception')
status, output, exception = self.filesystem.ls(filepath) status, output, exception = self.filesystem.ls(filepath)
@@ -678,8 +697,9 @@ class Picowatch(object):
for ln in content.decode('utf-8').split('\n'): for ln in content.decode('utf-8').split('\n'):
print(ln) print(ln)
def upload(self, filepath: str): def upload(self, filepath: str):
tab = Tab(4, 30, 15, 15, nb_columns=5) tab = Tab(4, 50, 15, 15, nb_columns=5)
tab.head('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception') tab.head('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception')
for source, size in self.internal_ls(filepath): for source, size in self.internal_ls(filepath):
@@ -691,7 +711,7 @@ class Picowatch(object):
tab.line('[?]', destination, '', '', str(e)) tab.line('[?]', destination, '', '', str(e))
def download(self, filepath: str): def download(self, filepath: str):
tab = Tab(4, 30, 15, nb_columns=4) tab = Tab(4, 50, 15, nb_columns=4)
tab.head('[ ]', 'Filename', 'Checksum', 'Exception') tab.head('[ ]', 'Filename', 'Checksum', 'Exception')
status, output, exception = self.filesystem.ls(filepath.strip('./').strip('/')) status, output, exception = self.filesystem.ls(filepath.strip('./').strip('/'))
@@ -712,7 +732,7 @@ class Picowatch(object):
tab.line('[?]', filepath, '', exception) tab.line('[?]', filepath, '', exception)
def delete(self, filepath: str): def delete(self, filepath: str):
tab = Tab(4, 30, nb_columns=3) tab = Tab(4, 50, nb_columns=3)
tab.head('[ ]', 'Filename', 'Exception') tab.head('[ ]', 'Filename', 'Exception')
status, output, exception = self.filesystem.rm(filepath.strip('./')) status, output, exception = self.filesystem.rm(filepath.strip('./'))
@@ -855,6 +875,12 @@ class Picowatch(object):
else: else:
print(f'Python file "{filename}" compile to .mpy') print(f'Python file "{filename}" compile to .mpy')
def install(self, package_name: str):
self.terminal(f"""
import mip
mip.install('{package_name}')
""")
def test(self, filename: str): def test(self, filename: str):
with open(os.path.join(LISTENING_TO, filename), 'r') as fh: with open(os.path.join(LISTENING_TO, filename), 'r') as fh:
self.filesystem.terminal(fh.read(), stream_output=True) self.filesystem.terminal(fh.read(), stream_output=True)
@@ -899,29 +925,35 @@ while True:
try: try:
match message.strip().split(' '): match message.strip().split(' '):
case ['?' | 'help']: case ['?' | 'help']:
print('These are common Picowatch keywords:\n') print('These are common Picowatch keywords:')
tab = Tab(12, 10, 32, nb_columns=4) tab = Tab(12, 10, 32, nb_columns=4)
tab.head('Keywords', 'Shortcut', 'Parameters', 'Description') tab.head('Keywdords', 'Shortcut', 'Parameters', 'Description')
tab.line('help', '?', '', 'Show keywords and their description') tab.line('help', '?', '', 'Show keywords and their description.')
tab.line('boot', '.', '', 'Do a soft reset and run main.py (if exists) in REPL mode') tab.line('modules', '??', '', 'List availables modules on the Pyboard.')
tab.line('test', '!', '<file> (default: main.py)', 'Run a script from PC on the Pyboard and print out the results in raw-REPL mode') tab.line('boot', '.', '', 'Perform a soft reset and run main.py (if exists) in REPL mode.')
tab.line('run', '!!', '<file> (default: main.py)', 'Run a script on the Pyboard and print out the results in raw-REPL mode') tab.line('test', '!', '[<file>] (default: main.py)', 'Run a script from PC on the Pyboard and print out the results in raw-REPL mode.')
tab.line('ctrl + C', '', '', 'Interrupts the currently running code in REPL or raw-REPL mode') tab.line('run', '!!', '[<file>] (default: main.py)', 'Run a script on the Pyboard and print out the results in raw-REPL mode.')
tab.line('ctrl + D', 'exit', '', 'Exit Picowatch Terminal') tab.line('ctrl + C', '', '', 'Interrupts the currently running code in REPL or raw-REPL mode.')
tab.line('uname', 'os', '', 'Pyboard name and version') tab.line('ctrl + D', 'exit', '', 'Exit Picowatch Terminal.')
tab.line('edit', 'vim', '<file> [use vim]', 'Edit specified file from the PC (vim or vscode is required)') tab.line('ctrl + Z', 'exit', '', 'Same as ctrl + D, Exit Picowatch Terminal.')
tab.line('scan', 'ls', '<path> (default: /)', 'List information about the file(s) on the Pyboard') tab.line('system', 'os', '', 'Pyboard name and version.')
tab.line('source', 'cat', '<file>', 'Concatenate source code to standard output') tab.line('reset', 'rs', '', 'Perform a soft reset from the REPL.')
tab.line('delete', 'rm', '<path>', 'Delete file or directory contents on the Pyboard') tab.line('flash', 'fl', '', 'Perform a flash disk from the REPL.')
tab.line('upload', 'put', '<path>', 'Upload file or directory contents from the PC to the Pyboard') tab.line('scan', 'ls', '[<path>] (default: /)', 'List information about the file(s) on the Pyboard.')
tab.line('edit', 'vim', '<file> [<use vim>]', 'Edit specified file from the PC (vim or vscode is required).')
tab.line('source', 'cat', '<file>', 'Concatenate source code to standard output.')
tab.line('delete', 'rm', '<path>', 'Delete file or directory contents on the Pyboard.')
tab.line('upload', 'put', '<path>', 'Upload file or directory contents from the PC to the Pyboard.')
tab.line('download', 'get', '<path>', 'Download file or directory contents from the Pyboard to the PC. Warning: this may cause file corruption.') tab.line('download', 'get', '<path>', 'Download file or directory contents from the Pyboard to the PC. Warning: this may cause file corruption.')
tab.line('compare', 'diff', '<file> [use vim]', 'Compare source code from PC with source code from the Pyboard (vim or vscode is required)') tab.line('compare', 'diff', '<file> [<use vim>]', 'Compare source code from PC with source code from the Pyboard (vim or vscode is required).')
tab.line('compile', 'mpy', '<python file>', 'Compile source file to mpy file') tab.line('compile', 'mpy', '<python file>', 'Compile source file to mpy file.')
tab.line('install', 'mip', '<package name>', 'Install a package lib') tab.line('install', 'mip', '<package name>', 'Install packages from micropython-lib and from third-party sites (including GitHub) - *Network-capable boards only.')
tab.line('status', 'mod', '', 'Show the working tree status (Git is required)') tab.line('status', 'mod', '', 'Show the working tree status (Git is required).')
tab.line('commit', 'sync', '<message> (default: "")', 'Synchronize Pyboard along with associated commit(s) (Git is required)') tab.line('commit', 'sync', '[<message>] (default: "")', 'Synchronize Pyboard along with associated commit(s) (Git is required).')
case ['os' | 'uname']: case ['??' | 'modules']:
picowatch.terminal('import os;print(os.uname().machine, os.uname().version)') picowatch.terminal(f'help("modules")')
case ['os' | 'system']:
picowatch.system()
case ['ls' | 'scan', *file]: case ['ls' | 'scan', *file]:
picowatch.listing(file[0] if file else '/') picowatch.listing(file[0] if file else '/')
case ['vim' | 'edit', file, *use_vim]: case ['vim' | 'edit', file, *use_vim]:
@@ -936,7 +968,7 @@ while True:
case ['put' | 'upload', path]: case ['put' | 'upload', path]:
picowatch.upload(path) picowatch.upload(path)
case ['get' | 'download', path]: case ['get' | 'download', path]:
picowatch.download(file) picowatch.download(path)
case ['diff' | 'compare', file, *use_vim]: case ['diff' | 'compare', file, *use_vim]:
picowatch.compare(file, use_vim=len(use_vim) > 0) picowatch.compare(file, use_vim=len(use_vim) > 0)
case ['mod' | 'status']: case ['mod' | 'status']:
@@ -946,13 +978,17 @@ while True:
case ['mpy' | 'compile', file]: case ['mpy' | 'compile', file]:
picowatch.compile(file) picowatch.compile(file)
case ['mip' | 'install', package_name]: case ['mip' | 'install', package_name]:
print('# TODO') picowatch.install(package_name)
case ['!' | 'test', *file]: case ['!' | 'test', *file]:
picowatch.test(file[0] if file else 'main.py') picowatch.test(file[0] if file else 'main.py')
case ['!!' | 'run', *file]: case ['!!' | 'run', *file]:
picowatch.launch(file[0] if file else 'main.py') picowatch.launch(file[0] if file else 'main.py')
case ['.'| 'boot']: case ['.'| 'boot']:
picowatch.boot() picowatch.boot()
case ['rs' | 'reset']:
picowatch.reset()
case ['fl' | 'flash']:
picowatch.flash()
case ['exit']: case ['exit']:
sys.exit('Picowatch Terminal disconnected!') sys.exit('Picowatch Terminal disconnected!')
case _: case _: