#!/usr/local/bin/python2.6 # # $HeadURL: https://svn.spodhuis.org/ksvn/websites/trunk/sks.spodhuis.org/wsgi/sks_peers.py $ # $LastChangedDate: 2011-05-20 05:48:31 -0400 (Fri, 20 May 2011) $ # $LastChangedRevision: 285 $ # # Author: Phil Pennock # Copyright 2009-2011; use/modify/copy freely with attribution. # No warranties whatsoever; only use if you're able to assess this code. # Prepare for Python 3.0 from __future__ import print_function """Spider the SKS peering mesh, provide UI. WSGI application to server up a table of peers of this server and span out from there to find more servers. Some extra web interfaces for producing data in useful formats. Main entry point is 'application', if 'main' invoked then can choose to start a standalone webserver. """ __author__ = 'Phil Pennock ' # Credit to Kristian Fiskerstrand of sks-keyservers.net, who provides both the # automatic DNS for a pool and a map of keyservers; when I realised how close # I was to being able to do the same thing, I decided to do so and prove to # myself that I could do it. After I produced a rather ugly dot-generated # map, Kristian was kind enough to point me to the .dot that he generates and # to confirm that we were doing the same thing. He was just generating better # .dot source. # This code grew somewhat organically from a tool which initially just listed # a table of my own peers with notes on key-counts, presence of mutual # peering and the geocoding of their IP, so I could see how diverse my peering # was. It gained a RecursingWalkManager to manage feeding lookups back into # the existing worker pools and then a GraphMaintainer to let me make graphs. # But it was ugly and the distinction between which bits of data we had when # got to an unhappy state. # # So, we now have a SpiderManager, which receives linkage requests. It # maintains all knowledge of what is out there, but does not track # "completion". That is still done by the DataGatherer. import cPickle as pickle import collections import gc import itertools import logging import logging.config import logging.handlers from math import sqrt import numbers import os import Queue as queue import random import re import signal import socket import sys import textwrap # purely for /internalz import threading import time import traceback import urllib import urllib2 import warnings import Cheetah.Template import dns.exception import dns.resolver # dnspython import html5lib import ipaddr import selector # And specific function imports: from paste.request import construct_url # We use python-graph http://code.google.com/p/python-graph/ # Note that release 1.6.0 changed the module name from "graph" to "pygraph". # 1.6.2 moved digraph into classes.digraph submodule # 1.6.2 started raising exceptions for duplicate edges, instead of being a noop # 1.6.3 changed add_edge/has_edge to take a tuple instead of two args. # Really, this is far more pain than keeping the breakage for a major version # release and sufficiently frustrating that more breakages are likely to see me # write a minimal graph implementation just to have a stable API. try: import pygraph import pygraph.readwrite.dot except ImportError: # We could do: # import graph as pygraph # pygraph.digraph = pygraph.Digraph.digraph # pygraph.graph = pygraph.Digraph.graph # but then we need edge_attributes to map to the old get_edge_attributes and # it's not worth downloading the old code to see where in the class hierarchy # it would go and then testing it. If someone else wants it, then they can # ask, but AFAIK I'm the only user. # also: node_attributes to old get_node_attributes raise Exception("You need pygraph >=1.7.0 from http://code.google.com/p/python-graph/") sks_data = None sks_data_lock = None startup_lock = threading.Lock() # see sks_peers_init() for rationale wsgi_selector = None config_file_timestamp = None kCONFIG_FILE = 'sks_peers.conf' # Note that we override this later if DOCUMENT_ROOT is in environ; we can't # reliably do that here because WSGI passes its own environ, instead of # os.environ # These, and localhost, can poke around to get internal data from the server # and trigger immediate rescans, etc kPRIVILEGED_ACCESS = () # These can be the same kHOSTNAME = 'sks.spodhuis.org' kHOSTNAME_PEER = 'sks-peer.spodhuis.org' # For the web UI to show kMAINT_EMAIL = 'webmaster@spodhuis.org' # Where the membership file is, to start kSKS_MEMBERSHIP = '/var/sks/membership' # Two HTML adjustment parameters kHTML_CSS_STYLESHEET = '/style/spodhuis-sks.css' kHTML_FAVICON = '/favicon.ico' # Performance tuning; '20' chosen as a wet-finger-in-air kRESOLVE_THREADS = 20 kSKS_POLL_THREADS = 20 # Standard service ports to query in absense of overrides kRECON_PORT = 11370 kHKP_PORT = 11371 # How many seconds to wait for a remote peer to respond: kSTATS_FETCH_TIMEOUT = 30 # Which zone to query to find the country of an IP; if you have a local # mirror, use that, otherwise it's impolite to use this without a local # DNS cache kCOUNTRIES_ZONE = 'zz.countries.nerd.dk.' # The normalised baseline number of keys in the server that we expect, when # reporting /ip-valid, must be at least this much or we error out: kIPLIST_SANITY_MIN = 2600000 # Keyservers calculate stats daily; normally these are close together, # assuming the operator doesn't change stat_hour; however, a restart with # initial_stat will pick up a more current figure. So servers with the same # keys can differ in value by as many new keys as we see in a day. We add # this jitter allowance to a threshold allowing 1stddev. kDAILY_KEYJITTER = 500 # How often to trigger a scan kINTER_SCAN_INTERVAL_SECS = 3600 * 4 # How much random interval we can go up or down from that interval kINTER_SCAN_INTERVAL_JITTER = 120 # How we log: kLOGGER_NAME = '' kLOGGER_CONFIG_FILE = '../etc/sks-peers.log.conf' kSYSLOG_FACILITY = 'LOG_DAEMON' kSYSLOG_ADDRESS = '/var/run/log' # Names we blacklist as we'll refuse to talk to them; yes, some sites # do have localhost in their membership files. kSKIP_ENTRIES = ('localhost', '127.0.0.1', '::1') # END OF TUNABLES # ###################################################################### # This one may *not* contain any variables or directives as it may be used # by quick & dirty handlers, rather than the template system kPAGE_TEMPLATE_BASIC_HEAD = \ """ """ % (kHTML_CSS_STYLESHEET, kHTML_FAVICON) kPAGE_TEMPLATE_BADUSER = kPAGE_TEMPLATE_BASIC_HEAD + """#slurp $summary

$summary

$error
""" kPAGE_TEMPLATE_HEAD = kPAGE_TEMPLATE_BASIC_HEAD + """#slurp $my_hostname Peer Mesh

$my_hostname Peer Mesh

$warning $scanning_active
Entries at depth 1 are direct peers. Others are seen by spidering the peers.
""" kPAGE_TEMPLATE_FOOT = """#slurp
HostInfoIPGeocodingMutualVersionKeysDistance
SKS has $peer_count peers of $mesh_count visible
""" kPAGE_TEMPLATE_HOST = """#slurp $html_link$host_aliases_text $ip $geo $mutual $version $keycount $distance """ kPAGE_TEMPLATE_HOSTERR = """#slurp $hostname Error: $error """ kPAGE_TEMPLATE_HOSTMORE = """#slurp $ip$geo """ kPAGE_TEMPLATE_HEAD_PEER_INFO = kPAGE_TEMPLATE_BASIC_HEAD + """#slurp Peer stats $peername

Peer stats $peername

