#!/usr/bin/env python

#****************************************************************************
# weatherdata.py, provides non-GUI base classes for weather info
#
# FlyWay, a VFR/IFR Route Planner for Pilots
# Copyright (C) 2002, Douglas W. Bell
#
# This is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License, Version 2.  This program is
# distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY.
#*****************************************************************************

from waypoint import Waypoint, Airport
from route import RouteLeg
import sys, math, urllib, telnetlib, socket, time

# Path fragments (appended to _urlPrefix) keyed by (report type, plain flag);
# both TAF entries point at the same location
_urlDict = {('METAR', 0): 'observations/metar/stations/K',
            ('METAR', 1): 'observations/metar/decoded/K',
            ('TAF', 0): 'forecasts/taf/stations/K',
            ('TAF', 1): 'forecasts/taf/stations/K'}
_urlPrefix = 'http://weather.noaa.gov/pub/data/'
_tnTimeout = 10   # seconds to wait for each telnet prompt

# First letter of the local time zone name, inserted into the plain-language
# telnet command; anything other than E, C, M or P (including an empty zone
# name) falls back to 'E'
_timeZone = time.tzname[0][:1]
if _timeZone not in ['E', 'C', 'M', 'P']:
    _timeZone = 'E'
_tnCmdStart = {0: '#WABS\\\\', 1: '#PABS\\n\\%s\\\\' % _timeZone}
_tnCmdEnd = '\\20\\+30\\0030\\n\\n\n'


class WeatherData:
    """Calculates and stores nearby airports to get weather info"""
    def __init__(self, route):
        """Store the route and start with empty lists and no telnet session"""
        self.route = route
        self.airportList = []   # weather airports near the route
        self.fdList = []        # airports whose weather field lists 'FD'
        self.tn = None          # open telnetlib.Telnet session, or None

    def findAirports(self, distance, fdDistance=0):
        """Update list of airports within distance of route legs

        distance is used for normal list, fdDistance (greater) is used
        to add more winds aloft locations.  self.fdList is recomputed
        only when fdDistance is non-zero; both lists are emptied when the
        route has no waypoints"""
        routeWps = self.route.wpList
        if not routeWps:
            self.airportList = []
            self.fdList = []   # do not keep results from an earlier route
            return
        lats = [wp.latitudeValue() for wp in routeWps]
        # only the magnitude of the longitude is compared (sign is ignored)
        lngs = [abs(wp.longitudeValue()) for wp in routeWps]
        minLat, maxLat = min(lats), max(lats)
        minLng, maxLng = min(lngs), max(lngs)
        lngFactor = (math.cos(minLat * math.pi / 180) +
                     math.cos(maxLat * math.pi / 180)) / 2.0
        # size the rough bounding box from the larger of the two distances,
        # so neither list is clipped if fdDistance is the smaller one
        searchDist = distance
        if fdDistance and fdDistance > distance:
            searchDist = fdDistance
        # 1.3 pads the box (presumably a safety margin for the rough cut)
        latDelta = searchDist * 1.3 / RouteLeg.nmLat
        lngDelta = latDelta / lngFactor
        maxLat += latDelta
        minLat -= latDelta
        maxLng += lngDelta
        minLng -= lngDelta
        # cheap bounding-box filter before the per-leg distance check
        roughList = [wp for wp in Airport().getAllWaypoints()
                     if wp.dataList[Waypoint.aptWthr] and
                     minLat <= wp.latitudeValue() <= maxLat and
                     minLng <= abs(wp.longitudeValue()) <= maxLng]
        self.airportList = [wp for wp in roughList
                            if self.route.isWithinDistance(wp, distance)]
        if fdDistance:
            self.fdList = [wp for wp in roughList
                           if _hasReportType(wp, 'FD') and
                           self.route.isWithinDistance(wp, fdDistance)]
        self.airportList = self.route.sortWpByRoute(self.airportList)

    def _listForType(self, type):
        """Return fdList for a 'FD' request if it has entries,
        otherwise airportList"""
        if type == 'FD' and self.fdList:
            return self.fdList
        return self.airportList

    def numOfType(self, type='METAR'):
        """Return num of wp in list matching given type"""
        return len([wp for wp in self._listForType(type)
                    if _hasReportType(wp, type)])

    def fetchHttpReports(self, outputFunction, type='METAR', plain=0):
        """Call outputFunction with text of all weather reports individually

        Only airports in airportList that list the given type are fetched"""
        for wp in self.airportList:
            if _hasReportType(wp, type):
                outputFunction(fetchHttpReport(wp, type, plain))

    def openTelnet(self, accessCode, password, outputFunction):
        """Start telnet session

        On any failure a message is passed to outputFunction, the socket is
        closed and self.tn is left as None"""
        try:
            self.tn = telnetlib.Telnet('direct.duats.com')
            self.tn.read_until('last name: ', _tnTimeout * 2)
            self.tn.write('#%s\n' % accessCode)
            reply = self.tn.read_until('password: ', _tnTimeout)
            if not reply.endswith('password: '):
                raise socket.error(0, 'Invalid DUAT telnet access code')
            self.tn.write('%s\n' % password)
            reply = self.tn.read_until('OLAB: ', _tnTimeout)
            if not reply.endswith('OLAB: '):
                raise socket.error(0, 'Invalid DUAT telnet password')
        except (socket.error, socket.gaierror, socket.herror, EOFError):
            # sys.exc_info() is used rather than binding the exception in
            # the except clause so the clause parses on any Python version
            outputFunction('\nUnable to create DUAT telnet connection:\n%s' %
                           _errorText(sys.exc_info()[1]))
            self._dropTelnet()

    def fetchTelnetReports(self, outputFunction, type='METAR', plain=0):
        """Call outputFunction with text of all weather reports individually

        Does nothing without an open session.  A connection problem stops
        the remaining fetches and is reported through outputFunction"""
        if not self.tn:
            return
        try:
            for wp in self._listForType(type):
                if _hasReportType(wp, type):
                    outputFunction(fetchTelnetReport(self.tn, wp, type,
                                                     plain))
        except (socket.error, socket.gaierror, socket.herror, EOFError):
            outputFunction('\nProblems with DUAT telnet connection:\n%s' %
                           _errorText(sys.exc_info()[1]))

    def closeTelnet(self):
        """Quietly close connection to telnet

        Sends the sign-off command, then closes the socket and clears
        self.tn so a finished session is not reused"""
        if not self.tn:
            return
        try:
            self.tn.write('#BYE\\Y\\\n')
        except (socket.error, socket.gaierror, socket.herror, EOFError):
            pass   # sign-off is best effort; the socket is closed regardless
        self._dropTelnet()

    def _dropTelnet(self):
        """Close the socket of the current session, if any, and forget it"""
        if self.tn:
            try:
                self.tn.close()
            except (socket.error, EOFError):
                pass   # nothing useful can be done if closing fails
        self.tn = None


