Module meshtastic.serial_interface
Serial interface class
Expand source code
""" Serial interface class
"""
import logging
import time
import platform
import serial
import meshtastic.util
from meshtastic.stream_interface import StreamInterface
if platform.system() != 'Windows':
import termios
class SerialInterface(StreamInterface):
"""Interface class for meshtastic devices over a serial link"""
def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True):
"""Constructor, opens a connection to a specified serial port, or if unspecified try to
find one Meshtastic device by probing
Keyword Arguments:
devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None})
debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
"""
self.noProto = noProto
self.devPath = devPath
if self.devPath is None:
ports = meshtastic.util.findPorts(True)
logging.debug(f"ports:{ports}")
if len(ports) == 0:
meshtastic.util.our_exit("Warning: No Meshtastic devices detected.")
elif len(ports) > 1:
message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
message += f" Ports detected:{ports}"
meshtastic.util.our_exit(message)
else:
self.devPath = ports[0]
logging.debug(f"Connecting to {self.devPath}")
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
# see https://github.com/pyserial/pyserial/issues/124
if platform.system() != 'Windows':
with open(self.devPath, encoding='utf8') as f:
attrs = termios.tcgetattr(f)
attrs[2] = attrs[2] & ~termios.HUPCL
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
f.close()
time.sleep(0.1)
self.stream = serial.Serial(self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0)
self.stream.flush()
time.sleep(0.1)
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
def close(self):
"""Close a connection to the device"""
self.stream.flush()
time.sleep(0.1)
self.stream.flush()
time.sleep(0.1)
logging.debug("Closing Serial stream")
StreamInterface.close(self)
Classes
class SerialInterface (devPath=None, debugOut=None, noProto=False, connectNow=True)
-
Interface class for meshtastic devices over a serial link
Constructor, opens a connection to a specified serial port, or if unspecified try to find one Meshtastic device by probing
Keyword Arguments: devPath {string} – A filepath to a device, i.e. /dev/ttyUSB0 (default: {None}) debugOut {stream} – If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None})
Expand source code
class SerialInterface(StreamInterface): """Interface class for meshtastic devices over a serial link""" def __init__(self, devPath=None, debugOut=None, noProto=False, connectNow=True): """Constructor, opens a connection to a specified serial port, or if unspecified try to find one Meshtastic device by probing Keyword Arguments: devPath {string} -- A filepath to a device, i.e. /dev/ttyUSB0 (default: {None}) debugOut {stream} -- If a stream is provided, any debug serial output from the device will be emitted to that stream. (default: {None}) """ self.noProto = noProto self.devPath = devPath if self.devPath is None: ports = meshtastic.util.findPorts(True) logging.debug(f"ports:{ports}") if len(ports) == 0: meshtastic.util.our_exit("Warning: No Meshtastic devices detected.") elif len(ports) > 1: message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n" message += f" Ports detected:{ports}" meshtastic.util.our_exit(message) else: self.devPath = ports[0] logging.debug(f"Connecting to {self.devPath}") # first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR # see https://github.com/pyserial/pyserial/issues/124 if platform.system() != 'Windows': with open(self.devPath, encoding='utf8') as f: attrs = termios.tcgetattr(f) attrs[2] = attrs[2] & ~termios.HUPCL termios.tcsetattr(f, termios.TCSAFLUSH, attrs) f.close() time.sleep(0.1) self.stream = serial.Serial(self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0) self.stream.flush() time.sleep(0.1) StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, connectNow=connectNow) def close(self): """Close a connection to the device""" self.stream.flush() time.sleep(0.1) self.stream.flush() time.sleep(0.1) logging.debug("Closing Serial stream") StreamInterface.close(self)
Ancestors
Inherited members