$warning """ # keycount ips version mailsync ip_count mailsync_count all_peers # [ and peername from main dict ] kPAGE_TEMPLATE_PEER_INFO_MAIN = """#slurp #if $mailsync_count > 0 #for $msout in $mailsync[1:] #end for #else #end if
Name$peername
IPs$ips
SKS Version$version
Key count$keycount
Mailsync$mailsync[0]
$msout
MailsyncNone
""" kPAGE_TEMPLATE_PEER_INFO_PEERS_START = """#slurp """ # name in out common in_only out_only kPAGE_TEMPLATE_PEER_INFO_PEERS = """#slurp """ kPAGE_TEMPLATE_PEER_INFO_PEERS_END = """#slurp
Peers of $peername
NameCommonOutboundInbound
$name$common$out$in
""" kPAGE_TEMPLATE_FOOT_PEER_INFO = """#slurp """ # ###################################################################### class Error(Exception): """Generic top-level error exception for this tool.""" pass class InternalUsageError(Error): """We screwed the pooch so badly, we don't even call ourselves right.""" pass class MissingElement(Error): """Page element not present.""" pass # ---------------------------------------------------------------------- # Use event purely for thread-safe bool check _logging_init_called = threading.Event() def log_init(stderr=False, debug=False): _logging_init_called.set() manual_config = True if os.path.exists(kLOGGER_CONFIG_FILE): logging.config.fileConfig(kLOGGER_CONFIG_FILE) manual_config = False logger = logging.getLogger(kLOGGER_NAME) if manual_config: level = logging.INFO if not debug else logging.DEBUG logger.setLevel(level) if stderr: output = logging.StreamHandler() if debug: output.setLevel(logging.DEBUG) logger.addHandler(output) else: logger.addHandler(logging.handlers.SysLogHandler( address=kSYSLOG_ADDRESS, facility=getattr(logging.handlers.SysLogHandler, kSYSLOG_FACILITY) )) def debug(msg='?', *args): logging.getLogger(kLOGGER_NAME).debug(msg, *args) def log(msg='?', *args, **kwargs): if 'level' in kwargs: level = kwargs['level'] if type(level) != type(42): level = getattr(logging, level.upper()) else: level = logging.INFO if 'args' in kwargs: args = kwargs['args'] logging.getLogger(kLOGGER_NAME).log(level, msg, *args) def errlog(msg='?', *args): log(msg, args=args, level=logging.ERROR) def suicide_delayed(delay=3): def kill_me(): time.sleep(delay) os.kill(os.getpid(), signal.SIGTERM) threading.Thread(target=kill_me).start() # signal handler params must come first def act_reload_queue(signal=None, frame=None, note=None): global sks_data_lock, sks_data with sks_data_lock: if sks_data is None: return False if 'gatherer' not in sks_data: return False g = sks_data['gatherer'] if note is None and signal is not None: note = 'Signal %d received' % signal return g.rescan(note) def exceptlog(description, level=None): if level is None: level = logging.DEBUG # will mostly be retrieval errors einfo = sys.exc_info() log('%s raised: %s', description, einfo[1], level=level) for l in traceback.format_tb(einfo[2]): log('...: %s', l, level=level) return einfo[1] def html_exception_grab(): einfo = sys.exc_info() if einfo[0] is None: return '' res = ['
%s
' % html_escape(einfo[1])] for l in traceback.format_tb(einfo[2]): res.append('%s
' % html_escape(l)) res.append('
\n') return '\n'.join(res) # ---------------------------------------------------------------------- # This might be able to override the logging constants too. Ideally, it # should and it's a bug if it can't, but a bug we might live with short-term. # More specifically, this should be called very early in the entry points, so # the log init functions should not have been called yet. They might, if # called from the top-level parse, which I believe includes function # decorators. So what I forsee happening is that in debugging a wrap_*() # function, we invoke debug logging and break the ability for the main config # to override those constants. Not the end of the world, but is it worth # putting in a guard here for this? OTgH, I'd just end up cussing and # removing the guard to add the debug. OTOH, spotting why the logging has # gone wrong would be a pain to debug when I've forgotten all about this. # Conclusion: whine loudly if logging already initialised, by taking advantage # of that initialisation # Note that when we're done, we can use debug() in any case. def load_config_file(environ): """Load the kCONFIG_FILE if it exists, reloading if needed. Will allow overrides of any of our 'constant's, which means kFOO. """ confdict = {} global config_file_timestamp, sks_data_lock # sigh; need to re-examine the control flows to see if we can rationalise # away this additional check, just because I want to not change the config # in the middle of an update. Although I probably access the config unlocked # and only update with the results of those checks, so this all needs a # re-think. # TODO: re-think with startup_lock: if sks_data_lock is None: our_lock = threading.Lock() else: our_lock = sks_data_lock with our_lock: if _logging_init_called.is_set() and config_file_timestamp is None: errlog('Logging initialised before config file load!') config_file = kCONFIG_FILE if 'DOCUMENT_ROOT' in environ: # no need to globalise, but could, to provide better /environz feedback? config_file = os.path.normpath(os.path.join( environ['DOCUMENT_ROOT'], os.path.pardir, 'etc', kCONFIG_FILE)) try: statres = os.stat(config_file) have_config = True except OSError, e: have_config = False if config_file_timestamp: if not have_config: debug('No longer have a config file') return if statres.st_mtime <= config_file_timestamp: debug('Config file still the latest') return debug('Config file timestamp changed: %d -> %d' % ( config_file_timestamp, long(statres.st_mtime))) try: execfile(config_file, confdict) except IOError, e: # it's fine for the config file to not exist debug('Failed to load config file: %s', e) return # race condition: if file modified after we stat'd it, then we have a more # recent configuration than we realize; so next time around, we'll stat, # decide to pick up a new config file and then repeat ourselves, loading # the config we already have. Not perfect, but this is the safe way # around. config_file_timestamp = long(statres.st_mtime) for item in confdict: if not item.startswith('k'): continue if item not in globals(): continue globals()[item] = confdict[item] # ---------------------------------------------------------------------- def html_escape(s): """HTML Entity escaping.""" return str(s).replace('&', '&').replace('<', '<').replace('>', '>') def html_interp(*params): """HTML Entity escape each param, returning results as single tuple. Useful for interpolation by providing this as single % param. """ return tuple(map(html_escape, params)) # ---------------------------------------------------------------------- def wrap_check_privileged(handler): """WSGI method decorator, check authorisation to access.""" # *sigh* API change in ipaddr, IP class initiator gone, have two generator # functions now? Of course, for some reason now only IPv4 *networks* have the # IsLoopback() method. Also, we can no longer test if it's an IPv6 address by # testing isinstance(..., ipaddr.IPv6). It's BaseV6 or more specific? def _wsgi_hook(environ, start_response): ip = environ.get('REMOTE_ADDR', None) if ip: ip = ipaddr.IPAddress(ip) else: return return_error(environ, start_response) load_config_file(environ) # On the bright side, both IPv4 and IPv6 now have a consistent new attribute, # is_loopback, which raises the question of why IPv4 networks also have # IsLoopback() as a method ... if ip.is_loopback: return handler(environ, start_response) for block in kPRIVILEGED_ACCESS: net = ipaddr.IPNetwork(block) if net.Contains(ip): return handler(environ, start_response) return return_error(environ, start_response) pass return _wsgi_hook # ---------------------------------------------------------------------- def _sort_hostnames(iterable): """Sort the iterable parameter as hostnames, least-specific label to most.""" keyed = [] for entry in iterable: key = entry.split('.') key.reverse() keyed.append( (key, entry) ) for entry in sorted(keyed): yield entry[1] # ---------------------------------------------------------------------- def load_membership(): """Returns dict of data, incl list of hosts from the membership file. Returns a dict with members: hostnames: hostnames extracted membership_stat: result of stat()ing the membership file """ peer_re = re.compile('^([A-Za-z0-9]\S+)\s+\d') hostnames = set() with open(kSKS_MEMBERSHIP) as fn: config_stat = os.fstat(fn.fileno()) for line in fn: m = peer_re.match(line) if not m: continue hostnames.add(Hostname(m.group(1))) return { 'hostnames': hostnames, 'membership_stat': config_stat, } # ====================================================================== class Hostname(str): """A simple textual hostname.""" pass class AddressInfo(object): _all_lock = threading.Lock() _all_ai = [] NEW_ITEM = 1 NEW_NAME = 2 OLD_ITEM = 3 @classmethod def clear_all_addresses(cls): """Reset state of known addresses.""" with cls._all_lock: cls._all_ai = [] @classmethod def find_address_info(cls, obj, insert=False): newips = [x[0] for x in obj] #debug('AI.fai(%s): have IPs: %s', obj.hostname, ' '.join(newips)) with cls._all_lock: for ai in cls._all_ai: for ip in newips: if ip in ai.addresses: return ai if insert: cls._all_ai.append(obj) return None @classmethod def register_address_info(cls, obj): existing = cls.find_address_info(obj, insert=True) if not existing: return cls.NEW_ITEM added_name = existing.add_alt_name(obj.hostname) if added_name: return cls.NEW_NAME return cls.OLD_ITEM """Information about the IP addressing of an SKS peer.""" def __init__(self, hostname, walk_state, walker_cb=None, **kwargs): self.hostname = Hostname(hostname) self.portstr = '%d' % kRECON_PORT self._walk_state = walk_state self._walker_cb = walker_cb self.exception = None self.addresses = list() self.alt_names = list() self.geography = dict() #debug('AddressInfo(%s): created', hostname) self.update_lock = threading.Lock() with self.update_lock: try: self.resolve_ips() self.resolve_geographies() # resolve geography first, to populate the dict, to keep iter sane below status = self.__class__.register_address_info(self) except: exceptlog(level=logging.ERROR) self._walker_cb(self._walk_state, (self.hostname, self, status)) @classmethod def _resolve_txt(cls, searchname): try: answers = dns.resolver.query(searchname, 'TXT') except dns.exception.DNSException as e: return [] if not answers: return [] res = [] for rdata in answers: res.extend(rdata.strings) return res @classmethod def _resolve_ip_geography(cls, ip): # This sucks, but even googleip doesn't have a 'reverse' method # suitable for use with DNS if ip.find(':') != -1: return None # XXX: Assumes either IPv6 or IPv4 search = ip.split('.') search.reverse() search = '.'.join(search) + '.' + kCOUNTRIES_ZONE return list(cls._resolve_txt(search)) def add_alt_name(self, newname): with self.update_lock: if newname == self.hostname: return False if newname in self.alt_names: return False if newname in self.addresses: return False self.alt_names.append(newname) return True def resolve_ips(self): addrs = [] try: for (family, socktype, proto, canonname, sockaddr) in socket.getaddrinfo( self.hostname, self.portstr, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG | socket.AI_NUMERICSERV): (addr, port) = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) addrs.append(addr) self.addresses.extend(addrs) except socket.gaierror as e: self.exception = e def resolve_geographies(self): for ip in self.addresses: self.geography[ip] = self._resolve_ip_geography(ip) def __repr__(self): return 'AddressInfo(%s)' % self.hostname def __len__(self): return len(self.addresses) def __iter__(self): return self.iter_ips() def __getitem__(self, i): return self.addresses[i] def iter_ip_geos(self): for ip in self.addresses: yield (ip, self.geography[ip]) def iter_ips(self): for x in self.addresses: yield x def iter_all(self): yield self.hostname for x in self.alt_names: yield x for x in self.addresses: yield x # ====================================================================== class SksNode(object): """Information retrieved from an SKS peer.""" # If we get rid of namespaceHTMLElements=False then we enter this mess: #parse_namespaces = { 'html': 'http://www.w3.org/1999/xhtml' } def __init__(self, hostname, walk_state, walker_cb=None, **kwargs): self.hostname = hostname self.portstr = '%d' % kHKP_PORT self._url = 'http://%s:%s/pks/lookup?op=stats' % (self.hostname, self.portstr) self.exception = None self._walk_state = walk_state self.distance = isinstance(walk_state, numbers.Integral) and str(walk_state) or '-1' self._walker_cb = walker_cb #debug('SksNode(%s): created', hostname) self.fetch_page() try: if self.exception is None: self.analyse() except: exceptlog('Analyse(%s)' % hostname) if self._walker_cb: self._walker_cb(self._walk_state, hostname, i_am=self) def fetch_page(self): try: request = urllib2.Request(self._url) request.add_header('User-Agent', 'sks_peers/0.1 (SKS mesh spidering)') self._data = urllib2.urlopen(request, timeout=kSTATS_FETCH_TIMEOUT).read() # urllib2 has URLError and HTTPError, subclassing from IOError; however, # it uses httplib internally which tends to throw BadStatusLine; all of # httplib's exceptions subclass from a local HTTPException class except (IOError, urllib2.httplib.HTTPException) as e: #debug('SksNode/Fetch(%s): %s', self.hostname, e) self.exception = e def _table_following(self, search): if '"' in search: raise InternalUsageError('Malformed search pattern: {{{%s}}}' % search) s = self._parsedhtml.xpath('//*[text()="%s"]' % search) if not len(s): raise MissingElement(search) s = s[0].getnext() while s.tag != 'table': s = s.getnext() return s def _plain_rows_of(self, search): table = self._table_following(search) return set([str(x.text.strip()) for x in table.findall('.//td')]) def _dict_from_plain_rows(self, search): table = self._table_following(search) # Bug-fix: used split(None, 2) by mistake; the second param is number of # splits, not number of elements in result return dict([x.text.strip().split(None, 1) for x in table.findall('.//td')]) def _kvdict_from_table(self, search): table = self._table_following(search) d = dict() for r in table.findall('.//tr'): pair = [x.text.strip() for x in r.findall('td')] d[pair[0].rstrip(':')] = pair[1] return d def analyse(self): if self.exception is not None: return try: # BeautifulSoup was a good choice at the time, but is no longer viable; # I initially switched to using html5lib to build a beautifulsoup tree # but that is now deprecated. self._parsedhtml = html5lib.parse(self._data, treebuilder='lxml', namespaceHTMLElements=False) except: self.exception = html_exception_grab() return self.peers = self._dict_from_plain_rows('Gossip Peers') # Have seen malformed stuff where people had trailing garbage in the lines for p in self.peers: m = re.match(r'^(\d+)\s.*', self.peers[p]) if m: self.peers[p] = m.groups()[0] if self._walker_cb: for peer in self.peers: if peer in kSKIP_ENTRIES: #debug('Analyse(%s): skipping blacklisted item {%s}', self.hostname, peer) pass else: self._walker_cb(self._walk_state, Hostname(peer), origin=self) else: debug('Analyse(%s): no walker', self.hostname) self.mutual = False for me in (kHOSTNAME, kHOSTNAME_PEER): try: if me in self.peers and int(self.peers[me]) == kRECON_PORT: self.mutual = True except ValueError as e: pass self.mailsync = self._plain_rows_of('Outgoing Mailsync Peers') self._settings = self._kvdict_from_table('Settings') # settings should have: Hostname Version 'HTTP port' 'Recon port' 'Debug level' self.version = self._settings['Version'] try: t1 = self._parsedhtml.xpath('//h2[text()="Statistics"]') if not len(t1): raise MissingElement('Statistics (h2)') t = t1[0].getnext().text if t.startswith('Total number of keys'): self.keycount = int(t.split(':')[1]) except (AttributeError, MissingElement) as e: self.keycount = -1 del self._data, self._parsedhtml def __nonzero__(self): if self.exception: return False return True def own_hostname(self): try: return str(self._settings['Hostname']) except: return self.hostname def __str__(self): if self.exception: return str(self.exception) try: return str(self._settings['Hostname']) except: return self.hostname # ====================================================================== SM_R_DNS = collections.namedtuple('SM_R_DNS', 'TAG walk_state hostname ai status') SM_R_Node = collections.namedtuple('SM_R_Node', 'TAG walk_state hostname node') SM_R_Link = collections.namedtuple('SM_R_Link', 'TAG walk_state peername origin') class StubGraph(object): """Used to hold a length snapshot of a graph.""" def __init__(self, graph): self.len = len(graph.nodes()) + len(graph.edges()) def __len__(self): return self.len class SpiderManager(threading.Thread): """Class to manage spidering all the nodes in the SKS peering mesh. We receive add requests; we manage lookups for those and dispatch to the relevant lookup pools for work. We maintain a graph of visited nodes. """ def __init__(self): threading.Thread.__init__(self) self.name = 'SpiderManager' self.daemon = True self._results_q = queue.Queue() self._have_managers = threading.Semaphore(0) self._data_lock = threading.Lock() self.reset() def reset(self): self._awaiting_nodes = set() self._awaiting_ai = set() self._broken_ai = set() self._broken_node = set() self._canon_hostname = {} # Hostname -> Hostname self._ip_to_hostname = {} # We build the graph with the node itself being the Hostname but with # attributes upon the node, 'sks' and 'ai', giving data structure. # We use pygraphviz for rendering but not inside the SpiderManager # Note that attributes are multivalue so we'll end up with multiple depths self._graph = pygraph.classes.digraph.digraph() def __len__(self): with self._data_lock: return self._results_q.qsize() + len(self._awaiting_nodes) + len(self._awaiting_ai) def set_managers(self, manager_dns, manager_node): """Sets the manager objects for handling work dispatch.""" self._manager_dns = manager_dns self._manager_node = manager_node self._have_managers.release() #debug('Spider: have managers') def cb_dns_resolution(self, walk_state, results): """Called from DNS resolution pool to tell us the hostname and newness.""" (hostname, ai, status) = results self._results_q.put(SM_R_DNS('DNS', walk_state, hostname, ai, status)) def cb_node_newnode(self, walk_state, hostname, origin=None, i_am=None): """Called from SksNode scraping pool to give us information. Either origin or i_am will contain the node that called us. If origin, then hostname is a new peer, else hostname is the node. """ # Note that SksNode overrides __nonzero__ so test against None if i_am is not None: self._results_q.put(SM_R_Node('NODE', walk_state, hostname, i_am)) elif origin is not None: self._results_q.put(SM_R_Link('LINK', walk_state, hostname, origin)) else: errlog('Unknown cb_node_newnode(%s) invocation [%s]', hostname, walk_state) def _get_canon_hostname(self, hn): if hn in self._canon_hostname: return self._canon_hostname[hn] if hn in self._ip_to_hostname: return self._ip_to_hostname[hn] return hn def _have_node_item(self, hostname, item): hostname = self._get_canon_hostname(hostname) if not self._graph.has_node(hostname): return False for attr_tuple in self._graph.node_attributes(hostname): if attr_tuple[0] == item: return True return False def _get_node_item(self, hostname, item, multi=False): hostname = self._get_canon_hostname(hostname) if not self._graph.has_node(hostname): return None res = [] for attr_tuple in self._graph.node_attributes(hostname): if attr_tuple[0] == item: if multi: res.append(attr_tuple[1]) else: return attr_tuple[1] return res or None # There is no way to remove an attribute sanely; removing the node removes # its edges and adding an attribute with an existing name is multivalue. def _set_node_item(self, hostname, item, value): if not self._graph.has_node(hostname): self._graph.add_node(hostname) self._graph.add_node_attribute(hostname, (item, value)) def _have_sksnode(self, hostname): return self._have_node_item(hostname, 'sks') def _have_ai(self, hostname): return self._have_node_item(hostname, 'ai') def _get_sksnode(self, hostname): return self._get_node_item(hostname, 'sks') def _get_ai(self, hostname): return self._get_node_item(hostname, 'ai') def _get_depth(self, hostname): return min(self._get_node_item(hostname, 'depth', multi=True)) def _set_sksnode(self, hostname, sks): if hostname in self._awaiting_nodes: self._awaiting_nodes.remove(hostname) return self._set_node_item(hostname, 'sks', sks) def _set_ai(self, hostname, ai): if hostname in self._awaiting_ai: self._awaiting_ai.remove(hostname) return self._set_node_item(hostname, 'ai', ai) def _set_depth_if_lower(self, hostname, depth): if not self._have_node_item(hostname, 'depth'): self._set_node_item(hostname, 'depth', depth) return old = self._get_node_item(hostname, 'depth') if old > depth: self._set_node_item(hostname, 'depth', depth) def _graph_node(self, hostname, depth=None): hostname = self._get_canon_hostname(hostname) if depth is None: return if not self._graph.has_node(hostname): self._graph.add_node(hostname, attrs=[('depth', depth)]) return self._set_depth_if_lower(hostname, depth) def _graph_collapse_node_into(self, bad, good): self._canon_hostname[bad] = good g = self._graph if not g.has_node(good): g.add_node(good) for attr_tuple in g.node_attributes(bad): g.add_node_attribute(good, attr_tuple) for src in g.incidents(bad): if not g.has_edge((src, good)): g.add_edge((src, good)) g.add_edge_attributes((src, good), g.edge_attributes((src, bad))) for dest in g.neighbors(bad): if not g.has_edge((good, dest)): g.add_edge((good, dest)) g.add_edge_attributes((bad, dest), g.edge_attributes((bad, dest))) g.del_node(bad) if bad in self._awaiting_nodes: self._awaiting_nodes.remove(bad) self._awaiting_nodes.add(good) if bad in self._awaiting_ai: self._awaiting_ai.remove(bad) self._awaiting_ai.add(good) def _trigger_dns_ifneeded(self, hn, depth=None): if not isinstance(hn, Hostname): raise TypeError('hn(%s) is not Hostname() type' % hn) if self._have_ai(hn): return if hn not in self._awaiting_ai: self._awaiting_ai.add(hn) self._manager_dns.add(hn, walk_state=depth) def _trigger_sksnode_ifneeded(self, hn, depth=None): if not isinstance(hn, Hostname): raise TypeError('hn(%s) is not Hostname() type' % hn) if self._have_sksnode(hn): return if hn not in self._awaiting_nodes: self._awaiting_nodes.add(hn) self._manager_node.add(hn, walk_state=depth) def add_origin(self, hn): self._graph_node(hn, depth=0) self._trigger_dns_ifneeded(hn, depth=0) self._trigger_sksnode_ifneeded(hn, depth=0) def do_host(self, hn, depth=None): self._graph_node(hn, depth=depth) self._trigger_dns_ifneeded(hn, depth=depth) self._trigger_sksnode_ifneeded(hn, depth=depth) # hit = handle in thread; self._data_lock held def _hit_dns(self, result): """We have some DNS results.""" # result.status has values AddressInfo.{NEW_ITEM, NEW_NAME, OLD_ITEM} # currently we don't use that if result.hostname not in self._awaiting_ai: errlog('Seen DNS results which we were not expecting [%s] -> (%s)', result.hostname, ' '.join(result.ai)) if self._have_ai(result.hostname): # Turns out graph lets you have multiple attr,val with same attr self._awaiting_ai.discard(result.hostname) else: self._graph_node(result.hostname, depth=result.walk_state) self._set_ai(result.hostname, result.ai) extant_hostname = None for ip in result.ai: if ip in self._ip_to_hostname: extant_hostname = self._ip_to_hostname[ip] break new_register = extant_hostname or result.hostname for ip in result.ai: if ip not in self._ip_to_hostname: self._ip_to_hostname[ip] = new_register if extant_hostname and extant_hostname != result.hostname: self._graph_collapse_node_into(result.hostname, extant_hostname) return if result.hostname in self._awaiting_nodes: return # query pending if self._have_sksnode(result.hostname): return self._trigger_sksnode_ifneeded(result.hostname, result.walk_state) #debug('Spider: new Node lookup post-DNS for %s', result.hostname) return def _hit_node(self, result): """Knowledge of a node in itself.""" if result.hostname not in self._awaiting_nodes: errlog('Seen node which we were not expecting? [%s]', result.hostname) hostname = self._get_canon_hostname(result.hostname) if self._have_sksnode(hostname): prev = self._get_sksnode(hostname) if (prev is None or ( prev.exception is None and result.node.exception is not None)): self._set_sksnode(hostname, result.node) self._set_depth_if_lower(result.hostname, result.walk_state) self._awaiting_nodes.discard(hostname) return #debug('Spider: Node result for %s', result.hostname) self._set_sksnode(result.hostname, result.node) self._trigger_dns_ifneeded(result.hostname, result.walk_state) def _hit_peer(self, result): """New link in graph, have node as origin.""" peername = self._get_canon_hostname(result.peername) srcname = self._get_canon_hostname(result.origin.hostname) self._trigger_dns_ifneeded(peername, (result.walk_state+1)) # don't trigger sksnode based on peer name, wait for DNS results; we # don't want a new peer sksnode object unless the name really is new self._graph_node(srcname, depth=result.walk_state) self._graph_node(peername, depth=(result.walk_state+1)) # we can get dup edges if two hosts resolve to the same IP, so that we get # one canonical host if not self._graph.has_edge((srcname, peername)): self._graph.add_edge((srcname, peername)) def run(self): self.reset() self._have_managers.acquire() while True: try: result = self._results_q.get() with self._data_lock: if result.TAG == 'DNS': self._hit_dns(result) elif result.TAG == 'NODE': self._hit_node(result) elif result.TAG == 'LINK': self._hit_peer(result) finally: self._results_q.task_done() def collect(self): #c_n = len(self._graph.nodes()) #c_an = len(self._awaiting_nodes) #c_aa = len(self._awaiting_ai) #debug('Spider/collect: N=%d Awaiting H->N=%d H->A=%d', c_n, c_an, c_aa) return StubGraph(self._graph) def GetGraph(self): """Return a sanitised copy of the collection graph.""" gr = pygraph.classes.digraph.digraph() for n in self._graph.nodes(): ad = {} for attr, val in self._graph.node_attributes(n): if attr not in ad: ad[attr] = val continue if attr == 'depth' and val < ad[attr]: ad[attr] = val if 'sks' in ad and ad['sks'].exception is not None: ad['sks_failure'] = ad['sks'] del ad['sks'] gr.add_node(n, list(ad.iteritems())) if 'ai' in ad: for ip in ad['ai']: gr.add_node_attribute(n, ('ip', ip)) if 'sks' in ad: for var, fcb in [ ('keycount', lambda x: x > 0), ('version', None), ]: try: val = getattr(ad['sks'], var) if val is not None and fcb is not None: if not fcb(val): val = None if val is not None: gr.add_node_attribute(n, (var, str(val))) except: pass for e in self._graph.edges(): gr.add_edge((e[0], e[1])) return gr # ====================================================================== class PoolWorkerFactory(object): """Make generic pool workers.""" @classmethod def make(cls, name, worker_class): """Our factory entry point. Generate a PoolWorker.""" def run_init(self, index, inq, outq, walker_cb): """PoolWorker __init__.""" threading.Thread.__init__(self) self.name = '%s[%d]' % (name, index) self.daemon = True self._in_queue = inq self._out_queue = outq self._walker_cb = walker_cb def do_worker(self, input_data, depth): """Invoke the wrapper class's constructor for the current data.""" try: return worker_class(input_data, walk_state=depth, walker_cb=self._walker_cb) except Exception as e: return e def do_run(self): """The thread run() method for the constructed classes.""" while True: depth, data = self._in_queue.get() self._out_queue.put( (data, self._do_worker(data, depth=depth)) ) self._in_queue.task_done() d = dict() d['__init__'] = run_init d['_do_worker'] = do_worker d['run'] = do_run return type(name, (threading.Thread,), d) # ---------------------------------------------------------------------- class PoolWorkManager(object): """Run workers asynchronously, emitting results in a queue. Eg, with ThreadResolve, we create the resolver, pass it hostnames, pull out tuples of (hostname, AddressInfo) object in non-deterministic order. """ # There's not much point to getting a shutdown Event too, as we can't have # timeouts in Queue obj.join() so no way to sensibly interrupt a collect. def __init__(self, worker_name, data_class, thread_count, walker_cb=None): if not isinstance(thread_count, numbers.Integral) or thread_count < 1: raise InternalUsageError("Not a positive integer: %s" % str(thread_count)) worker_class = PoolWorkerFactory.make(worker_name, data_class) self._in_queue = queue.Queue() self._out_queue = queue.Queue() self._threads = list() self._previous_results = {} for i in range(0, thread_count): t = worker_class(i, self._in_queue, self._out_queue, walker_cb) t.start() self._threads.append(t) def __len__(self): """Number of results still to be returned. Includes pending.""" return self._in_queue.qsize() + self._out_queue.qsize() def add(self, query, walk_state=None): self._in_queue.put((walk_state, query)) def join(self): self._in_queue.join() def get(self): res = self._out_queue.get() self._out_queue.task_done() return res def collect(self): results = self._previous_results self.join() while self: (k, d) = self.get() results[k] = d self._previous_results = results return results # ====================================================================== class DataGatherer(threading.Thread): """Master persistent thread responsible for keeping data current. This controls all data gathering threads and sleeps between collections. """ def __init__(self, notify_ev): threading.Thread.__init__(self) self._notify_ev = notify_ev self._cmd_queue = queue.Queue() self.name = 'DataGatherer' self.daemon = False self.defer = 0 self.shutdown = False def _get_cmd(self): if not self._notify_ev.is_set(): return None self._notify_ev.clear() try: cmd = self._cmd_queue.get(block=False) except queue.Empty: log('ProgError: notify with empty queue') return 'kill' if cmd in ('kill', 'scan'): return cmd raise InternalUsageError('Unknown DG queue cmd {%s}' % cmd) def _run(self): """The data gathering function. Does not return until shutdown. This manages collection threads, puts the data together in a top-level container and replaces the global object with that one, under a lock, before sleeping to re-do this. """ global sks_data, sks_data_lock spider = SpiderManager() spider.start() resolver = PoolWorkManager( 'Resolve', AddressInfo, kRESOLVE_THREADS, walker_cb=spider.cb_dns_resolution) node_checker = PoolWorkManager( 'SksNodeCheck', SksNode, kSKS_POLL_THREADS, walker_cb=spider.cb_node_newnode) spider.set_managers(resolver, node_checker) while True: with sks_data_lock: if sks_data is not None: sks_data['updating'] = time.time() log('DG: Gathering data') start_over = False # Working from membership always gets some peers. # Working from web-UI is more consistent, but relies upon initial_stat # So we work from both and let the de-dup happen AddressInfo.clear_all_addresses() spider.reset() spider.add_origin(Hostname(kHOSTNAME_PEER)) members = load_membership() hostnames, config_stat = members['hostnames'], members['membership_stat'] self.hostnames = hostnames del members for h in hostnames: spider.do_host(h, depth=1) pass sizes = {} new_data = {} keep_checking = True final_safety = False while keep_checking: keep_checking = False for (target, pooled) in [ (None, None), # DNS is fast; we can finish collecting DNS after a subset, stalling on # the URL retrievals, then continue to walk further on and have more DNS # to do. This is a change in integrity semantics with the introduction of # walking. ('node_data', node_checker), ('addresses', resolver), ('stub_graph', spider), ]: #debug('DG: iter of collection') time.sleep(2) if target is not None: new_data[target] = pooled.collect() old_size = sizes.get(target, 0) new_size = len(new_data[target]) if new_size != old_size: sizes[target] = new_size keep_checking = True cmd = self._get_cmd() if cmd == 'kill': log('DG: termination request received (during scan), returning') return if cmd == 'scan': start_over = True break if len(spider) or len(node_checker) or len(resolver): keep_checking = True # If things completed before that len check, then we'd be incomplete # If we check "one more time" and find differences, then we need to redo # But if we saw empty and that last "one more time" saw no changes, then # we've shown that in the time between seeing no changes and the time # when the queue was empty, there were no changes because the next time # around there were no changes either. So we've covered that interval. if keep_checking: final_safety = False elif not final_safety: keep_checking = True final_safety = True # I *think* the above is correct but should look into formally proving it if start_over: log('DG: early termination of run, starting over') continue new_data['graph'] = spider.GetGraph() del new_data['stub_graph'] new_data['graph_str'] = str(new_data['graph']) new_data.update({ 'mtime': config_stat.st_mtime, 'finished': time.time(), 'gatherer': self, }) with sks_data_lock: sks_data = new_data gc.collect() delay = float(random.randint( kINTER_SCAN_INTERVAL_SECS - kINTER_SCAN_INTERVAL_JITTER, kINTER_SCAN_INTERVAL_SECS + kINTER_SCAN_INTERVAL_JITTER)) log('DG: Gathering data complete, sleeping %.0f seconds', delay) self._sleep_a_while(delay) if self.shutdown: return def _sleep_a_while(self, delay): while True: sleep_starting = time.time() self._notify_ev.wait(delay) slept = time.time() - sleep_starting if float(slept) > (delay - 1): return else: delay -= float(slept) cmd = self._get_cmd() if cmd == 'kill': log('DG: termination request received, returning') self.shutdown = True return if cmd == 'scan': log('DG: received scan request, starting now') return def run(self): """Wrapper around _run to collect exceptions safely.""" global sks_data, sks_data_lock if self.defer: self._sleep_a_while(self.defer) if self.shutdown: return try: self._run() except: e = exceptlog('DataGatherer died;', level=logging.ERROR) with sks_data_lock: sks_data = { 'exception': e } def kill(self): """Returns true iff successfully killed.""" self._cmd_queue.put('kill') self._notify_ev.set() self.join(3.0) return not self.is_alive() def rescan(self, reason=None): """Cut short the sleep, scan now.""" global sks_data, sks_data_lock if reason is None: reason = 'no reason given for rescan' with sks_data_lock: if 'rescan_triggered' in sks_data: log('DG/rescan ignoring retrigger [%s]', reason) return False sks_data['rescan_triggered'] = time.time() log('DG/rescan asking for scan [%s]', reason) self._cmd_queue.put('scan') self._notify_ev.set() return True def defer_initial_scan(self, delay=3600): """Don't start first scan immediately.""" i = int(delay) if i > 0: self.defer = i # ====================================================================== # Should any sanitisation be done on these, or just accept that it's # a debugging aid? def snapshot_data_save(stream): """Takes an existing data and writes a serialisation to stream. WARNING: this method will modify the data being returned. It will remove items not needed once the data has already made it into the global state object. """ global sks_data, sks_data_lock with sks_data_lock: snap = dict(sks_data) # I can del _walker_cb in the constructor but update_lock is needed # post-construction; it's not needed once the data collection is # finished though; the next data collection will create new objects. for ai in snap['addresses'].itervalues(): del ai._walker_cb del ai.update_lock for sks in snap['node_data'].itervalues(): del sks._walker_cb del snap['gatherer'] try: pickle.dump(snap, stream) except Exception as e: print('Pickling failed: %s' % e, file=sys.stderr) def snapshot_data_load(stream): global sks_data, sks_data_lock try: snap = pickle.load(stream) except Exception as e: print('Unpickling failed: %s' % e, file=sys.stderr) sys.exit(1) with sks_data_lock: sks_data = snap # ====================================================================== _kWSGI_IPVALID_STATS = 'pdp.enable.ipvalid-stats' _kWSGI_QUERYDICT = 'pdp.query' _kWSGI_APPROOT = 'pdp.approot' def sks_peers_init(do_signals=True, snapshot_src_stream=None): """Init global state, creating threads as needed. On first call, returns an object which has a kill() method, to try to shut things down cleanly. On subsequent calls, returns None. Normally we will only ever be called once, but if we're being served as an app so that we're launched on-demand then we'll be called via application(); if we are set to multi-threaded, then there can be concurrent calls to application() and thus a race. Break the race with a lock. It's okay to create a lock at top-level, but not threads. """ global startup_lock, sks_data, sks_data_lock, wsgi_selector with startup_lock: if sks_data_lock is not None: return None sks_data_lock = threading.Lock() dg_shutdown = threading.Event() gatherer = DataGatherer(dg_shutdown) if snapshot_src_stream is not None: snapshot_data_load(snapshot_src_stream) gatherer.defer_initial_scan() gatherer.start() s = selector.Selector() url_handlers = [] def _add_handler(url, func, *methods): url_handlers.append(url) md = dict([(x, func) for x in methods]) md['GET'] = func s.add(url, method_dict=md) def _set_approot(app): def _wrap(environ, start_response): environ[_kWSGI_APPROOT] = environ['SCRIPT_NAME'] return app(environ, start_response) return _wrap # No matter how I hook in, when using wsgiref.simple_server.make_server I # need '/' but with mod_wsgi it's ''. _add_handler('', handle_peers_page) _add_handler('/', handle_peers_page) _add_handler('/peer-info', handle_peer_info_page) _add_handler('/ip-valid', handle_ip_valid) _add_handler('/ip-valid-stats', handle_ip_valid_stats) _add_handler('/hostnames-json', handle_hostnames_json) _add_handler('/graph-dot', handle_graph_dot, 'HEAD') _add_handler('/helpz', get_handler_helpz(url_handlers)) _add_handler('/threadz', handle_threadz) _add_handler('/environz', handle_environz) _add_handler('/rescanz', handle_rescanz) _add_handler('/internalz', handle_internalz) _add_handler('/quitz', get_handler_quitz(gatherer)) s.status404 = return_error wsgi_selector = _set_approot(s) log_init() if do_signals: signal.signal(signal.SIGUSR1, act_reload_queue) log('Initialised (uid=%d)', os.getuid()) return gatherer # ====================================================================== def gen_page_namespace(**kwdict): """Return a page namespace dictionary with standard values filled in.""" page_namespace = dict() #TODO: should really URL-encode maintainer too page_namespace['maintainer'] = html_escape(kMAINT_EMAIL) page_namespace['my_hostname'] = html_escape(kHOSTNAME) page_namespace['warning'] = '' page_namespace.update(kwdict) return page_namespace def handle_peers_page(environ, start_response): """Write the peers page. WSGI.""" global sks_data, sks_data_lock still_gathering = True try: with sks_data_lock: _sks_data = sks_data if _sks_data is None: addresses = {} node_data = {} last_mtime = 0.0 gatherer = None updating_since = None elif 'addresses' in _sks_data: still_gathering = False addresses = _sks_data['addresses'] node_data = _sks_data['node_data'] elif 'exception' in _sks_data: data_gather_exception = _sks_data['exception'] addresses = {} node_data = {} else: raise InternalUsageError("sks_data is not None but lacks addresses field") if _sks_data is not None: last_mtime = _sks_data.get('mtime', 0.0) gatherer = _sks_data.get('gatherer') updating_since = _sks_data.get('updating') del _sks_data page_namespace = gen_page_namespace() page_head = Cheetah.Template.Template(kPAGE_TEMPLATE_HEAD, searchList=[page_namespace]) page_foot = Cheetah.Template.Template(kPAGE_TEMPLATE_FOOT, searchList=[page_namespace]) if updating_since is None: page_namespace['scanning_active'] = '' else: scanning_for = time.time() - updating_since page_namespace['scanning_active'] = '

