Compare commits

...

10 Commits

Author SHA1 Message Date
b51deb6529 Initial commit 2026-03-31 13:26:46 +02:00
Gino D
9f2f6ee462 readline in try/except 2023-01-16 10:44:48 +01:00
Gino D
a78b28d980 Fixed tab witdth 2023-01-12 18:01:04 +01:00
Gino D
b817538199 Remove traceback 2023-01-11 18:10:18 +01:00
Gino D
7647edfee9 Fixed bug 2023-01-11 12:42:00 +01:00
Gino D
ca3e33d854 Bug fixed 2023-01-08 16:00:35 +01:00
Gino D
749bf7431e Optimized boot mode 2023-01-08 15:42:12 +01:00
Gino D
0d49259fcd Options added, fixed wordings and bugs 2023-01-08 14:33:04 +01:00
Gino D
cb88815dd9 Fixed wording... 2023-01-08 03:06:07 +01:00
Gino D
f7a533affd Fixed wording 2023-01-08 03:04:18 +01:00
3 changed files with 1041 additions and 1092 deletions

165
.gitignore vendored
View File

@@ -1,137 +1,50 @@
# Byte-compiled / optimized / DLL files
# ── Python ──────────────────────────────────────────────────────────────────
__pycache__/
*.py[cod]
*$py.class
*.pyo
*.pyd
*.pyc
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
# ── Environnements virtuels ──────────────────────────────────────────────────
venv/
.venv/
env/
ENV/
env.bak/
venv.bak/
Pipfile.lock
# Spyder project settings
.spyderproject
.spyproject
# ── Secrets ───────────────────────────────────────────────────────────────────
.env
.env.*
secret.env
secrets.py
*.secret
credentials.json
token.json
# Rope project settings
.ropeproject
# ── Logs ─────────────────────────────────────────────────────────────────────
*.log
logs/
# mkdocs documentation
/site
# ── IDE ───────────────────────────────────────────────────────────────────────
.vscode/
.idea/
*.swp
.DS_Store
Thumbs.db
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# ── Tests / Coverage ─────────────────────────────────────────────────────────
.coverage
htmlcov/
.pytest_cache/
# Pyre type checker
.pyre/
# ── Build ─────────────────────────────────────────────────────────────────────
dist/
build/
*.egg-info/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# ── Modèles IA (trop lourds pour git) ───────────────────────────────────────
*.gguf
*.bin
*.safetensors
models/
weights/

View File