#### Utility Functions ####

def _hasReportType(wp, type):
    """Return true if type is one of the whitespace-separated entries
    in the waypoint's weather field"""
    return type in wp.dataList[Waypoint.aptWthr].split()

def _errorText(value):
    """Return a printable message for a caught socket or EOF exception

    Errors carrying (number, message) arguments yield just the message;
    anything else (single-argument errors, EOFError) yields str(value)"""
    args = getattr(value, 'args', ())
    if len(args) > 1:
        return args[1]
    return str(value)

def _isHtmlPage(lines):
    """Return true if the non-empty text lines look like an HTML page
    rather than a plain-text report

    Heuristic: the text opens with a tag or closes with </html>"""
    return lines[0].lstrip().startswith('<') or \
           lines[-1].lower().endswith('</html>')

def fetchHttpReport(wp, type='METAR', plain=0):
    """Return text of weather report for wp from noaa http

    The result starts with wp.description(), followed by the report lines
    (minus the first, header, line) or by a 'No ... data available' line
    if the type has no URL, the fetch fails, or an error page comes back"""
    results = [wp.description()]
    noData = 'No %s data available for %s' % (type, wp.ident())
    path = _urlDict.get((type, plain))
    if not path:   # no http source for this report type
        results.append(noData)
        return '\n'.join(results)
    try:
        f = urllib.urlopen('%s%s%s.TXT' % (_urlPrefix, path, wp.ident()))
        try:
            lines = [line.rstrip() for line in f.readlines()]
        finally:
            f.close()   # release the connection on the error path too
        lines = [line for line in lines if line]
        if not lines or _isHtmlPage(lines):
            raise IOError   # web returned an error page
        results.extend(lines[1:])   # cut off header
    except IOError:
        results.append(noData)
    return '\n'.join(results)

def fetchTelnetReport(tn, wp, type='METAR', plain=0):
    """Return text of weather report for wp from duats telnet

    tn is an open telnet session.  Raises socket.error if the prompt does
    not come back within the timeout.  The report is taken to be the lines
    between the first line starting with '****' and the final prompt"""
    results = [wp.description()]
    tn.write('%s%s\\%s%s' % (_tnCmdStart[plain], type, wp.ident(),
                             _tnCmdEnd))
    reply = tn.read_until('OLAB: ', _tnTimeout)
    lines = [text.rstrip() for text in reply.split('\n')]
    if not lines[-1].endswith('OLAB:'):
        raise socket.error(0, 'No DUAT prompt')
    start = None
    for num in range(len(lines)):   # find *** at start
        if lines[num].startswith('****'):
            start = num
            break
    if start is None:   # no **** found for data
        results.append('No %s data available for %s' % (type, wp.ident()))
        return '\n'.join(results)
    lines = lines[start + 1:-1]   # remove **** line and prompt
    while lines and not lines[-1]:
        del lines[-1]   # remove trailing blank lines
    if type == 'METAR':
        if plain:
            # plain reports are located by the first word of the waypoint
            # description or of its city field
            searchText = [wp.description().split(' ', 1)[0],
                          wp.dataList[Waypoint.city].split(' ', 1)[0]]
            searchText = [text for text in searchText if text]
        else:
            searchText = ['METAR K%s' % wp.ident()]
        match = 0   # remove duplicates, last is newest
        for num in range(len(lines)):
            for text in searchText:
                if lines[num].startswith(text):
                    match = num
        lines = lines[match:]
    results.extend(lines)
    return '\n'.join(results)