Data gathering run in progress (for last %.0f seconds)

' % scanning_for if still_gathering: page_namespace['warning'] = '

Still gathering data!

' if 'data_gather_exception' in locals(): page_namespace['warning'] = '

DATA GATHERING FAILED

%s
' % html_escape(data_gather_exception) if gatherer is not None: if not gatherer.is_alive(): page_namespace['warning'] += '

DATA GATHERING THREAD DEAD

' suicide_delayed() if addresses: config_stat = os.stat(kSKS_MEMBERSHIP) if config_stat.st_mtime != last_mtime: if gatherer.rescan('membership mtime update'): page_namespace['warning'] += '\n

Config changed, will rescan in background

' host_namespace = dict() host = Cheetah.Template.Template(kPAGE_TEMPLATE_HOST, searchList=[host_namespace, page_namespace]) hostmore = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTMORE, searchList=[host_namespace, page_namespace]) hosterr = Cheetah.Template.Template(kPAGE_TEMPLATE_HOSTERR, searchList=[host_namespace, page_namespace]) except Exception as e: start_response('500 Error', [('Content-Type', 'text/plain; charset=UTF-8')], sys.exc_info()) yield 'Bleh, we failed:\n\n' yield str(e) yield '\n' raise StopIteration() start_response('200 OK', [ ('Content-Type', 'text/html; charset=UTF-8'), ]) # PEP 333 is explicit that WSGI uses strings, not Unicode yield str(page_head) count_total = 0 count_direct = 0 ordered_entries = [] by_depth = {} # depth, hostname if node_data: for h, v in node_data.iteritems(): if hasattr(v, 'distance'): depth = int(v.distance) else: depth = -1 if depth not in by_depth: by_depth[depth] = [] by_depth[depth].append(h) if -1 in by_depth and len(by_depth) > 1: shuffle = by_depth[-1] del by_depth[-1] shuffle_to = sorted(by_depth.keys()).pop() + 1 by_depth[shuffle_to] = shuffle for d in sorted(by_depth.keys()): ordered_entries.extend(_sort_hostnames(by_depth[d])) if 1 in by_depth: count_direct = len(by_depth[1]) else: count_direct = 0 else: ordered_entries = _sort_hostnames(addresses.keys()) count_direct = len(addresses) for h in ordered_entries: host_namespace.clear() host_namespace['hostname'] = html_escape(h) ainfo = addresses.get(h, [('n/a', None)]) sksnode = node_data.get(h, False) count_total += 1 host_namespace['rowcount'] = count_total host_namespace['rowclass'] = count_total % 2 and 'odd' or 'even' host_namespace['info_page'] = html_escape( environ[_kWSGI_APPROOT] + '/peer-info?peer=%s' % h) if isinstance(ainfo, Exception) or not ainfo: if ainfo is not None and not isinstance(ainfo, Exception) and hasattr(ainfo, 'exception'): host_namespace['error'] = str(ainfo.exception) else: host_namespace['error'] = str(ainfo) yield str(hosterr) continue if len(ainfo) > 1: host_namespace['rowspan'] = ' rowspan="%d"' % len(ainfo) else: host_namespace['rowspan'] = '' sks_url = 'http://%s:11371/pks/lookup?op=stats' % h html_link = '%s' % html_interp(sks_url, h) host_namespace['sks_url'] = sks_url host_namespace['html_link'] = html_link for x, default in ( ('version', '?'), ('keycount', '0'), ('distance', '?'), ): host_namespace[x] = str(getattr(sksnode, x, default)) host_namespace['mutual'] = getattr(sksnode, 'mutual', False) and 'Yes' or 'No' if ainfo.alt_names: host_namespace['host_aliases_text'] = \ ' %s' % ' '.join(ainfo.alt_names) else: host_namespace['host_aliases_text'] = '' first = True for item in ainfo.iter_ip_geos(): host_namespace['ip'] = html_escape(item[0]) geo = item[1] if geo is None: host_namespace['geo'] = '' else: host_namespace['geo'] = ' '.join(geo).upper() if first: yield str(host) first = False else: yield str(hostmore) page_namespace['peer_count'] = count_direct page_namespace['mesh_count'] = count_total yield str(page_foot) # ====================================================================== def wrap_set_query(handler): """WSGI wrapper to set pdp.query from QUERY_STRING.""" def _wrap(environ, start_response): query = {} for p in re.split('[&;]', environ.get('QUERY_STRING', '')): if '=' in p: k, v = p.split('=', 2) query[k] = v else: query[p] = True environ[_kWSGI_QUERYDICT] = query return handler(environ, start_response) return _wrap def return_error(environ, start_response, summary='Page not found', error='The URL which you entered is invalid', code='404'): page_namespace = gen_page_namespace( summary=html_escape(summary), error=html_escape(error)) error_page = Cheetah.Template.Template(kPAGE_TEMPLATE_BADUSER, searchList=[page_namespace]) start_response('%s nope' % code, [ ('Content-Type', 'text/html; charset=UTF-8'), ]) yield str(error_page) raise StopIteration() def get_handler_helpz(registered): @wrap_check_privileged def _handle_helpz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/html; charset=UTF-8'), ]) yield kPAGE_TEMPLATE_BASIC_HEAD + \ '/helpz