@@ -1,47 +1,30 @@
#!/usr/bin/env python
# Primadiag SAS - Paris
# Author: Gino D.
# Copyright (c) 2023 Primadiag
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE
import os
import sys
import time
import json
import signal
import readline
import tempfile
import binascii
import textwrap
import mpy_cross
import subprocess
try:
import readline
except:
pass
from serial import Serial
from dotenv import dotenv_values
from typing import List, Optional, Tuple, Union
BUFFER_SIZE: int = 256
BUFFER_SIZE: int = 512
LISTENING_TO: str = os.getcwd()
class Tab():
bordered: bool = False
colsize: List = []
@@ -62,18 +45,18 @@ class Tab():
if isinstance(align_to_right, (list, tuple)):
self.align_to_right(*align_to_right)
def head(self, *texts: str):
self.border('=')
self.line(*texts, bordered=False)
self.border('=')
def head(self, *labels: str, border_style: str = '='):
self.border(border_style)
self.line(*labels, bordered=False)
self.border(border_style)
def border(self, text: str = '-'):
sys.stdout.write(text * sum(self.colsize) + '\n')
def border(self, style: str = '-'):
print(style * sum(self.colsize) + '\r')
def align_to_right(self, *column_num: int):
self.text_to_right = [i - 1 for i in column_num]
def line(self, *texts: str, bordered: bool = None):
def line(self, *texts: str, bordered: bool = None, border_style: str = '-'):
lines = {}
max_lines = 0
@@ -114,13 +97,13 @@ class Tab():
else:
output += ' ' * width
sys.stdout.write(output + '\n')
print(output + '\r')
if bordered == False:
return
if bordered or self.bordered:
self.border('-')
self.border(border_style)
class Telnet:
@@ -191,6 +174,7 @@ class Telnet:
class Pyboard(object):
i: int = 0
boot_status: bool = False
serial: Union[Serial, Telnet]
def __init__(self, device: str, baudrate: int = 115200, login: str = 'micro', password: str = 'python'):
@@ -231,6 +215,7 @@ class Pyboard(object):
self.serial.write(b'\x02')
def send_ctrl_c(self) -> bytes:
self.boot_status = False
# Ctrl-C cancels any input, or interrupts the currently running code.
for _ in range(0, 2):
self.serial.write(b'\x03')
@@ -251,14 +236,14 @@ class Pyboard(object):
def read_until(self, delimiter: bytes, stream_output: bool = False, show_status: bool = False) -> Optional[bytes]:
data = b''
timeout = 0
timeout = 300
max_len = len(delimiter)
while not timeout >= 1000:
while timeout > 0:
if data.endswith(delimiter):
return data
elif self.serial.inWaiting() > 0:
timeout = 0
timeout = 300
data += self.serial.read(1)
if stream_output:
@@ -268,8 +253,8 @@ class Pyboard(object):
elif show_status:
self.transfer_status()
else:
timeout += 1
time.sleep(0.0001)
timeout -= 1
time.sleep(0.01)
def boot(self):
data = b''
@@ -280,13 +265,21 @@ class Pyboard(object):
if not self.read_until(self.send_ctrl_d()):
raise Exception('REPL: could not soft reboot')
while not data.endswith(b'\r\nMicroPython v') and not data.endswith(b'.\r\n>>>'):
if self.serial.inWaiting():
data = data[-15:] + self.serial.read(1)
sys.stdout.buffer.write(data[-1:])
sys.stdout.buffer.flush()
output_status = True
self.boot_status = True
sys.stdout.buffer.write(b'\r' + b' ' * 20 + b'\r')
while self.boot_status and output_status:
if self.serial.inWaiting():
sys.stdout.buffer.write((data := data[-15:] + self.serial.read(1))[-1:])
sys.stdout.buffer.flush()
output_status = not data.endswith(b'\r\nMicroPython v') and not data.endswith(b'.\r\n>>>')
if not output_status:
sys.stdout.buffer.write(b'\r<<< Program terminated\r\n')
elif not self.boot_status:
sys.stdout.buffer.write(b'\r\n<<< Program terminated with <KeyboardInterrupt Exception>\r\n')
self.boot_status = False
def __enter__(self):
self.send_ctrl_c()
@@ -316,6 +309,7 @@ class Pyboard(object):
self.transfer_status()
self.serial.write(command[i: min(i + BUFFER_SIZE, len(command))])
# time.sleep(0.0001)
if not self.read_until(self.send_ok()):
raise Exception('Terminal: could not execute command')
@@ -323,11 +317,13 @@ class Pyboard(object):
data = self.read_until(b'\x04', stream_output=stream_output, show_status=not stream_output)
if not data:
self.send_ctrl_d()
raise Exception('Terminal: timeout waiting for first EOF reception')
exception = self.read_until(b'\x04')
if not exception:
self.send_ctrl_d()
raise Exception('Terminal: timeout waiting for second EOF reception')
data, exception = (data[:-1].decode('utf-8'), exception[:-1].decode('utf-8'))
@@ -340,9 +336,9 @@ class Pyboard(object):
for call in traceback[1][:-2].split('\\r\\n'):
reason += f'{call}\n'
raise Exception('' + reason.strip())
raise Exception('\r' + reason.strip())
else:
raise Exception(exception)
raise Exception('\r' + exception)
return data.strip()
@@ -609,11 +605,34 @@ class Picowatch(object):
def __init__(self, pyboard: Pyboard):
self.filesystem = FileSystem(pyboard)
signal.signal(signal.SIGINT, lambda sig, frame: self.interrupt())
signal.signal(signal.SIGINT, lambda signum, frame: self.interrupt())
def boot(self):
self.filesystem.pyboard.boot()
def system(self):
self.terminal("""
import os
import gc
print('Board:', os.uname().machine, os.uname().version)
print('Free memory:', round(gc.mem_free() / 1024, 2), 'kb.')
""")
def reset(self):
self.terminal("""
import machine
machine.soft_reset()
""")
self.interrupt()
def flash(self):
self.terminal("""
import machine
machine.bootloader()
""")
self.interrupt()
def interrupt(self):
self.filesystem.pyboard.send_ctrl_c()
self.filesystem.pyboard.until_nothing_in_waiting()
@@ -657,7 +676,7 @@ class Picowatch(object):
def listing(self, filepath: str = '/'):
filepath = filepath.strip('./')
tab = Tab(4, 30, 15, nb_columns=4)
tab = Tab(4, 50, 15, nb_columns=4)
tab.head('[ ]', 'Filename', 'Size (kb)', 'Exception')
status, output, exception = self.filesystem.ls(filepath)
@@ -678,8 +697,9 @@ class Picowatch(object):
for ln in content.decode('utf-8').split('\n'):
print(ln)
def upload(self, filepath: str):
tab = Tab(4, 30, 15, 15, nb_columns=5)
tab = Tab(4, 50, 15, 15, nb_columns=5)
tab.head('[ ]', 'Filename', 'Size (kb)', 'Checksum', 'Exception')
for source, size in self.internal_ls(filepath):
@@ -691,7 +711,7 @@ class Picowatch(object):
tab.line('[?]', destination, '', '', str(e))
def download(self, filepath: str):
tab = Tab(4, 30, 15, nb_columns=4)
tab = Tab(4, 50, 15, nb_columns=4)
tab.head('[ ]', 'Filename', 'Checksum', 'Exception')
status, output, exception = self.filesystem.ls(filepath.strip('./').strip('/'))
@@ -712,7 +732,7 @@ class Picowatch(object):
tab.line('[?]', filepath, '', exception)
def delete(self, filepath: str):
tab = Tab(4, 30, nb_columns=3)
tab = Tab(4, 50, nb_columns=3)
tab.head('[ ]', 'Filename', 'Exception')
status, output, exception = self.filesystem.rm(filepath.strip('./'))
@@ -855,6 +875,12 @@ class Picowatch(object):
else:
print(f'Python file "{filename}" compile to .mpy')
def install(self, package_name: str):
self.terminal(f"""
import mip
mip.install('{package_name}')
""")
def test(self, filename: str):
with open(os.path.join(LISTENING_TO, filename), 'r') as fh:
self.filesystem.terminal(fh.read(), stream_output=True)
@@ -899,29 +925,35 @@ while True:
try:
match message.strip().split(' '):
case ['?' | 'help']:
print('These are common Picowatch keywords:\n')
print('These are common Picowatch keywords:')
tab = Tab(12, 10, 32, nb_columns=4)
tab.head('Keywords', 'Shortcut', 'Parameters', 'Description')
tab.line('help', '?', '', 'Show keywords and their description')
tab.line('boot', '.', '', 'Do a soft reset and run main.py (if exists) in REPL mode')
tab.line('test', '!', '<file> (default: main.py)', 'Run a script from PC on the Pyboard and print out the results in raw-REPL mode')
tab.line('run', '!!', '<file> (default: main.py)', 'Run a script on the Pyboard and print out the results in raw-REPL mode')
tab.line('ctrl + C', '', '', 'Interrupts the currently running code in REPL or raw-REPL mode')
tab.line('ctrl + D', 'exit', '', 'Exit Picowatch Terminal')
tab.line('uname', 'os', '', 'Pyboard name and version')
tab.line('edit', 'vim', '<file> [use vim]', 'Edit specified file from the PC (vim or vscode is required)')
tab.line('scan', 'ls', '<path> (default: /)', 'List information about the file(s) on the Pyboard')
tab.line('source', 'cat', '<file>', 'Concatenate source code to standard output')
tab.line('delete', 'rm', '<path>', 'Delete file or directory contents on the Pyboard')
tab.line('upload', 'put', '<path>', 'Upload file or directory contents from the PC to the Pyboard')
tab.head('Keywdords', 'Shortcut', 'Parameters', 'Description')
tab.line('help', '?', '', 'Show keywords and their description.')
tab.line('modules', '??', '', 'List availables modules on the Pyboard.')
tab.line('boot', '.', '', 'Perform a soft reset and run main.py (if exists) in REPL mode.')
tab.line('test', '!', '[<file>] (default: main.py)', 'Run a script from PC on the Pyboard and print out the results in raw-REPL mode.')
tab.line('run', '!!', '[<file>] (default: main.py)', 'Run a script on the Pyboard and print out the results in raw-REPL mode.')
tab.line('ctrl + C', '', '', 'Interrupts the currently running code in REPL or raw-REPL mode.')
tab.line('ctrl + D', 'exit', '', 'Exit Picowatch Terminal.')
tab.line('ctrl + Z', 'exit', '', 'Same as ctrl + D, Exit Picowatch Terminal.')
tab.line('system', 'os', '', 'Pyboard name and version.')
tab.line('reset', 'rs', '', 'Perform a soft reset from the REPL.')
tab.line('flash', 'fl', '', 'Perform a flash disk from the REPL.')
tab.line('scan', 'ls', '[<path>] (default: /)', 'List information about the file(s) on the Pyboard.')
tab.line('edit', 'vim', '<file> [<use vim>]', 'Edit specified file from the PC (vim or vscode is required).')
tab.line('source', 'cat', '<file>', 'Concatenate source code to standard output.')
tab.line('delete', 'rm', '<path>', 'Delete file or directory contents on the Pyboard.')
tab.line('upload', 'put', '<path>', 'Upload file or directory contents from the PC to the Pyboard.')
tab.line('download', 'get', '<path>', 'Download file or directory contents from the Pyboard to the PC. Warning: this may cause file corruption.')
tab.line('compare', 'diff', '<file> [use vim]', 'Compare source code from PC with source code from the Pyboard (vim or vscode is required)')
tab.line('compile', 'mpy', '<python file>', 'Compile source file to mpy file')
tab.line('install', 'mip', '<package name>', 'Install a package lib')
tab.line('status', 'mod', '', 'Show the working tree status (Git is required)')
tab.line('commit', 'sync', '<message> (default: "")', 'Synchronize Pyboard along with associated commit(s) (Git is required)')
case ['os' | 'uname']:
picowatch.terminal('import os;print(os.uname().machine, os.uname().version)')
tab.line('compare', 'diff', '<file> [<use vim>]', 'Compare source code from PC with source code from the Pyboard (vim or vscode is required).')
tab.line('compile', 'mpy', '<python file>', 'Compile source file to mpy file.')
tab.line('install', 'mip', '<package name>', 'Install packages from micropython-lib and from third-party sites (including GitHub) - *Network-capable boards only.')
tab.line('status', 'mod', '', 'Show the working tree status (Git is required).')
tab.line('commit', 'sync', '[<message>] (default: "")', 'Synchronize Pyboard along with associated commit(s) (Git is required).')
case ['??' | 'modules']:
picowatch.terminal(f'help("modules")')
case ['os' | 'system']:
picowatch.system()
case ['ls' | 'scan', *file]:
picowatch.listing(file[0] if file else '/')
case ['vim' | 'edit', file, *use_vim]:
@@ -936,7 +968,7 @@ while True:
case ['put' | 'upload', path]:
picowatch.upload(path)
case ['get' | 'download', path]:
picowatch.download(file)
picowatch.download(path)
case ['diff' | 'compare', file, *use_vim]:
picowatch.compare(file, use_vim=len(use_vim) > 0)
case ['mod' | 'status']:
@@ -946,13 +978,17 @@ while True:
case ['mpy' | 'compile', file]:
picowatch.compile(file)
case ['mip' | 'install', package_name]:
print('# TODO')
picowatch.install(package_name)
case ['!' | 'test', *file]:
picowatch.test(file[0] if file else 'main.py')
case ['!!' | 'run', *file]:
picowatch.launch(file[0] if file else 'main.py')
case ['.'| 'boot']:
picowatch.boot()
case ['rs' | 'reset']:
picowatch.reset()
case ['fl' | 'flash']:
picowatch.flash()
case ['exit']:
sys.exit('Picowatch Terminal disconnected!')
case _: