January 4, 2012

Automated Discovery of Blog Feeds and Twitter, Facebook, LinkedIn Accounts Connected to a Business Website

Prerequisites

  1. Python 2.7 (or a later 2.x release)
  2. lxml.html
  3. parse_domain.py
  4. PyYAML

Script

This code is available on GitHub.

fwc.py

#!/usr/bin/python2.7

import argparse
import sys
from focused_web_crawler import FocusedWebCrawler
import logging
import code
import yaml
from constraint import Constraint

def main():
   logger = logging.getLogger('data_big_bang.focused_web_crawler')
   ap = argparse.ArgumentParser(description='Discover web resources associated with a site.')
   ap.add_argument('input', metavar='input.yaml', type=str, nargs=1, help='YAML file indicating the sites to crawl.')
   ap.add_argument('output', metavar='output.yaml', type=str, nargs=1, help='YAML file with the web resources discovered.')

   args = ap.parse_args()
   input = yaml.load(open(args.input[0], "rt"))

   fwc = FocusedWebCrawler()

   for e in input:
      e.update({'constraint': Constraint()})
      fwc.queue.put(e)

   fwc.start()
   fwc.join()

   with open(args.output[0], "wt") as s:
      yaml.dump(fwc.collection, s, default_flow_style=False)

if __name__ == '__main__':
   main()
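
fwc.py expects the input file to be a YAML list of sites. From the way worker.py reads each queue item, every entry needs at least a url (the page to start crawling from) and a key (the label under which discovered resources are grouped in output.yaml). Here is a hypothetical example, generated from Python for illustration; the site names are made up:

import yaml

# Hypothetical input: two sites to crawl. Each entry needs 'url' and 'key';
# fwc.py adds the 'constraint' field itself before queueing the entry.
sites = [
   {'url': 'http://www.example.com/', 'key': 'examplecorp'},
   {'url': 'http://www.example.org/', 'key': 'exampleorg'},
]

with open('input.yaml', 'wt') as f:
   yaml.dump(sites, f, default_flow_style=False)

# Run the crawler with:  python fwc.py input.yaml output.yaml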

focused_web_crawler.py

from threading import Thread, Lock
from worker import Worker
from Queue import Queue
import logging

class FocusedWebCrawler(Thread):
   NWORKERS = 10

   def __init__(self, nworkers = NWORKERS):
      Thread.__init__(self)
      self.nworkers = nworkers
      #self.queue = DualQueue()
      self.queue = Queue()
      self.visited_urls = set()
      self.mutex = Lock()
      self.workers = []
      self.logger = logging.getLogger('data_big_bang.focused_web_crawler')
      sh = logging.StreamHandler()
      formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
      sh.setFormatter(formatter)
      self.logger.addHandler(sh)
      self.logger.setLevel(logging.INFO)
      self.collection = {}
      self.collection_mutex = Lock()

   def run(self):
      self.logger.info('Focused Web Crawler launched')
      self.logger.info('Starting workers')
      for i in xrange(self.nworkers):
         worker = Worker(self.queue, self.visited_urls, self.mutex, self.collection, self.collection_mutex)
         self.workers.append(worker)
         worker.start()

      self.queue.join() # Wait until all items are consumed
      for i in xrange(self.nworkers): # send a 'None signal' to finish workers
         self.queue.put(None)
      self.queue.join() # Wait until all workers are notified

#     for worker in self.workers:
#        worker.join()

      self.logger.info('Finished workers')
      self.logger.info('Focused Web Crawler finished')
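
The run() method relies on a common queue shutdown idiom: the first queue.join() waits until every queued URL has been processed, then one None sentinel per worker is queued, and a second join() waits until every worker has acknowledged its sentinel. A minimal standalone sketch of that idiom follows; it is not part of the crawler and the names in it are made up:

from Queue import Queue
from threading import Thread

def worker(q):
   item = q.get()
   while item is not None:
      # ... process item here ...
      q.task_done()
      item = q.get()
   q.task_done()   # acknowledge the None sentinel, or the second join() blocks forever

q = Queue()
threads = [Thread(target=worker, args=(q,)) for _ in range(2)]
for t in threads:
   t.start()
for job in ['a', 'b', 'c']:
   q.put(job)
q.join()             # wait until all real items are processed
for _ in threads:
   q.put(None)       # one sentinel per worker
q.join()             # wait until every worker has seen its sentinel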

worker.py

from threading import Thread
from fetcher import fetch
from evaluator import get_all_links, get_all_feeds
from collector import collect
from urllib2 import HTTPError
import logging

class Worker(Thread):
   def __init__(self, queue, visited_urls, mutex, collection, collection_mutex):
      Thread.__init__(self)
      self.queue = queue
      self.visited_urls = visited_urls
      self.mutex = mutex
      self.collection = collection
      self.collection_mutex = collection_mutex
      self.logger = logging.getLogger('data_big_bang.focused_web_crawler')

   def run(self):
      item = self.queue.get()
      while item != None:
         try:
            url = item['url']
            key = item['key']
            constraint = item['constraint']
            data = fetch(url)

            if data == None:
               self.logger.info('Not fetched: %s because type != text/html', url)
            else:
               links = get_all_links(data, base = url)
               feeds = get_all_feeds(data, base = url)
               interesting = collect(links)

               if interesting:
                  self.collection_mutex.acquire()
                  if key not in self.collection:
                     self.collection[key] = {'feeds':{}}
                  if feeds:
                     for feed in feeds:
                        self.collection[key]['feeds'][feed['href']] = feed['type']
                  for service, accounts in interesting.items():
                     if service not in self.collection[key]:
                        self.collection[key][service] = {}
                     for a, u in accounts.items():
                        self.collection[key][service][a] = {'url': u, 'depth': constraint.depth}
                  self.collection_mutex.release()

               for l in links:
                  new_constraint = constraint.inherit(url, l)
                  if new_constraint == None:
                     continue
                  self.mutex.acquire()
                  if l not in self.visited_urls:
                     self.queue.put({'url': l, 'key': key, 'constraint': new_constraint})
                     self.visited_urls.add(l)
                  self.mutex.release()
         except HTTPError:
            self.logger.info('HTTPError exception on url: %s', url)

         self.queue.task_done()
         item = self.queue.get()

      self.queue.task_done() # task_done on None
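
For reference, the collection dictionary that the workers build (and that fwc.py dumps to output.yaml) has roughly the shape below. The values are made-up placeholders, not real output:

collection = {
   'examplecorp': {                      # the 'key' from input.yaml
      'feeds': {
         'http://blog.example.com/feed/': 'application/rss+xml',
      },
      'twitter': {
         'examplecorp': {'url': 'http://twitter.com/examplecorp', 'depth': 0},
      },
   },
}
# 'depth' is the crawl depth of the page where the account link was found.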

fetcher.py

import urllib2
import logging

def fetch(uri):
   fetch.logger.info('Fetching: %s', uri)
   #logger = logging.getLogger('data_big_bang.focused_web_crawler')
   print uri
   h = urllib2.urlopen(uri)
   if h.headers.type == 'text/html':
      data = h.read()
   else:
      data = None

   return data

fetch.logger = logging.getLogger('data_big_bang.focused_web_crawler')
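
A short usage sketch (the URL is hypothetical): fetch() returns the page body only when the response is text/html and None otherwise, while HTTP error codes raise urllib2.HTTPError, which worker.py catches.

html = fetch('http://www.example.com/')
if html is not None:
   print len(html)   # number of bytes fetched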

evaluator.py

import lxml.html
import urlparse

def get_all_links(page, base = ''):
   doc = lxml.html.fromstring(page)
   links = map(lambda x: urlparse.urljoin(base, x.attrib['href']), filter(lambda x: 'href' in x.attrib, doc.xpath('//a')))
   return links

def get_all_feeds(page, base = ''):
   doc = lxml.html.fromstring(page)
   feeds = map(lambda x: {'href': urlparse.urljoin(base, x.attrib['href']), 'type': x.attrib['type']}, filter(lambda x: 'type' in x.attrib and x.attrib['type'] in ['application/atom+xml', 'application/rss+xml'], doc.xpath('//link')))
   return feeds
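
A quick illustration with a hypothetical page, assuming the two functions above are importable:

page = '''<html><head>
<link rel="alternate" type="application/rss+xml" href="/feed/">
</head><body><a href="/about">About</a></body></html>'''

print get_all_links(page, base='http://www.example.com/')
# ['http://www.example.com/about']
print get_all_feeds(page, base='http://www.example.com/')
# one feed: href 'http://www.example.com/feed/', type 'application/rss+xml'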

constraint.py

import urlparse
from parse_domain import parse_domain

class Constraint:
   DEPTH = 1

   def __init__(self):
      self.depth = 0

   def inherit(self, base_url, url):
      base_up = urlparse.urlparse(base_url)
      up = urlparse.urlparse(url)
      base_domain = parse_domain(base_url, 2)
      domain = parse_domain(url, 2)

      if base_domain != domain:
         return None

      if self.depth >= Constraint.DEPTH: # only crawl two levels
         return None
      else:
         new_constraint = Constraint()
         new_constraint.depth = self.depth + 1
         return new_constraint
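
A usage sketch, assuming parse_domain(url, 2) reduces both URLs to the same registered domain (e.g. 'example.com'):

c = Constraint()                                                   # depth 0
c1 = c.inherit('http://www.example.com/', 'http://www.example.com/about')
print c1.depth                                                     # 1
print c1.inherit('http://www.example.com/about', 'http://www.example.com/team')
# None -- the DEPTH limit has been reached
print c.inherit('http://www.example.com/', 'http://othersite.com/')
# None -- different registered domain, so the link is not followed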

collector.py

import urlparse
import re

twitter = re.compile('^http://twitter.com/(#!/)?(?P<account>[a-zA-Z0-9_]{1,15})$')
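
The collector listing is cut off at this point; the complete file is in the GitHub repository. Judging from how worker.py consumes collect()'s return value, the missing part matches account URLs per service and returns a {service: {account: url}} dictionary. The Facebook/LinkedIn patterns and the helper below are only a sketch of that idea, not the original code:

facebook = re.compile('^https?://(www\.)?facebook\.com/(?P<account>[a-zA-Z0-9.]+)/?$')
linkedin = re.compile('^https?://(www\.)?linkedin\.com/company/(?P<account>[a-zA-Z0-9-]+)/?$')

def collect(links):
   """Return {service: {account: url}} for links that look like social accounts."""
   services = {'twitter': twitter, 'facebook': facebook, 'linkedin': linkedin}
   interesting = {}
   for link in links:
      for service, pattern in services.items():
         m = pattern.match(link)
         if m:
            interesting.setdefault(service, {})[m.group('account')] = link
   return interesting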