/helpz

\n' return _handle_helpz @wrap_check_privileged def handle_threadz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) yield 'Threads active:\n\n' for t in sorted(threading.enumerate()): aliveness = not t.is_alive() and ' DEAD' or '' ident = t.ident and ' [%d]' % t.ident or '' daemonic = not t.daemon and ' ' or '' yield '%s%s%s%s\n' % (t.name, aliveness, ident, daemonic) yield '.\n' @wrap_check_privileged def handle_environz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) for pair in sorted(environ.items()): yield '%s = {%s}\n' % pair yield '.\n' @wrap_check_privileged def handle_rescanz(environ, start_response): global sks_data, sks_data_lock start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) if act_reload_queue(note='rescanz hit'): yield 'rescan triggered\n' else: yield 'rescan not triggered\n' with sks_data_lock: d = sks_data if d is None: yield 'First scan not completed, have no sks_data\n' elif 'rescan_triggered' in d: timediff = time.time() - d['rescan_triggered'] yield 'Rescan in progress, triggered %.0f seconds ago\n' % timediff else: yield 'Reason unknown\n' def get_handler_quitz(gatherer): @wrap_check_privileged def handle_quitz(environ, start_response): killed = gatherer.kill() if killed: start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) return 'Shutting down.' else: start_response('500 Internal error', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) return 'Failed to initiate shutdown.' return handle_quitz # ---------------------------------------------------------------------- @wrap_check_privileged @wrap_set_query def handle_internalz(environ, start_response): start_response('200 OK', [ ('Content-Type', 'text/plain; charset=UTF-8'), ]) def _report_obj(depth, d, name): prefix = ' ' * depth + '* ' results = [] try: for i in sorted(d.keys()): n = name and name + '[' + i + ']' or i try: if isinstance(d[i], dict) and not i.startswith('__'): results.append(prefix + n + ':\n') results.extend(_report_obj(depth + 1, d[i], n)) elif isinstance(d[i], (set, list)) and len(d[i]) > 1: results.append(prefix + n + ':\n') indent = ' ' * (len(prefix) + 2) # textwrap drops trailing newline, even if in input data results.append(textwrap.fill(' '.join(map(str, d[i])), width=110, initial_indent=indent, subsequent_indent=indent) + '\n') elif isinstance(d[i], (pygraph.classes.digraph.digraph, pygraph.classes.graph.graph)): results.append(prefix + n + ': \n') results.append(d[i].write(fmt='dot')) else: results.append(prefix + n + ': ' + repr(d[i]) + '\n') if isinstance(d[i], object) and ( d[i].__class__.__module__ == '__main__' or d[i].__class__.__module__.startswith('_mod_wsgi_')): results.extend(_report_obj(depth + 1, d[i].__dict__, n)) elif isinstance(d[i], type) and d[i].__module__ == '__main__': results.extend(_report_obj(depth + 1, dict([(x, getattr(d[i], x)) for x in dir(d[i]) if not x.startswith('__')]), n)) except Exception as e: results.append('Walking dict %s failed: %s\n' % (n, e)) except Exception as e: results.append('Iter dict failed %s\n' % name) return results g = globals() return [x for x in _report_obj(1, g, '') if isinstance(x, str)] # ====================================================================== def page_redirect(environ, start_response, uri_fragment, reason): reason_h = html_escape(reason) uri = environ[_kWSGI_APPROOT] + uri_fragment uri_h = html_escape(uri) start_response('302 Found', [ ('Content-Type', 'text/html; charset=UTF-8'), ('Location', uri), ]) yield kPAGE_TEMPLATE_BASIC_HEAD yield """Redirect (%s)

