#!/usr/bin/python3 ############################################################################ ## ## ir-aprsisd ## ## This is a KML-feed to APRS-IS forwarding daemon. It was written ## for Garmin/DeLorme inReach devices, but might work elsewhere too. ## It polls a KML feed in which it expects to find a point in rougly ## the form that the inReach online feeds use, with attendant course ## and altitude data. It transfers each new point found there to ## APRS-IS. ## ## K0SIN ## ########################################################################### import aprslib import urllib.request import xml.dom.minidom import time, calendar, math, re import platform, sys import configparser from optparse import OptionParser #Name of our configuration file. cf = "ir-aprsisd.cfg" #Command-line options op = OptionParser() op.add_option("-C","--config",action="store",type="string",dest="config",help="Load the named configuration file.") op.add_option("-s","--ssid",action="store",type="string",dest="ssid",help="APRS SSID") op.add_option("-p","--pass",action="store",type="int",dest="passwd",help="APRS-IS password") op.add_option("--port",action="store",type="int",dest="port",help="APRS-IS port") op.add_option("-u","--user",action="store",type="string",dest="user",help="inReach username") op.add_option("-P","--irpass",action="store",type="string",dest="irpass",help="inReach feed password") op.add_option("-U","--url",action="store",type="string",dest="url",help="URL for KML feed") op.add_option("-i","--imei",action="store",type="int",dest="imei",help="This instance should watch *only* for the single IMEI given in this option. For a more complicated mapping, use the Device section in the configuration file.") op.add_option("-c","--comment",action="store",type="string",dest="comment",help="APRS-IS location beacon comment text") op.add_option("-d","--delay",action="store",type="int",dest="delay",help="Delay between polls of KML feed") (opts,args) = op.parse_args() #This needs to be defined before the load below happens. #Load a configuration file, and try to validate that it is loaded. def loadConfig(cfile): global conf conf = configparser.ConfigParser() if conf.read(cfile): if conf.has_section('General'): return True return False #Handle loading of the configuration file first. #Other command-line options may override things defined in the file. if opts.config: if not loadConfig(opts.config): print("Can't load configuration: " + opts.config) sys.exit(1) else: #Default behavior if no file specified. if not loadConfig("/etc/" + cf): if not loadConfig(cf): print("Can't find default configuration: " + cf) sys.exit(1) #Allow command-line arguments to override the config file. if opts.ssid: conf['APRS']['SSID'] = opts.ssid if opts.passwd: conf['APRS']['Password'] = str(opts.passwd) if opts.port: conf['APRS']['Port'] = str(opts.port) if opts.user: conf['inReach']['User'] = opts.user if opts.irpass: conf['inReach']['Password'] = opts.irpass if opts.url: conf['inReach']['URL'] = opts.url if opts.comment: conf['APRS']['Comment'] = opts.comment if opts.delay: conf['General']['Period'] = opts.delay #Handle the special case where we've specified an IMEI on the command-line if opts.imei: conf['Devices'] = {} conf['Devices'][conf['APRS']['SSID']] = str(opts.imei) #Get the number and call from from the default SSID #If we have multiple devices with non-specific ID mapping, we'll make it up # from this. (Call,SSNum) = re.search('(\w+)-(\d+)$',conf['APRS']['SSID']).groups() SSNum = int(SSNum) #The beginning of our APRS packets should contain a source, path, # q construct, and gateway address. We'll reuse the same SSID as a gate. # This is the part between the source and the gate. ARPreamble = ''.join(['>APRS,','TCPIP*,','qAS,',conf['APRS']['SSID']]) NAME = "iR-APRSISD" REV = "0.2" #Start with the current time. We use this to keep track of whether we've # already sent a position, so if we cut it off when the program starts, we # won't (re-)send old stale entry from inReach #lastUpdate = calendar.timegm(time.gmtime()) #Set up the handler for HTTP connections if conf.has_option('inReach','Password'): passman = urllib.request.HTTPPasswordMgrWithDefaultRealm() passman.add_password(None, conf['inReach']['URL'],'',conf['inReach']['Password']) httpauth = urllib.request.HTTPBasicAuthHandler(passman) http = urllib.request.build_opener(httpauth) else: http = urllib.request.build_opener() urllib.request.install_opener(http) #Handle connection to APRS-IS def reconnect(): global AIS while True: AIS = aprslib.IS(conf['APRS']['SSID'],passwd=conf['APRS']['Password'],port=conf['APRS']['Port']) try: AIS.connect() break except Exception as e: print("Trouble connecting to APRS-IS server.") print(e) #We'll store the IMEI to ssid mapping here. SSIDList = {} #We'll store timestamps here lastUpdate = {} #Load any preconfigured IMEI mappings if conf.has_section('Devices'): for device in conf['Devices'].keys(): SSIDList[conf['Devices'][device]] = device.upper() #Get an SSID def getSSID(IMEI): global lastUpdate, SSIDList, SSNum, Call #If we have a Devices section, the SSID list is static. if IMEI not in SSIDList: if conf.has_section('Devices'): return None SSIDList[IMEI] = ''.join([Call,"-",str(SSNum)]) SSNum = SSNum + 1 #Add a timestamp on the first call # This prevents us from redelivering an old message, which can stay # in the feed. if IMEI not in lastUpdate: lastUpdate[IMEI] = calendar.timegm(time.gmtime()) return SSIDList[IMEI] #APRS-IS Setup AIS = None reconnect() #Generate an APRS packet from a Placemark def sendAPRS(Placemark): #We only care about the Placemarks with the ExtendedData sections. if not Placemark.getElementsByTagName('ExtendedData'): return None #Now process the extended data into something easier to handle. extended = {} for xd in Placemark.getElementsByTagName('ExtendedData')[0].getElementsByTagName('Data'): if not xd.getElementsByTagName('value')[0].firstChild == None: extended[xd.getAttribute('name')] = xd.getElementsByTagName('value')[0].firstChild.nodeValue try: #Here is the position vector latitude = extended['Latitude'] longitude = extended['Longitude'] elevation = extended['Elevation'] velocity = extended['Velocity'] course = extended['Course'] uttime = extended['Time UTC'] device = extended['Device Type'] IMEI = extended['IMEI'] SSID = getSSID(IMEI) #If there's no valid SSID mapping, we don't process this one. if not SSID: return None #Some time conversions. First the time struct. ts = time.strptime(''.join([uttime," UTC"]),"%m/%d/%Y %I:%M:%S %p %Z") #Unix epoch time for local record-keeping. etime = calendar.timegm(ts) #MonthDayHoursMinutes_z for APRS-IS packet aprstime = time.strftime("%d%H%Mz",ts) #Skip this one if it's old. if lastUpdate[IMEI] > etime: time.sleep(conf.getfloat('General','Period')) return None #If we have SMS data, add that. if 'Text' in extended: comment = extended['Text'] else: #Default comment comment = conf['APRS']['Comment'] comment = ''.join([" ", comment, " : ", NAME, " v", REV, " : ", platform.system(), " on ", platform.machine(), " : ", device]) #In the format we're using, APRS comments can be 36 characters # ... but APRS-IS doesn't seem to care, so leave this off. #comment = comment[:36] #Latitude conversion #We start with the truncated degrees, filled to two places # then add fractional minutes two 2x2 digits. latitude = float(latitude) aprslat = str(abs(math.trunc(latitude))).zfill(2) aprslat += '{:.02f}'.format(round((abs(latitude)-abs(math.trunc(latitude)))*60,2)).zfill(5) if latitude > 0: aprslat += "N" else: aprslat += "S" #Longitude next. longitude = float(longitude) aprslong = str(abs(math.trunc(longitude))).zfill(3) aprslong += '{:.02f}'.format(round((abs(longitude)-abs(math.trunc(longitude)))*60,2)).zfill(5) if longitude > 0: aprslong += "E" else: aprslong += "W" #Altitude needs to be in feet above sea-level # what we get instead is a string with a number of meters # at the beginning. aprsalt = re.sub(r'^(\d+\.?\d+)\s*m.*',r'\1',elevation) #We need integer feet, six digits aprsalt = str(round(float(aprsalt) * 3.2808399)).zfill(6) aprsalt = "/A=" + aprsalt[0:6] #Aprs position aprspos = ''.join([aprslat,conf['APRS']['Separator'],aprslong,conf['APRS']['Symbol']]) #Course is already in degrees, so we just need to reformat it. aprscourse = str(round(float(re.sub(r'(\d+\.?\d+).*',r'\1', course)))).zfill(3) #Speed is in km/h. aprsspeed = float(re.sub(r'(\d+\.?\d+).*',r'\1',velocity))*0.53995681 aprsspeed = str(min(round(aprsspeed),999)).zfill(3) #Here's the course/speed combination aprscs = ''.join([aprscourse,'/',aprsspeed]) except Exception as e: print("Some relevant position vector data is missing or malformatted in this record.") print(e) #Pass? Maybe... pass try: #According to spec, one valid format for location beacons is # @092345z/4903.50N/07201.75W>088/036 # with a comment on the end that can include altitude and other # information. aprsPacket = ''.join([getSSID(IMEI),ARPreamble,':@',aprstime,aprspos,aprscs,aprsalt,comment]) #This will throw an exception if the packet is somehow wrong. aprslib.parse(aprsPacket) AIS.sendall(aprsPacket) lastUpdate[IMEI] = etime return aprsPacket except Exception as e: print("Could not send the update: ") if 'aprsPacket' in locals(): print(aprsPacket) print(e) print("Attempting reconnection, just in case.") reconnect() pass while True: try: KML = http.open(conf['inReach']['URL']).read() except Exception as e: print(''.join(["Error reading URL: ", conf['inReach']['URL']])) continue try: data = xml.dom.minidom.parseString(KML).documentElement except Exception as e: print("Can't process KML feed on this pass.") continue #The first placemark will have the expanded current location information. for PM in data.getElementsByTagName('Placemark'): sendAPRS(PM) time.sleep(conf.getfloat('General','Period'))