Save work in progress...

This commit is contained in:
Gino D
2022-12-31 03:36:55 +01:00
parent 29629aac58
commit e7ecde0d0c

View File

@@ -26,89 +26,97 @@ import os
import sys
import time
import json
import serial
import signal
import binascii
import textwrap
from serial import Serial
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Union
BUFFER_SIZE: int = 256
# class TelnetToSerial:
# def __init__(self, ip, user, password, read_timeout=None):
# import telnetlib
# self.tn = telnetlib.Telnet(ip, timeout=15)
# self.read_timeout = read_timeout
# if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
# self.tn.write(bytes(user, 'ascii') + b"\r\n")
class Telnet:
# if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
# # needed because of internal implementation details of the telnet server
# time.sleep(0.2)
# self.tn.write(bytes(password, 'ascii') + b"\r\n")
def __init__(self, IP: str, login: str, password: str):
import telnetlib
from collections import deque
# if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
# # login succesful
# from collections import deque
# self.fifo = deque()
# return
self.tn = telnetlib.Telnet(IP, timeout=15)
self.fifo = deque()
# raise Exception('Failed to establish a telnet connection with the board')
if b'Login as:' in self.tn.read_until(b'Login as:'):
self.tn.write(bytes(login, 'ascii') + b'\r\n')
# def __del__(self):
# self.close()
if b'Password:' in self.tn.read_until(b'Password:'):
time.sleep(0.2)
self.tn.write(bytes(password, 'ascii') + b'\r\n')
# def close(self):
# try:
# self.tn.close()
# except:
# # the telnet object might not exist yet, so ignore this one
# pass
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.'):
return
# def read(self, size=1):
# while len(self.fifo) < size:
# timeout_count = 0
# data = self.tn.read_eager()
# if len(data):
# self.fifo.extend(data)
# timeout_count = 0
# else:
# time.sleep(0.25)
# if self.read_timeout is not None and timeout_count > 4 * self.read_timeout:
# break
# timeout_count += 1
raise Exception('Failed to establish a Telnet connection with the board')
# data = b''
# while len(data) < size and len(self.fifo) > 0:
# data += bytes([self.fifo.popleft()])
# return data
def __del__(self):
self.close()
# def write(self, data):
# self.tn.write(data)
# return len(data)
def close(self):
try:
self.tn.close()
except:
pass
# def in_waiting(self):
# n_waiting = len(self.fifo)
# if not n_waiting:
# data = self.tn.read_eager()
# self.fifo.extend(data)
# return len(data)
# else:
# return n_waiting
def read(self, size: int = 1) -> bytes:
timeout = 0
while len(self.fifo) < size and not timeout >= 8:
timeout = 0
data = self.tn.read_eager()
if len(data):
self.fifo.extend(data)
timeout = 0
else:
timeout += 1
time.sleep(0.25)
data = b''
while len(data) < size and len(self.fifo) > 0:
data += bytes([self.fifo.popleft()])
return data
def write(self, data: bytes):
self.tn.write(data)
return len(data)
def inWaiting(self) -> int:
n_waiting = len(self.fifo)
if not n_waiting:
data = self.tn.read_eager()
self.fifo.extend(data)
n_waiting = len(data)
return n_waiting
class Pyboard(object):
serial: serial.Serial
serial: Union[Serial, Telnet]
def __init__(self, device: str, baudrate: int = 115200, login: str = '', password: str = ''):
is_telnet = device and device.count('.') == 3
def __init__(self, device: str, baudrate: int = 115200):
for _ in range(0, 3):
try:
self.serial = serial.Serial(device, baudrate=baudrate, interCharTimeout=1)
if is_telnet:
self.serial = Telnet(device, login, password)
else:
self.serial = Serial(device, baudrate=baudrate, interCharTimeout=1)
break
except:
time.sleep(1)
@@ -530,12 +538,12 @@ class Picowatch(object):
def upload(self, filepath: str):
local = filepath
if local.startswith('/'):
local = '.' + local
if not local.startswith('./'):
local = './' + local
if not local.startswith('.'):
local = '.' + local
queue = []
if os.path.isdir(local):