N2DRC-BBS-mesh/config_init.py

180 lines
6.2 KiB
Python
Raw Permalink Normal View History

2024-06-25 05:50:52 -07:00
import configparser
import time
2024-06-28 13:35:45 -07:00
from typing import Any
import meshtastic.stream_interface
2024-06-25 05:50:52 -07:00
import meshtastic.serial_interface
import meshtastic.tcp_interface
import serial.tools.list_ports
2024-06-28 13:35:45 -07:00
import argparse
2024-06-25 05:50:52 -07:00
2024-06-28 13:35:45 -07:00
def init_cli_parser() -> argparse.Namespace:
"""Function build the CLI parser and parses the arguments.
Returns:
argparse.ArgumentParser: Argparse namespace with processed CLI args
"""
parser = argparse.ArgumentParser(description="Meshtastic BBS system")
parser.add_argument(
"--config", "-c",
action="store",
help="System configuration file",
default=None)
parser.add_argument(
"--interface-type", "-i",
action="store",
choices=['serial', 'tcp'],
help="Node interface type",
default=None)
parser.add_argument(
"--port", "-p",
action="store",
help="Serial port",
default=None)
parser.add_argument(
"--host",
action="store",
help="TCP host address",
default=None)
parser.add_argument(
"--mqtt-topic", '-t',
action="store",
help="MQTT topic to subscribe",
default='meshtastic.receive')
#
# Add extra arguments here
#...
args = parser.parse_args()
return args
def merge_config(system_config:dict[str, Any], args:argparse.Namespace) -> dict[str, Any]:
"""Function merges configuration read from the config file and provided on the CLI.
CLI arguments override values defined in the config file.
system_config argument is mutated by the function.
Args:
system_config (dict[str, Any]): System config dict returned by initialize_config()
args (argparse.Namespace): argparse namespace with parsed CLI args
Returns:
dict[str, Any]: system config dict with merged configurations
"""
if args.interface_type is not None:
system_config['interface_type'] = args.interface_type
if args.port is not None:
system_config['port'] = args.port
if args.host is not None:
system_config['host'] = args.host
return system_config
def initialize_config(config_file: str = None) -> dict[str, Any]:
"""
Function reads and parses system configuration file
2024-06-28 13:35:45 -07:00
Returns a dict with the following entries:
config - parsed config file
interface_type - type of the active interface
hostname - host name for TCP interface
port - serial port name for serial interface
bbs_nodes - list of peer nodes to sync with
Args:
config_file (str, optional): Path to config file. Function reads from './config.ini' if this arg is set to None. Defaults to None.
Returns:
dict: dict with system configuration, ad described above
"""
2024-06-25 05:50:52 -07:00
config = configparser.ConfigParser()
2024-06-28 13:35:45 -07:00
if config_file is None:
config_file = "config.ini"
config.read(config_file)
2024-06-25 05:50:52 -07:00
interface_type = config['interface']['type']
hostname = config['interface'].get('hostname', None)
port = config['interface'].get('port', None)
bbs_nodes = config.get('sync', 'bbs_nodes', fallback='').split(',')
if bbs_nodes == ['']:
bbs_nodes = []
print(f"Configured to sync with the following BBS nodes: {bbs_nodes}")
allowed_nodes = config.get('allow_list', 'allowed_nodes', fallback='').split(',')
if allowed_nodes == ['']:
allowed_nodes = []
print(f"Nodes with Urgent board permissions: {allowed_nodes}")
return {
'config': config,
'interface_type': interface_type,
'hostname': hostname,
'port': port,
'bbs_nodes': bbs_nodes,
'allowed_nodes': allowed_nodes,
'mqtt_topic': 'meshtastic.receive'
}
2024-06-28 13:35:45 -07:00
def get_interface(system_config:dict[str, Any]) -> meshtastic.stream_interface.StreamInterface:
"""
Function opens and returns an instance meshtastic interface of type specified by the configuration
2024-06-28 13:35:45 -07:00
Function creates and returns an instance of a class inheriting from meshtastic.stream_interface.StreamInterface.
The type of the class depends on the type of the interface specified by the system configuration.
For 'serial' interfaces, function returns an instance of meshtastic.serial_interface.SerialInterface,
and for 'tcp' interface, an instance of meshtastic.tcp_interface.TCPInterface.
Args:
system_config (dict[str, Any]): A dict with system configuration. See description of initialize_config() for details.
Raises:
ValueError: Exception raised in the following cases:
- Type of interface not provided in the system config
- Multiple serial ports present in the system, and no port specified in the configuration
- Serial port interface requested, but no ports found in the system
- Hostname not provided for TCP interface
2024-06-25 05:50:52 -07:00
2024-06-28 13:35:45 -07:00
Returns:
meshtastic.stream_interface.StreamInterface: An instance of StreamInterface
"""
2024-06-25 05:50:52 -07:00
while True:
try:
2024-06-28 13:35:45 -07:00
if system_config['interface_type'] == 'serial':
if system_config['port']:
return meshtastic.serial_interface.SerialInterface(system_config['port'])
2024-06-25 05:50:52 -07:00
else:
ports = list(serial.tools.list_ports.comports())
if len(ports) == 1:
return meshtastic.serial_interface.SerialInterface(ports[0].device)
elif len(ports) > 1:
port_list = ', '.join([p.device for p in ports])
raise ValueError(f"Multiple serial ports detected: {port_list}. Specify one with the 'port' argument.")
else:
raise ValueError("No serial ports detected.")
2024-06-28 13:35:45 -07:00
elif system_config['interface_type'] == 'tcp':
if not system_config['hostname']:
2024-06-25 05:50:52 -07:00
raise ValueError("Hostname must be specified for TCP interface")
2024-06-28 13:35:45 -07:00
return meshtastic.tcp_interface.TCPInterface(hostname=system_config['hostname'])
2024-06-25 05:50:52 -07:00
else:
raise ValueError("Invalid interface type specified in config file")
except PermissionError as e:
print(f"PermissionError: {e}. Retrying in 5 seconds...")
time.sleep(5)