Redirecting ...

%s
""" % (reason_h, uri_h, reason_h) @wrap_set_query def handle_peer_info_page(environ, start_response): global sks_data, sks_data_lock with sks_data_lock: _sks_data = sks_data try: peername = urllib.unquote_plus(environ[_kWSGI_QUERYDICT]['peer']) return _handle_peer_info_page(environ, start_response, _sks_data, peername) except KeyError as e: return page_redirect(environ, start_response, '/', 'No "peer" attribute on query') def _handle_peer_info_page(environ, start_response, sks_data, peername): start_response('200 OK', [ ('Content-Type', 'text/html; charset=UTF-8'), ]) page_namespace = gen_page_namespace(peername=html_escape(peername)) page_head = Cheetah.Template.Template(kPAGE_TEMPLATE_HEAD_PEER_INFO, searchList=[page_namespace]) page_foot = Cheetah.Template.Template(kPAGE_TEMPLATE_FOOT_PEER_INFO, searchList=[page_namespace]) warning = '' if sks_data is None: warning += '

Still gathering data!

\n' else: if 'addresses' not in sks_data: warning += '

Still gathering data!

\n' if 'updating' in sks_data: warning += '

Data gathering run in progress (for last %.0f seconds)

\n' % ( time.time() - sks_data['updating']) if 'exception' in sks_data: e = sks_data['exception'] warning += '

DATA GATHERING FAILED

%s
\n' % html_escape(e) node_graph = sks_data.get('graph', None) if not node_graph.has_node(peername): warning += '

Peer "%s" not found

\n' % html_escape(peername) if warning: page_namespace['warning'] = warning yield str(page_head) yield str(page_foot) raise StopIteration() attr_list = node_graph.node_attributes(peername) ip_list = [] ai = None sks = None all_peer_info = {} _COPY_ATTRS = ('keycount', 'version') for attr, val in attr_list: if attr == 'ip': ip_list.append(val) elif attr == 'sks_failure': all_peer_info['sks_failure'] = val elif attr == 'sks': sks = val elif attr == 'ai': ai = val elif attr in _COPY_ATTRS: all_peer_info[attr] = html_escape(str(val)) all_peer_info['ip_count'] = len(ip_list) for attr in _COPY_ATTRS: if attr not in all_peer_info: all_peer_info[attr] = '?' if ip_list: all_peer_info['ips'] = html_escape('[' + '], ['.join(ip_list) + ']') else: all_peer_info['ips'] = 'None' if ai is not None and ai.exception is not None: all_peer_info['ips'] += ' {%s}' % ai.exception if sks is not None and hasattr(sks, 'mailsync'): all_peer_info['mailsync'] = map(html_escape, sorted(sks.mailsync)) all_peer_info['mailsync_count'] = len(sks.mailsync) else: all_peer_info['mailsync'] = '' all_peer_info['mailsync_count'] = 0 sks_url = 'http://%s:11371/pks/lookup?op=stats' % peername all_peer_info['peer_statsurl'] = html_escape(sks_url) outbound = set(node_graph.neighbors(peername)) inbound = set(node_graph.incidents(peername)) common = inbound & outbound out_only = outbound - common in_only = inbound - common all_peers = inbound | outbound all_peer_info['all_peers'] = map(html_escape, all_peers) peer_status = {} page_peer_main = Cheetah.Template.Template(kPAGE_TEMPLATE_PEER_INFO_MAIN, searchList=[all_peer_info, page_namespace]) page_peer_itspeers = Cheetah.Template.Template(kPAGE_TEMPLATE_PEER_INFO_PEERS, searchList=[peer_status]) page_peer_itspeers_start = Cheetah.Template.Template(kPAGE_TEMPLATE_PEER_INFO_PEERS_START, searchList=[all_peer_info, page_namespace]) page_peer_itspeers_end = Cheetah.Template.Template(kPAGE_TEMPLATE_PEER_INFO_PEERS_END, searchList=[all_peer_info, page_namespace]) yield str(page_head) yield str(page_peer_main) yield str(page_peer_itspeers_start) for p in _sort_hostnames(all_peers): peer_status['name'] = html_escape(p) qs = html_escape('peer=%s' % urllib.quote_plus(p)) peer_status['ref_url'] = construct_url(environ, querystring=qs) peer_status['in'] = p in inbound peer_status['out'] = p in outbound peer_status['common'] = p in common peer_status['in_only'] = p in in_only peer_status['out_only'] = p in out_only yield str(page_peer_itspeers) yield str(page_peer_itspeers_end) yield str(page_foot) # ====================================================================== def handle_ip_valid_stats(environ, start_response): environ[_kWSGI_IPVALID_STATS] = 1 return handle_ip_valid(environ, start_response) @wrap_set_query def handle_ip_valid(environ, start_response): """Emit a list of IPs, first line status/control, last line '.'""" global sks_data, sks_data_lock with sks_data_lock: data = sks_data show_stats = False emit_json = 'json' in environ[_kWSGI_QUERYDICT] if _kWSGI_IPVALID_STATS in environ: show_stats = True if emit_json: stats_comma = '' def _stats(text, state={'stats_comma': ''}): ret = '%s"%s"' % (state['stats_comma'], text) if not state['stats_comma']: state['stats_comma'] = ',\n' return ret else: def _stats(text): return 'STATS: %s\n' % text else: def _stats(text): return '' if emit_json: content_type = 'application/json' else: content_type = 'text/plain; charset=UTF-8' start_response('200 OK', [ ('Content-Type', content_type), ]) if emit_json: if show_stats: yield '{\n "stats": [\n' else: yield '{\n' def _json_error_wrapup(reason): if show_stats: preamble = '\n],\n' else: preamble = '' return (preamble + '"status": { "status": "INVALID", "count": 0, "reason": "%s" }' + '\n}\n') % reason if data is None or 'node_data' not in data or not len(data['node_data']): if not emit_json: yield 'IP-Gen/1.1: status=INVALID count=0 reason=first_scan\n.\n' if show_stats: if data is None: x = _stats('sks_data is None') elif 'node_data' not in data: x = _stats('no node_data in sks_data') elif not len(data['node_data']): x = _stats('node_data in sks_data has empty len') else: x = _stats('unknown reason to believe first scan') yield x if emit_json: yield _json_error_wrapup('first_scan') raise StopIteration() try: scan_finished = data['finished'] except: pass ips_one_per_server = {} ips_all = {} ips_skip_1010 = set() servers_1010_count = 0 for name, node in data['node_data'].iteritems(): try: skip_this = False ai = data['addresses'][name] if node: if int(node.keycount) <= 1: if show_stats: yield _stats('dropping server <%s> with %d keys' % ( name, int(node.keycount))) continue if str(node.version) == '1.0.10': if show_stats: yield _stats('will drop 1.0.10 server <%s>' % name) skip_this = True servers_1010_count += 1 if len(ai): ips_one_per_server[ai[0]] = int(node.keycount) for ip in ai: ips_all[ip] = int(node.keycount) if skip_this: ips_skip_1010.add(ip) except: if show_stats: yield _stats('failed to get data for server <%s>' % name) continue # We want to discard statistic-distorting outliers, then of what remains, # discard those too far away from "normal", but we really want the "best" # servers to be our guide, so 1 std-dev of the second-highest remaining # value should be safe; in fact, we'll hardcode a limit of how far below. # To discard, find mode size (knowing that value can be split across two # buckets) and discard more than five stddevs from mode. The bucketing # should be larger than the distance from desired value so that the mode # is only split across two buckets, if we assume enough servers that a # small number will be down, most will be valid-if-large-enough, so that # splitting the count across two buckets won't let the third-best value win kBUCKET_SIZE = 3000 buckets = {} for ip, count in ips_one_per_server.iteritems(): bucket = int(count // kBUCKET_SIZE) if bucket not in buckets: buckets[bucket] = [] buckets[bucket].append(count) if not len(buckets): if emit_json: yield _json_error_wrapup('broken_no_buckets') else: yield 'IP-Gen/1.1: status=INVALID count=0 reason=broken_no_buckets\n.\n' raise StopIteration() largest_bucket = max(buckets, key=lambda b: len(buckets[b])) first_n = len(buckets[largest_bucket]) first_mean = sum(buckets[largest_bucket]) / first_n first_sd = sqrt(sum((x-first_mean)**2 for x in buckets[largest_bucket]) / first_n) first_bounds = (int(first_mean - 5*first_sd), int(first_mean + 5*first_sd)) first_ips_list = filter(lambda x: first_bounds[0] <= ips_all[x] <= first_bounds[1], ips_one_per_server) first_ips_alllist = filter(lambda x: first_bounds[0] <= ips_all[x] <= first_bounds[1], ips_all) first_ips = dict([(x, ips_all[x]) for x in first_ips_list]) first_ips_all = dict([(x, ips_all[x]) for x in first_ips_alllist]) second_mean = sum(first_ips.values()) / len(first_ips) second_sd = sqrt(sum((x-second_mean)**2 for x in first_ips.values()) / len(first_ips.values())) if show_stats: yield _stats('have %d servers in %d buckets (%d ips total)' % ( len(ips_one_per_server), len(buckets), len(ips_all))) for b in sorted(buckets): yield _stats('%6d: %s' % (b, '*'*len(buckets[b]))) yield _stats('largest bucket is %d with %d entries' % (largest_bucket, first_n)) yield _stats('bucket size %d means bucket %d is [%d, %d)' % ( kBUCKET_SIZE, largest_bucket, kBUCKET_SIZE * largest_bucket, kBUCKET_SIZE * (largest_bucket+1))) yield _stats('largest bucket: mean=%f sd=%f' % (first_mean, first_sd)) yield _stats('first bounds: [%d, %d]' % first_bounds) yield _stats('have %d servers within bounds, mean value %f sd=%f' % (len(first_ips_list), second_mean, second_sd)) if second_mean < kIPLIST_SANITY_MIN: if show_stats: yield _stats('mean %f < %d' % (second_mean, int(kIPLIST_SANITY_MIN))) if emit_json: yield _json_error_wrapup('broken_data') else: yield 'IP-Gen/1.1: status=INVALID count=0 reason=broken_data\n.\n' raise StopIteration() threshold = sorted(first_ips.values())[-2] - (kDAILY_KEYJITTER + second_sd) try: i = int(environ[_kWSGI_QUERYDICT]['threshold']) if i > 0: if show_stats: yield _stats('Overriding threshold from CGI parameter; %d -> %d', threshold, i) threshold = i except (KeyError, ValueError) as e: pass ips = [x for x in first_ips_all if first_ips_all[x] >= threshold] if show_stats: yield _stats('Second largest count within bounds: %d' % sorted(first_ips.values())[-2]) yield _stats('threshold: %d' % threshold) not_in_use = len(ips_skip_1010 - set(ips)) yield _stats('dropping all %d servers running version v1.0.10, for %d possible IPs but %d of those already dropped' % ( servers_1010_count, len(ips_skip_1010), not_in_use)) ips = [x for x in ips if x not in ips_skip_1010] timestamp = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime(scan_finished)) count = len(ips) log('ip-valid: Yielding %d of %d values', count, len(ips_all)) # skip 1.0.10 -> skip_1010, because of lookup problems biting gnupg # alg_1 used a fixed threshold (too small to deal with jitter) # alg_2 used stddev+jitter # alg_3 fixed maximum bucket selection (was a code bug) # alg_4 stopped double-counting servers with multiple IP addresses # alg_5 keep 1.0.10 servers for long enough to calculate stats, drop afterwards if emit_json: if show_stats: yield '\n],\n' yield ('"status": { "status": "COMPLETE", "count": %d, "min": %d, "collected": "%s", ' + '"tags": [ "skip_1010", "alg_5" ] },\n') % (count, threshold, timestamp) yield '"ips": [\n "%s"\n]\n}\n' % '",\n "'.join(ips) else: yield 'IP-Gen/1.1: status=COMPLETE count=%d tags=skip_1010,alg_5 min=%d collected=%s\n' % ( count, threshold, timestamp) yield '\n'.join(ips) yield '\n.\n' # ====================================================================== @wrap_set_query def handle_hostnames_json(environ, start_response): global sks_data, sks_data_lock host_names = None addrs = None # should rename 'all' to 'mesh' but too late now if 'all' in environ[_kWSGI_QUERYDICT]: with sks_data_lock: try: node_data = sks_data.get('node_data', None) addrs = sks_data.get('addresses', None) if node_data: if 'really_all' in environ[_kWSGI_QUERYDICT]: host_names = node_data.iterkeys() else: host_names = [hn for hn in node_data.iterkeys() if len(addrs[hn]) and hasattr(node_data[hn], 'version')] except Exception, e: # sks_data still None, most likely debug('No host_names for "all": %s' % e) pass else: host_names = load_membership()['hostnames'] if not host_names: start_response('503 Service unavailable', [ ('Content-Type', 'text/plain'), ]) if 'all' in environ[_kWSGI_QUERYDICT]: yield 'Still waiting for data collection' else: yield 'Problem loading membership file' raise StopIteration() content_type = 'application/json' if 'textplain' in environ[_kWSGI_QUERYDICT]: # debugging aid, seeing results in text browser content_type = 'text/plain' start_response('200 OK', [ ('Content-Type', content_type), ]) # must be at least one entry, if we're still checking host_names for # truth above, so this is safe; note that trailing commas are not permitted # in the JSON spec, per grammar in RFC 4627 and on http://www.json.org/ yield '{\n "hostnames": [\n' prev = '' for hn in _sort_hostnames(host_names): yield '%s "%s"' % (prev, hn) if not prev: prev = ',\n' yield '\n ]\n}\n' # ====================================================================== def handle_graph_dot(environ, start_response): global sks_data, sks_data_lock with sks_data_lock: try: node_graph = sks_data.get('graph', None) finished = sks_data.get('finished', None) except: node_graph = None if node_graph is None: start_response('503 Service unavailable', [ ('Content-Type', 'text/plain'), ]) yield 'Still waiting for data collection' raise StopIteration() if finished is None: finished = time.time() # *cough* timestamp = time.strftime('%Y%m%d_%H%M%SZ') filename = 'sks-peers-%s.dot' % timestamp start_response('200 OK', [ ('Content-Type', 'text/x-graphviz; charset=UTF-8'), ('Content-Disposition', 'attachment; filename="%s"' % filename), ]) if environ['REQUEST_METHOD'] == 'GET': #yield node_graph.write(fmt='dot') # pygraph 1.6.1 removed write and moved it to pygraph algorithms # per the changelog; in fact, appears to be pygraph.readwrite.dot if hasattr(node_graph, 'write'): yield node_graph.write(fmt='dot') else: yield pygraph.readwrite.dot.write(node_graph) else: yield '' # ====================================================================== def application(environ, start_response): """The main entry point for serving as an application. WSGI interface. Will init global context if not already init'd. """ global sks_data_lock, wsgi_selector if sks_data_lock is None: # We're embedded in a web-server and need to bootstrap # Because we're in a web-server, don't register signal handlers; mod_wsgi # fortunately blocks this (but is (appropriately) spammy about it). load_config_file(environ) sks_peers_init(do_signals=False) if wsgi_selector is None: return return_error(environ, start_response, 'Internal problem', 'Failed to initialise correctly, please report this', '500') i_am = environ.get('SCRIPT_NAME', None) if i_am is not None: t = environ.get('REQUEST_URI','').replace(i_am, '', 1) if not len(t): t = '/' environ['REQUEST_URI'] = t return wsgi_selector(environ, start_response) # ====================================================================== # TODO: really need to switch to an option-parsing library def main(argv=None): """Entry point when invoked as a normal binary.""" load_config_file(os.environ) if argv is None: argv = [] snapshot = None log_to_stderr = False log_include_debug = False if len(argv) and argv[0] == 'standalone': from wsgiref.simple_server import make_server log_to_stderr = True argv.pop(0) if len(argv) and argv[0] == 'debug': log_include_debug = True argv.pop(0) if len(argv) >= 2 and argv[0] in ('dump', 'load'): snapshot = argv.pop(0) snapshot_file = argv.pop(0) print('Will %s snapshot in file "%s"' % (snapshot, snapshot_file), file=sys.stderr) log_init(stderr=log_to_stderr, debug=log_include_debug) if log_include_debug: debug('logging set to DEBUG level') if snapshot == 'load': with file(snapshot_file, 'r') as snapin: shutdown = sks_peers_init(snapshot_src_stream=snapin) else: shutdown = sks_peers_init() if shutdown is None: raise InternalUsageError('init failed to return a shutdown object') if 'make_server' in locals(): server_host = 'localhost' port = 8080 if len(argv) >= 1: if argv[0].startswith(':'): argv.insert(0, 'localhost') argv[1] = argv[1][1:] else: server_host = argv[0] try: if len(argv) >= 2: port = int(argv[1]) if port < 2 or port > 65535: raise OverflowError('port out of range') except ValueError as e: print('Unable to parse as valid port: %s' % argv[1], file=sys.stderr) except OverflowError as e: print('Port number %d out of range, resetting' % port, file=sys.stderr) port = 8080 global wsgi_selector server = make_server(server_host, port, wsgi_selector) print('Starting up webserver listening on {%s} %d' % (server_host, port), file=sys.stderr) try: server.serve_forever() except KeyboardInterrupt as e: print('Received keyboard interrupt, shutting down', file=sys.stderr) if not shutdown.kill(): print('Shutdown failed, aargh.', file=sys.stderr) sys.exit(1) else: import wsgiref.handlers global sks_data, sks_data_lock scanning = True while scanning: with sks_data_lock: _sks_data = sks_data if _sks_data is None: time.sleep(1) else: scanning = False if snapshot == 'dump': with file(snapshot_file, 'w') as out: snapshot_data_save(out) print('Wrote snapshot to: %s' % snapshot_file, file=sys.stderr) else: wsgiref.handlers.CGIHandler().run(handle_peers_page) if shutdown.kill(): return 0 return 1 if __name__ == '__main__': main(sys.argv[1:]) # vim: set filetype=python sw=2 expandtab :