mirror of
https://github.com/kemenril/iR-APRSISD.git
synced 2024-11-09 23:24:07 -08:00
342 lines
12 KiB
Python
Executable file
342 lines
12 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
############################################################################
|
|
##
|
|
## ir-aprsisd
|
|
##
|
|
## This is a KML-feed to APRS-IS forwarding daemon. It was written
|
|
## for Garmin/DeLorme inReach devices, but might work elsewhere too.
|
|
## It polls a KML feed in which it expects to find a point in rougly
|
|
## the form that the inReach online feeds use, with attendant course
|
|
## and altitude data. It transfers each new point found there to
|
|
## APRS-IS.
|
|
##
|
|
## K0SIN
|
|
##
|
|
###########################################################################
|
|
|
|
|
|
import aprslib
|
|
import urllib.request
|
|
import xml.dom.minidom
|
|
import time, calendar, math, re
|
|
import platform, sys, signal
|
|
import configparser
|
|
from optparse import OptionParser
|
|
|
|
|
|
#Name of our configuration file.
|
|
cf = "ir-aprsisd.cfg"
|
|
|
|
#Command-line options
|
|
op = OptionParser()
|
|
|
|
op.add_option("-C","--config",action="store",type="string",dest="config",help="Load the named configuration file.")
|
|
op.add_option("-s","--ssid",action="store",type="string",dest="ssid",help="APRS SSID")
|
|
op.add_option("-p","--pass",action="store",type="int",dest="passwd",help="APRS-IS password")
|
|
op.add_option("--port",action="store",type="int",dest="port",help="APRS-IS port")
|
|
op.add_option("-u","--user",action="store",type="string",dest="user",help="inReach username")
|
|
op.add_option("-P","--irpass",action="store",type="string",dest="irpass",help="inReach feed password")
|
|
op.add_option("-U","--url",action="store",type="string",dest="url",help="URL for KML feed")
|
|
op.add_option("-i","--imei",action="store",type="int",dest="imei",help="This instance should watch *only* for the single IMEI given in this option. For a more complicated mapping, use the Device section in the configuration file.")
|
|
op.add_option("-c","--comment",action="store",type="string",dest="comment",help="APRS-IS location beacon comment text")
|
|
op.add_option("-d","--delay",action="store",type="int",dest="delay",help="Delay between polls of KML feed")
|
|
op.add_option("--genpass",action="store_true",dest="genpass",help="Generate the correct passcode for the SSID given in the configuration, or on the command line, print it, and exit.")
|
|
(opts,args) = op.parse_args()
|
|
|
|
#Handle term and int signals
|
|
def trapexit(_signo,_stack_frame):
|
|
print()
|
|
print("Exiting.")
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGTERM,trapexit)
|
|
signal.signal(signal.SIGINT,trapexit)
|
|
|
|
#This needs to be defined before the load below happens.
|
|
#Load a configuration file, and try to validate that it is loaded.
|
|
def loadConfig(cfile):
|
|
global conf
|
|
conf = configparser.ConfigParser()
|
|
if conf.read(cfile):
|
|
if conf.has_section('General'):
|
|
return True
|
|
return False
|
|
|
|
|
|
#Handle loading of the configuration file first.
|
|
#Other command-line options may override things defined in the file.
|
|
if opts.config:
|
|
if not loadConfig(opts.config):
|
|
print("Can't load configuration: " + opts.config)
|
|
sys.exit(1)
|
|
else: #Default behavior if no file specified.
|
|
if not loadConfig("/etc/" + cf):
|
|
if not loadConfig(cf):
|
|
print("Can't find default configuration: " + cf)
|
|
sys.exit(1)
|
|
|
|
#Allow command-line arguments to override the config file.
|
|
if opts.ssid:
|
|
conf['APRS']['SSID'] = opts.ssid
|
|
if opts.passwd:
|
|
conf['APRS']['Password'] = str(opts.passwd)
|
|
if opts.port:
|
|
conf['APRS']['Port'] = str(opts.port)
|
|
if opts.user:
|
|
conf['inReach']['User'] = opts.user
|
|
if opts.irpass:
|
|
conf['inReach']['Password'] = opts.irpass
|
|
if opts.url:
|
|
conf['inReach']['URL'] = opts.url
|
|
if opts.comment:
|
|
conf['APRS']['Comment'] = opts.comment
|
|
if opts.delay:
|
|
conf['General']['Period'] = opts.delay
|
|
|
|
#SSID should be standardized to upper-case.
|
|
conf['APRS']['SSID'] = conf['APRS']['SSID'].upper()
|
|
|
|
#Running in passcode generator mode.
|
|
if opts.genpass:
|
|
print("Using SSID: " + conf['APRS']['SSID'])
|
|
print("The passcode is: " + str(aprslib.passcode(conf['APRS']['SSID'])))
|
|
print()
|
|
sys.exit(0)
|
|
|
|
|
|
#Handle the special case where we've specified an IMEI on the command-line
|
|
if opts.imei:
|
|
conf['Devices'] = {}
|
|
conf['Devices'][conf['APRS']['SSID']] = str(opts.imei)
|
|
|
|
#Get the number and call from from the default SSID
|
|
#If we have multiple devices with non-specific ID mapping, we'll make it up
|
|
# from this.
|
|
(Call,SSNum) = re.search('(\w+)-(\d+)$',conf['APRS']['SSID']).groups()
|
|
SSNum = int(SSNum)
|
|
|
|
#The beginning of our APRS packets should contain a source, path,
|
|
# q construct, and gateway address. We'll reuse the same SSID as a gate.
|
|
# This is the part between the source and the gate.
|
|
ARPreamble = ''.join(['>APRS,','TCPIP*,','qAS,',conf['APRS']['SSID']])
|
|
|
|
NAME = "iR-APRSISD"
|
|
REV = "0.3"
|
|
|
|
#Set up the handler for HTTP connections
|
|
if conf.has_option('inReach','Password'):
|
|
passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
|
|
passman.add_password(None, conf['inReach']['URL'],'',conf['inReach']['Password'])
|
|
httpauth = urllib.request.HTTPBasicAuthHandler(passman)
|
|
http = urllib.request.build_opener(httpauth)
|
|
else:
|
|
http = urllib.request.build_opener()
|
|
urllib.request.install_opener(http)
|
|
|
|
#Handle connection to APRS-IS
|
|
def reconnect():
|
|
global AIS
|
|
while True:
|
|
AIS = aprslib.IS(conf['APRS']['SSID'],passwd=conf['APRS']['Password'],port=conf['APRS']['Port'])
|
|
try:
|
|
AIS.connect()
|
|
break
|
|
except Exception as e:
|
|
print("Trouble connecting to APRS-IS server.")
|
|
print(e)
|
|
time.sleep(3)
|
|
continue
|
|
|
|
#We'll store the device to ssid mapping here.
|
|
SSIDList = {}
|
|
#We'll store timestamps here
|
|
lastUpdate = {}
|
|
|
|
#Load any preconfigured mappings
|
|
if conf.has_section('Devices'):
|
|
for device in conf['Devices'].keys():
|
|
SSIDList[conf['Devices'][device]] = device.upper()
|
|
|
|
|
|
#Get an SSID
|
|
#An apocryphal concern is that this will happily generate an SSID for None,
|
|
# or whatever else might get passed along to it.
|
|
# There's a record in the usual set of inReach Placemarks where this happens,
|
|
# but because the record has no extended data, no packets are ever generated
|
|
# for it. Such a record is generally not interesting and on the end of the list,
|
|
# so there has been no reason to explicitly skip it.
|
|
def getSSID(DID):
|
|
global lastUpdate, SSIDList, SSNum, Call
|
|
#If we have a Devices section, the SSID list is static.
|
|
if DID not in SSIDList:
|
|
if conf.has_section('Devices'):
|
|
return None
|
|
SSIDList[DID] = ''.join([Call,"-",str(SSNum)])
|
|
SSNum = SSNum + 1
|
|
#Add a timestamp on the first call
|
|
# This prevents us from redelivering an old message, which can stay
|
|
# in the feed.
|
|
if DID not in lastUpdate:
|
|
lastUpdate[DID] = calendar.timegm(time.gmtime())
|
|
return SSIDList[DID]
|
|
|
|
|
|
#APRS-IS Setup
|
|
AIS = None
|
|
reconnect()
|
|
|
|
#Get information from a Placemark
|
|
def parsePlacemark(Placemark):
|
|
#We only care about the Placemarks with the ExtendedData sections.
|
|
if not Placemark.getElementsByTagName('ExtendedData'):
|
|
return None
|
|
|
|
#Now process the extended data into something easier to handle.
|
|
extended = {}
|
|
for xd in Placemark.getElementsByTagName('ExtendedData')[0].getElementsByTagName('Data'):
|
|
if not xd.getElementsByTagName('value')[0].firstChild == None:
|
|
extended[xd.getAttribute('name')] = xd.getElementsByTagName('value')[0].firstChild.nodeValue
|
|
|
|
#Make sure the device mapping is good.
|
|
if not 'IMEI' in extended:
|
|
return None
|
|
if not getSSID(extended['IMEI']):
|
|
return None
|
|
|
|
#Now build the position vector
|
|
latitude = None
|
|
longitude = None
|
|
elevation = None
|
|
velocity = None
|
|
course = None
|
|
uttime = None
|
|
device = None
|
|
IMEI = extended['IMEI']
|
|
|
|
if 'Latitude' in extended and 'Longitude' in extended:
|
|
latitude = float(extended['Latitude'])
|
|
longitude = float(extended['Longitude'])
|
|
if 'Elevation' in extended:
|
|
#Altitude needs to be in feet above sea-level
|
|
# what we get instead is a string with a number of meters
|
|
# at the beginning.
|
|
elevation = re.sub(r'^(\d+\.?\d+)\s*m.*',r'\1',extended['Elevation'])
|
|
elevation = float(elevation) * 3.2808399
|
|
if 'Velocity' in extended:
|
|
#Velocity in knots, according to APRS.
|
|
velocity = float(re.sub(r'(\d+\.?\d+).*',r'\1', extended['Velocity']))*0.5399568
|
|
if 'Course' in extended:
|
|
#... and the course is just a heading in degrees.
|
|
course = float(re.sub(r'(\d+\.?\d+).*',r'\1', extended['Course']))
|
|
uttime = time.strptime(extended['Time UTC'] + " UTC","%m/%d/%Y %I:%M:%S %p %Z")
|
|
device = extended['Device Type']
|
|
|
|
#If we have SMS data, add that.
|
|
if 'Text' in extended:
|
|
comment = extended['Text']
|
|
else: #Default comment
|
|
comment = conf['APRS']['Comment']
|
|
|
|
return [device,IMEI,ARPreamble,uttime,latitude,longitude,elevation,course,velocity,comment]
|
|
|
|
#Return a list of all events available. Each one is a list of arguments
|
|
# for the below sendAPRS function
|
|
def getEvents():
|
|
events = []
|
|
try:
|
|
KML = http.open(conf['inReach']['URL']).read()
|
|
except Exception as e:
|
|
print("Error reading URL: " + conf['inReach']['URL'])
|
|
return None
|
|
try:
|
|
data = xml.dom.minidom.parseString(KML).documentElement
|
|
except Exception as e:
|
|
print("Can't process KML feed on this pass.")
|
|
return None
|
|
#The first placemark will have the expanded current location information.
|
|
for PM in data.getElementsByTagName('Placemark'):
|
|
res = parsePlacemark(PM)
|
|
if res:
|
|
events.append(res)
|
|
return events
|
|
|
|
#Compile and send an APRS packet from the given information
|
|
#According to spec, one valid format for location beacons is
|
|
# @092345z/4903.50N/07201.75W>088/036
|
|
# with a comment on the end that can include altitude and other
|
|
# information.
|
|
## Arguments are:
|
|
# Device type, Device ID, APRS Preamble, struct Timestamp, float Latitude
|
|
# float Longitude, float Altitude in feet, int float course in degrees,
|
|
# float speed in knots, comment
|
|
def sendAPRS(device, DevID, ARPreamble, tstamp, lat, long, alt, course, speed, comment):
|
|
global conf
|
|
etime = calendar.timegm(tstamp)
|
|
|
|
#Latitude conversion
|
|
#We start with the truncated degrees, filled to two places
|
|
# then add fractional minutes two 2x2 digits.
|
|
slat = str(abs(math.trunc(lat))).zfill(2)
|
|
slat += '{:.02f}'.format(round((abs(lat)-abs(math.trunc(lat)))*60,2)).zfill(5)
|
|
if lat > 0:
|
|
slat += "N"
|
|
else:
|
|
slat += "S"
|
|
|
|
#Longitude
|
|
slong = str(abs(math.trunc(long))).zfill(3)
|
|
slong += '{:.02f}'.format(round((abs(long)-abs(math.trunc(long)))*60,2)).zfill(5)
|
|
if long > 0:
|
|
slong += "E"
|
|
else:
|
|
slong += "W"
|
|
|
|
|
|
pos = ''.join([slat,conf['APRS']['Separator'],slong,conf['APRS']['Symbol']])
|
|
gateInfo = ''.join([" ", comment, " : ", NAME, " v", REV, " : ", platform.system(), " on ", platform.machine(), " : "])
|
|
|
|
if device:
|
|
gateInfo = gateInfo + device
|
|
|
|
#In the format we're using, APRS comments can be 36 characters
|
|
# ... but APRS-IS doesn't seem to care, so leave this off.
|
|
#comment = comment[:36]
|
|
|
|
aprsPacket = ''.join([getSSID(DevID),ARPreamble, ':@', time.strftime("%d%H%Mz",tstamp), pos])
|
|
|
|
#Check to make sure the update is new:
|
|
if lastUpdate[DevID] > etime:
|
|
return None
|
|
|
|
#In theory a course/speed of 000/000 sholdn't be much different
|
|
# from not reporting one, but also in theory, more space is
|
|
# available for a comment if we don't add the data extension.
|
|
if speed and course:
|
|
aprsPacket = ''.join([aprsPacket,str(round(course)).zfill(3), '/', str(min(round(speed),999)).zfill(3)])
|
|
#Same with altitude:
|
|
if alt:
|
|
#We need six digits of altitude.
|
|
aprsPacket = aprsPacket + "/A=" + str(round(alt)).zfill(6)[0:6]
|
|
# Regardless:
|
|
aprsPacket = aprsPacket + comment + gateInfo
|
|
try:
|
|
aprslib.parse(aprsPacket) # For syntax
|
|
AIS.sendall(aprsPacket)
|
|
except Exception as e:
|
|
print("Could not send APRS packet: ")
|
|
print(aprsPacket)
|
|
print("Attempting reconnect just in case.")
|
|
reconnect()
|
|
pass
|
|
#Last update in UTC
|
|
lastUpdate[DevID] = calendar.timegm(tstamp)
|
|
|
|
#... and here is the main loop.
|
|
while True:
|
|
for packet in getEvents():
|
|
sendAPRS(*packet) #Otherwise a list of sendAPRS args for the next packet to send.
|
|
time.sleep(conf.getfloat('General','Period'))
|
|
|
|
|