# urlbot-native-trex/common.py
# -*- coding: utf-8 -*-
""" Common functions for urlbot """
import html.parser
import json
import logging
import re
import time
import urllib.parse
import urllib.request
from collections import namedtuple
from urllib.error import URLError

# rate limit classes: bit flags that may be combined with bitwise OR
RATE_NO_LIMIT = 0x00
RATE_GLOBAL = 0x01
RATE_NO_SILENCE = 0x02
RATE_INTERACTIVE = 0x04
RATE_CHAT = 0x08
RATE_URL = 0x10
RATE_EVENT = 0x20
RATE_FUN = 0x40
BUFSIZ = 8192

EVENTLOOP_DELAY = 0.100  # seconds

USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:31.0) ' \
             'Gecko/20100101 Firefox/31.0 Iceweasel/31.0'

Bucket = namedtuple("BucketConfig", ["history", "period", "max_hist_len"])
buckets = {
    # everything else
    RATE_GLOBAL: Bucket(history=[], period=60, max_hist_len=10),
    # bot writes with no visible stimuli
    RATE_NO_SILENCE: Bucket(history=[], period=10, max_hist_len=5),
    # interactive stuff like ping
    RATE_INTERACTIVE: Bucket(history=[], period=30, max_hist_len=5),
    # chitty-chat, master volume control
    RATE_CHAT: Bucket(history=[], period=10, max_hist_len=5),
    # reacting on URLs
    RATE_URL: Bucket(history=[], period=10, max_hist_len=5),
    # triggering events
    RATE_EVENT: Bucket(history=[], period=60, max_hist_len=10),
    # bot blames people, produces cake and entertains
    RATE_FUN: Bucket(history=[], period=180, max_hist_len=5),
}
rate_limit_classes = buckets.keys()


def rate_limit(rate_class=RATE_GLOBAL):
    """
    Remember the last N timestamps per bucket:
    if the oldest one is newer than now() - period, block and do not append;
    otherwise drop the oldest entry and record the current time.
    :param rate_class: the type of message to verify
    :return: False if blocked, True if allowed
    """
    if rate_class not in rate_limit_classes:
        # combined classes: allowed only if every matching bucket allows it
        return all(rate_limit(c) for c in rate_limit_classes if c & rate_class)
    now = time.time()
    bucket = buckets[rate_class]

    logging.getLogger(__name__).debug(
        "[ratelimit][bucket=%x][time=%s]%s",
        rate_class, now, bucket.history
    )

    if len(bucket.history) >= bucket.max_hist_len and bucket.history[0] > (now - bucket.period):
        return False
    else:
        if len(bucket.history) >= bucket.max_hist_len:
            # '>=' keeps the history bounded at max_hist_len entries
            bucket.history.pop(0)
        bucket.history.append(now)
        return True
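
# Usage sketch (illustrative; send_reply() is a hypothetical helper, not part
# of this module). Combined classes go through the bitmask branch above, so a
# message passes only if every matching bucket allows it:
#
#     if rate_limit(RATE_CHAT | RATE_FUN):
#         send_reply('have some cake')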


def rate_limited(max_per_second):
    """
    Very simple flow control decorator:
    calls beyond max_per_second are delayed with time.sleep()
    :param max_per_second: how many calls per second may be executed - more are delayed
    :return: the decorated function
    """
    min_interval = 1.0 / float(max_per_second)

    def decorate(func):
        last_time_called = [0.0]

        def rate_limited_function(*args, **kwargs):
            # time.clock() was removed in Python 3.8 and measured CPU time
            # on Linux anyway; time.monotonic() is wall time and never
            # jumps backwards
            elapsed = time.monotonic() - last_time_called[0]
            left_to_wait = min_interval - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            ret = func(*args, **kwargs)
            last_time_called[0] = time.monotonic()
            return ret

        return rate_limited_function

    return decorate
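
# Usage sketch (illustrative; process_message() is a hypothetical handler):
#
#     @rate_limited(2)  # at most two calls per second, extra calls sleep
#     def process_message(msg):
#         ...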


def get_version_git():
    import subprocess

    cmd = ['git', 'log', '--oneline', '--abbrev-commit']

    try:
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        first_line = p.stdout.readline()
        line_count = len(p.stdout.readlines()) + 1

        if 0 == p.wait():
            # skip this 1st, 2nd, 3rd stuff and always use [0-9]th
            return "version (Git, %dth rev) '%s'" % (
                line_count, str(first_line.strip(), encoding='utf8')
            )
        else:
            return "(unknown version)"
    except (OSError, subprocess.SubprocessError):
        # e.g. git is not installed or we are not inside a repository
        return "cannot determine version"


VERSION = get_version_git()


def fetch_page(url):
    log = logging.getLogger(__name__)
    log.info('fetching page ' + url)
    request = urllib.request.Request(url)
    request.add_header('User-Agent', USER_AGENT)
    response = urllib.request.urlopen(request)
    html_text = response.read(BUFSIZ)  # ignore more than BUFSIZ
    if html_text[:2] == b'\x1f\x8b':
        # the body starts with the gzip magic number, try to decompress
        import zlib
        try:
            # wbits = MAX_WBITS | 16 tells zlib to expect a gzip header
            html_text = zlib.decompress(html_text, zlib.MAX_WBITS | 16)
        except zlib.error:
            pass  # e.g. truncated at BUFSIZ, keep the raw bytes
    response.close()
    return html_text, response.headers
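
# Usage sketch (illustrative): fetch_page() returns raw bytes plus the
# response headers; decoding is left to the caller, as extract_title() does:
#
#     body, headers = fetch_page('http://example.com/')
#     print(headers['content-type'])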


def extract_title(url):
    log = logging.getLogger(__name__)

    if 'repo/urlbot-native.git' in url:
        log.info('repo URL found: ' + url)
        return 'wee, that looks like my home repo!', []

    log.info('extracting title from ' + url)
    try:
        (html_text, headers) = fetch_page(url)
    except URLError:
        return None
    except Exception as e:
        return 'failed: %s for %s' % (str(e), url)
    charset = None

    if 'content-type' in headers:
        log.debug('content-type: ' + headers['content-type'])

        if not headers['content-type'].startswith('text/'):
            return 1, headers['content-type']

        # note: the fourth positional argument of re.sub() is count,
        # so the flags have to be passed by keyword
        charset = re.sub(
            r'.*charset=(?P<charset>\S+).*',
            r'\g<charset>', headers['content-type'], flags=re.IGNORECASE
        )
    if charset:
        try:
            html_text = html_text.decode(charset)
        except LookupError:
            log.warning("invalid charset in '%s': '%s'" % (headers['content-type'], charset))

    if not isinstance(html_text, str):
        # fall back to UTF-8; str() would only give the "b'...'" repr
        html_text = html_text.decode('utf-8', errors='replace')

    result = re.match(r'.*?<title.*?>(.*?)</title>.*?', html_text, re.S | re.M | re.IGNORECASE)
    if result:
        match = result.groups()[0]

        try:
            # html.unescape() replaces the deprecated HTMLParser().unescape()
            expanded_html = html.unescape(match)
        except UnicodeDecodeError as e:  # idk why this can happen, but it does
            log.warning('html.unescape() exploded here: ' + str(e))
            expanded_html = match
        return expanded_html
    else:
        return None
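
# Usage sketch (illustrative): callers have to handle the mixed return types -
# a title string, None, (1, content_type) for non-text responses and a
# (message, []) tuple for the home repo URL:
#
#     title = extract_title('http://example.com/')
#     if isinstance(title, str):
#         print(title)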


def giphy(subject, api_key):
    # quote the subject, otherwise spaces or '&' would break the query string
    url = 'http://api.giphy.com/v1/gifs/random?tag={}&api_key={}&limit=1&offset=0'.format(
        urllib.parse.quote_plus(subject), api_key
    )
    response = urllib.request.urlopen(url)
    giphy_url = None
    try:
        data = json.loads(response.read().decode('utf-8'))
        giphy_url = data['data']['image_url']
    except (ValueError, KeyError, TypeError):
        pass  # malformed JSON or no result for this tag
    return giphy_url
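
# Usage sketch (illustrative; the API key below is a placeholder):
#
#     gif_url = giphy('cake', 'YOUR_GIPHY_API_KEY')
#     if gif_url:
#         print(gif_url)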