From a7b53d855adbe4bad6a561de54b34c025b122581 Mon Sep 17 00:00:00 2001 From: Thorsten Date: Mon, 30 Nov 2015 19:17:40 +0100 Subject: [PATCH] all death to the tab character --- common.py | 266 +++---- idlebot.py | 170 ++--- local_config.py.skel | 82 +-- plugins.py | 1602 +++++++++++++++++++++--------------------- string_constants.py | 44 +- test_urlbot.py | 70 +- urlbot.py | 423 +++++------ 7 files changed, 1328 insertions(+), 1329 deletions(-) diff --git a/common.py b/common.py index 48b392b..bdcab1b 100644 --- a/common.py +++ b/common.py @@ -26,195 +26,195 @@ USER_AGENT = '''Mozilla/5.0 (X11; Linux x86_64; rv:31.0) Gecko/20100101 Firefox/ basedir = '.' if 2 == len(sys.argv): - basedir = sys.argv[1] + basedir = sys.argv[1] def conf_save(obj): - with open(conf('persistent_storage'), 'wb') as fd: - return pickle.dump(obj, fd) + with open(conf('persistent_storage'), 'wb') as fd: + return pickle.dump(obj, fd) def conf_load(): - path = conf('persistent_storage') - if os.path.isfile(path): - with open(path, 'rb') as fd: - fd.seek(0) - return pickle.load(fd) - else: - return {} + path = conf('persistent_storage') + if os.path.isfile(path): + with open(path, 'rb') as fd: + fd.seek(0) + return pickle.load(fd) + else: + return {} def conf_set(key, value): - blob = conf_load() - blob[key] = value - conf_save(blob) + blob = conf_load() + blob[key] = value + conf_save(blob) def conf_get(key, default=None): - blob = conf_load() - return blob.get(key, default) + blob = conf_load() + return blob.get(key, default) Bucket = namedtuple("BucketConfig", ["history", "period", "max_hist_len"]) buckets = { - # everything else - RATE_GLOBAL: Bucket(history=[], period=60, max_hist_len=10), - # bot writes with no visible stimuli - RATE_NO_SILENCE: Bucket(history=[], period=10, max_hist_len=5), - # interactive stuff like ping - RATE_INTERACTIVE: Bucket(history=[], period=30, max_hist_len=5), - # chitty-chat, master volume control - RATE_CHAT: Bucket(history=[], period=10, max_hist_len=5), - # reacting on URLs - RATE_URL: Bucket(history=[], period=10, max_hist_len=5), - # triggering events - RATE_EVENT: Bucket(history=[], period=60, max_hist_len=10), - # bot blames people, produces cake and entertains - RATE_FUN: Bucket(history=[], period=180, max_hist_len=5), + # everything else + RATE_GLOBAL: Bucket(history=[], period=60, max_hist_len=10), + # bot writes with no visible stimuli + RATE_NO_SILENCE: Bucket(history=[], period=10, max_hist_len=5), + # interactive stuff like ping + RATE_INTERACTIVE: Bucket(history=[], period=30, max_hist_len=5), + # chitty-chat, master volume control + RATE_CHAT: Bucket(history=[], period=10, max_hist_len=5), + # reacting on URLs + RATE_URL: Bucket(history=[], period=10, max_hist_len=5), + # triggering events + RATE_EVENT: Bucket(history=[], period=60, max_hist_len=10), + # bot blames people, produces cake and entertains + RATE_FUN: Bucket(history=[], period=180, max_hist_len=5), } rate_limit_classes = buckets.keys() def rate_limit(rate_class=RATE_GLOBAL): - """ - Remember N timestamps, - if N[0] newer than now()-T then do not output, do not append. - else pop(0); append() + """ + Remember N timestamps, + if N[0] newer than now()-T then do not output, do not append. + else pop(0); append() - :param rate_class: the type of message to verify - :return: False if blocked, True if allowed - """ - if rate_class not in rate_limit_classes: - return all(rate_limit(c) for c in rate_limit_classes if c & rate_class) + :param rate_class: the type of message to verify + :return: False if blocked, True if allowed + """ + if rate_class not in rate_limit_classes: + return all(rate_limit(c) for c in rate_limit_classes if c & rate_class) - now = time.time() - bucket = buckets[rate_class] - logging.getLogger(__name__).debug("[ratelimit][bucket=%x][time=%s]%s" % (rate_class, now, bucket.history)) + now = time.time() + bucket = buckets[rate_class] + logging.getLogger(__name__).debug("[ratelimit][bucket=%x][time=%s]%s" % (rate_class, now, bucket.history)) - if len(bucket.history) >= bucket.max_hist_len and bucket.history[0] > (now - bucket.period): - # print("blocked") - return False - else: - if bucket.history and len(bucket.history) > bucket.max_hist_len: - bucket.history.pop(0) - bucket.history.append(now) - return True + if len(bucket.history) >= bucket.max_hist_len and bucket.history[0] > (now - bucket.period): + # print("blocked") + return False + else: + if bucket.history and len(bucket.history) > bucket.max_hist_len: + bucket.history.pop(0) + bucket.history.append(now) + return True def rate_limited(max_per_second): - """ - very simple flow control context manager - :param max_per_second: how many events per second may be executed - more are delayed - :return: - """ - min_interval = 1.0 / float(max_per_second) + """ + very simple flow control context manager + :param max_per_second: how many events per second may be executed - more are delayed + :return: + """ + min_interval = 1.0 / float(max_per_second) - def decorate(func): - lasttimecalled = [0.0] + def decorate(func): + lasttimecalled = [0.0] - def ratelimitedfunction(*args, **kargs): - elapsed = time.clock() - lasttimecalled[0] - lefttowait = min_interval - elapsed - if lefttowait > 0: - time.sleep(lefttowait) - ret = func(*args, **kargs) - lasttimecalled[0] = time.clock() - return ret + def ratelimitedfunction(*args, **kargs): + elapsed = time.clock() - lasttimecalled[0] + lefttowait = min_interval - elapsed + if lefttowait > 0: + time.sleep(lefttowait) + ret = func(*args, **kargs) + lasttimecalled[0] = time.clock() + return ret - return ratelimitedfunction + return ratelimitedfunction - return decorate + return decorate def get_version_git(): - import subprocess + import subprocess - cmd = ['git', 'log', '--oneline', '--abbrev-commit'] + cmd = ['git', 'log', '--oneline', '--abbrev-commit'] - try: - p = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE) - first_line = p.stdout.readline() - line_count = len(p.stdout.readlines()) + 1 + try: + p = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE) + first_line = p.stdout.readline() + line_count = len(p.stdout.readlines()) + 1 - if 0 == p.wait(): - # skip this 1st, 2nd, 3rd stuff and use always [0-9]th - return "version (Git, %dth rev) '%s'" % ( - line_count, str(first_line.strip(), encoding='utf8') - ) - else: - return "(unknown version)" - except: - return "cannot determine version" + if 0 == p.wait(): + # skip this 1st, 2nd, 3rd stuff and use always [0-9]th + return "version (Git, %dth rev) '%s'" % ( + line_count, str(first_line.strip(), encoding='utf8') + ) + else: + return "(unknown version)" + except: + return "cannot determine version" VERSION = get_version_git() def fetch_page(url): - log = logging.getLogger(__name__) - log.info('fetching page ' + url) - try: - request = urllib.request.Request(url) - request.add_header('User-Agent', USER_AGENT) - response = urllib.request.urlopen(request) - html_text = response.read(BUFSIZ) # ignore more than BUFSIZ - response.close() - return 0, html_text, response.headers - except Exception as e: - log.warn('failed: %s' % e) - return 1, str(e), 'dummy' + log = logging.getLogger(__name__) + log.info('fetching page ' + url) + try: + request = urllib.request.Request(url) + request.add_header('User-Agent', USER_AGENT) + response = urllib.request.urlopen(request) + html_text = response.read(BUFSIZ) # ignore more than BUFSIZ + response.close() + return 0, html_text, response.headers + except Exception as e: + log.warn('failed: %s' % e) + return 1, str(e), 'dummy' def extract_title(url): - log = logging.getLogger(__name__) - global parser + log = logging.getLogger(__name__) + global parser - if 'repo/urlbot-native.git' in url: - log.info('repo URL found: ' + url) - return 3, 'wee, that looks like my home repo!' + if 'repo/urlbot-native.git' in url: + log.info('repo URL found: ' + url) + return 3, 'wee, that looks like my home repo!' - log.info('extracting title from ' + url) + log.info('extracting title from ' + url) - (code, html_text, headers) = fetch_page(url) + (code, html_text, headers) = fetch_page(url) - if 1 == code: - return 3, 'failed: %s for %s' % (html_text, url) + if 1 == code: + return 3, 'failed: %s for %s' % (html_text, url) - if not html_text: - return -1, 'error' + if not html_text: + return -1, 'error' - charset = '' - if 'content-type' in headers: - log.debug('content-type: ' + headers['content-type']) + charset = '' + if 'content-type' in headers: + log.debug('content-type: ' + headers['content-type']) - if 'text/' != headers['content-type'][:len('text/')]: - return 1, headers['content-type'] + if 'text/' != headers['content-type'][:len('text/')]: + return 1, headers['content-type'] - charset = re.sub( - r'.*charset=(?P\S+).*', - r'\g', headers['content-type'], re.IGNORECASE - ) + charset = re.sub( + r'.*charset=(?P\S+).*', + r'\g', headers['content-type'], re.IGNORECASE + ) - if '' != charset: - try: - html_text = html_text.decode(charset) - except LookupError: - log.warn("invalid charset in '%s': '%s'" % (headers['content-type'], charset)) + if '' != charset: + try: + html_text = html_text.decode(charset) + except LookupError: + log.warn("invalid charset in '%s': '%s'" % (headers['content-type'], charset)) - if str != type(html_text): - html_text = str(html_text) + if str != type(html_text): + html_text = str(html_text) - result = re.match(r'.*?(.*?).*?', html_text, re.S | re.M | re.IGNORECASE) - if result: - match = result.groups()[0] + result = re.match(r'.*?(.*?).*?', html_text, re.S | re.M | re.IGNORECASE) + if result: + match = result.groups()[0] - parser = html.parser.HTMLParser() - try: - expanded_html = parser.unescape(match) - except UnicodeDecodeError as e: # idk why this can happen, but it does - log.warn('parser.unescape() expoded here: ' + str(e)) - expanded_html = match - return 0, expanded_html - else: - return 2, 'no title' + parser = html.parser.HTMLParser() + try: + expanded_html = parser.unescape(match) + except UnicodeDecodeError as e: # idk why this can happen, but it does + log.warn('parser.unescape() expoded here: ' + str(e)) + expanded_html = match + return 0, expanded_html + else: + return 2, 'no title' diff --git a/idlebot.py b/idlebot.py index 607b668..057d14c 100755 --- a/idlebot.py +++ b/idlebot.py @@ -6,18 +6,18 @@ import sys from common import VERSION, EVENTLOOP_DELAY, conf_load try: - from local_config import conf, set_conf + from local_config import conf, set_conf except ImportError: - sys.stderr.write(''' + sys.stderr.write(''' %s: E: local_config.py isn't tracked because of included secrets and %s site specific configurations. Rename local_config.py.skel and %s adjust to you needs. '''[1:] % ( - sys.argv[0], - ' ' * len(sys.argv[0]), - ' ' * len(sys.argv[0]) - )) - sys.exit(1) + sys.argv[0], + ' ' * len(sys.argv[0]), + ' ' * len(sys.argv[0]) + )) + sys.exit(1) from sleekxmpp import ClientXMPP @@ -25,98 +25,98 @@ got_hangup = False class IdleBot(ClientXMPP): - def __init__(self, jid, password, rooms, nick): - ClientXMPP.__init__(self, jid, password) + def __init__(self, jid, password, rooms, nick): + ClientXMPP.__init__(self, jid, password) - self.rooms = rooms - self.nick = nick + self.rooms = rooms + self.nick = nick - self.add_event_handler('session_start', self.session_start) - self.add_event_handler('groupchat_message', self.muc_message) - self.priority = 0 - self.status = None - self.show = None + self.add_event_handler('session_start', self.session_start) + self.add_event_handler('groupchat_message', self.muc_message) + self.priority = 0 + self.status = None + self.show = None - self.logger = logging.getLogger(__name__) + self.logger = logging.getLogger(__name__) - def session_start(self, _): - self.get_roster() - self.send_presence(ppriority=self.priority, pstatus=self.status, pshow=self.show) + def session_start(self, _): + self.get_roster() + self.send_presence(ppriority=self.priority, pstatus=self.status, pshow=self.show) - for room in self.rooms: - self.logger.info('%s: joining' % room) - ret = self.plugin['xep_0045'].joinMUC( - room, - self.nick, - wait=True - ) - self.logger.info('%s: joined with code %s' % (room, ret)) + for room in self.rooms: + self.logger.info('%s: joining' % room) + ret = self.plugin['xep_0045'].joinMUC( + room, + self.nick, + wait=True + ) + self.logger.info('%s: joined with code %s' % (room, ret)) - def muc_message(self, msg_obj): - """ - Handle muc messages, return if irrelevant content or die by hangup. - :param msg_obj: - :return: - """ - # don't talk to yourself - if msg_obj['mucnick'] == self.nick or 'groupchat' != msg_obj['type']: - return False - elif msg_obj['body'].startswith(conf('bot_user')) and 'hangup' in msg_obj['body']: - self.logger.warn("got 'hangup' from '%s': '%s'" % ( - msg_obj['mucnick'], msg_obj['body'] - )) - global got_hangup - got_hangup = True - return False - elif msg_obj['mucnick'] in conf_load().get("other_bots", ()): - # not talking to the other bot. - return False - else: - return True + def muc_message(self, msg_obj): + """ + Handle muc messages, return if irrelevant content or die by hangup. + :param msg_obj: + :return: + """ + # don't talk to yourself + if msg_obj['mucnick'] == self.nick or 'groupchat' != msg_obj['type']: + return False + elif msg_obj['body'].startswith(conf('bot_user')) and 'hangup' in msg_obj['body']: + self.logger.warn("got 'hangup' from '%s': '%s'" % ( + msg_obj['mucnick'], msg_obj['body'] + )) + global got_hangup + got_hangup = True + return False + elif msg_obj['mucnick'] in conf_load().get("other_bots", ()): + # not talking to the other bot. + return False + else: + return True def start(botclass, active=False): - logging.basicConfig( - level=conf('loglevel', logging.INFO), - format=sys.argv[0] + ' %(asctime)s %(levelname).1s %(funcName)-15s %(message)s' - ) - logger = logging.getLogger(__name__) - logger.info(VERSION) + logging.basicConfig( + level=conf('loglevel', logging.INFO), + format=sys.argv[0] + ' %(asctime)s %(levelname).1s %(funcName)-15s %(message)s' + ) + logger = logging.getLogger(__name__) + logger.info(VERSION) - jid = conf('jid') - if '/' not in jid: - jid = '%s/%s' % (jid, botclass.__name__) - bot = botclass( - jid=jid, - password=conf('password'), - rooms=conf('rooms'), - nick=conf('bot_user') - ) - import plugins + jid = conf('jid') + if '/' not in jid: + jid = '%s/%s' % (jid, botclass.__name__) + bot = botclass( + jid=jid, + password=conf('password'), + rooms=conf('rooms'), + nick=conf('bot_user') + ) + import plugins - if active: - plugins.register_all() - if plugins.plugin_enabled_get(plugins.command_dsa_watcher): - # first result is lost. - plugins.command_dsa_watcher(['dsa-watcher', 'crawl']) + if active: + plugins.register_all() + if plugins.plugin_enabled_get(plugins.command_dsa_watcher): + # first result is lost. + plugins.command_dsa_watcher(['dsa-watcher', 'crawl']) - bot.connect() - bot.register_plugin('xep_0045') - bot.process() - global got_hangup + bot.connect() + bot.register_plugin('xep_0045') + bot.process() + global got_hangup - while 1: - try: - # print("hangup: %s" % got_hangup) - if got_hangup or not plugins.event_trigger(): - bot.disconnect() - sys.exit(1) + while 1: + try: + # print("hangup: %s" % got_hangup) + if got_hangup or not plugins.event_trigger(): + bot.disconnect() + sys.exit(1) - time.sleep(EVENTLOOP_DELAY) - except KeyboardInterrupt: - print('') - exit(130) + time.sleep(EVENTLOOP_DELAY) + except KeyboardInterrupt: + print('') + exit(130) if '__main__' == __name__: - start(IdleBot) + start(IdleBot) diff --git a/local_config.py.skel b/local_config.py.skel index 1194787..d7302dc 100644 --- a/local_config.py.skel +++ b/local_config.py.skel @@ -3,63 +3,63 @@ import time if '__main__' == __name__: - print('''this is a config file, which is not meant to be executed''') - exit(-1) + print('''this is a config file, which is not meant to be executed''') + exit(-1) config = { - 'jid': 'FIXME', - 'password': 'FIXME', - 'rooms': ['FIXME'], + 'jid': 'FIXME', + 'password': 'FIXME', + 'rooms': ['FIXME'], - 'src-url': 'http://aero2k.de/t/repos/urlbot-native.git', + 'src-url': 'http://aero2k.de/t/repos/urlbot-native.git', - 'bot_user': 'native-urlbot', - 'bot_owner': 'FIXME', + 'bot_user': 'native-urlbot', + 'bot_owner': 'FIXME', - 'hist_max_count': 5, - 'hist_max_time': 10 * 60, + 'hist_max_count': 5, + 'hist_max_time': 10 * 60, - 'uptime': -time.time(), - 'request_counter': 0, + 'uptime': -time.time(), + 'request_counter': 0, - 'persistent_storage': 'urlbot.persistent', - 'persistent_locked': False, + 'persistent_storage': 'urlbot.persistent', + 'persistent_locked': False, - 'url_blacklist': [ - r'^.*heise\.de/.*-[0-9]+\.html$', - r'^.*wikipedia\.org/wiki/.*$', - r'^.*blog\.fefe\.de/\?ts=[0-9a-f]+$', - r'^.*ibash\.de/zitat.*$', - r'^.*golem\.de/news/.*$' - r'^.*paste\.debian\.net/((hidden|plainh?)/)?[0-9a-f]+/?$', - r'^.*example\.(org|net|com).*$', - r'^.*sprunge\.us/.*$', - r'^.*ftp\...\.debian\.org.*$' - ], + 'url_blacklist': [ + r'^.*heise\.de/.*-[0-9]+\.html$', + r'^.*wikipedia\.org/wiki/.*$', + r'^.*blog\.fefe\.de/\?ts=[0-9a-f]+$', + r'^.*ibash\.de/zitat.*$', + r'^.*golem\.de/news/.*$' + r'^.*paste\.debian\.net/((hidden|plainh?)/)?[0-9a-f]+/?$', + r'^.*example\.(org|net|com).*$', + r'^.*sprunge\.us/.*$', + r'^.*ftp\...\.debian\.org.*$' + ], - # the "dice" feature will use more efficient random data (0) for given users - 'enhanced-random-user': ('FIXME', 'FIXME'), + # the "dice" feature will use more efficient random data (0) for given users + 'enhanced-random-user': ('FIXME', 'FIXME'), - # the "moin" feature will be "disabled" for given users - 'moin-modified-user': (), - 'moin-disabled-user': (), + # the "moin" feature will be "disabled" for given users + 'moin-modified-user': (), + 'moin-disabled-user': (), - 'tea_steep_time': (3*60 + 40), + 'tea_steep_time': (3 * 60 + 40), - 'image_preview': True, - 'dsa_watcher_interval': 15 * 60 + 'image_preview': True, + 'dsa_watcher_interval': 15 * 60 } def conf(val): - import logging - logger = logging.getLogger(__name__) - if val in list(config.keys()): - return config[val] - logger.warn('conf(): unknown key ' + str(val)) - return None + import logging + logger = logging.getLogger(__name__) + if val in list(config.keys()): + return config[val] + logger.warn('conf(): unknown key ' + str(val)) + return None def set_conf(key, val): - config[key] = val - return None + config[key] = val + return None diff --git a/plugins.py b/plugins.py index e2d3046..e96507c 100644 --- a/plugins.py +++ b/plugins.py @@ -11,7 +11,7 @@ import urllib.parse import urllib.request from common import conf_load, conf_save, RATE_GLOBAL, RATE_NO_SILENCE, VERSION, RATE_INTERACTIVE, BUFSIZ, \ - USER_AGENT, extract_title, RATE_FUN, RATE_NO_LIMIT, conf_get, RATE_URL + USER_AGENT, extract_title, RATE_FUN, RATE_NO_LIMIT, conf_get, RATE_URL from local_config import set_conf, conf from string_constants import excuses, moin_strings_hi, moin_strings_bye, cakes @@ -27,1138 +27,1138 @@ log = logging.getLogger(__name__) def plugin_enabled_get(urlbot_plugin): - blob = conf_load() + blob = conf_load() - if 'plugin_conf' in blob: - if urlbot_plugin.plugin_name in blob['plugin_conf']: - return blob['plugin_conf'][urlbot_plugin.plugin_name].get('enabled', urlbot_plugin.is_enabled) + if 'plugin_conf' in blob: + if urlbot_plugin.plugin_name in blob['plugin_conf']: + return blob['plugin_conf'][urlbot_plugin.plugin_name].get('enabled', urlbot_plugin.is_enabled) - return urlbot_plugin.is_enabled + return urlbot_plugin.is_enabled def plugin_enabled_set(plugin, enabled): - if conf('persistent_locked'): - log.warn("couldn't get exclusive lock") - return False + if conf('persistent_locked'): + log.warn("couldn't get exclusive lock") + return False - set_conf('persistent_locked', True) - blob = conf_load() + set_conf('persistent_locked', True) + blob = conf_load() - if 'plugin_conf' not in blob: - blob['plugin_conf'] = {} + if 'plugin_conf' not in blob: + blob['plugin_conf'] = {} - if plugin.plugin_name not in blob['plugin_conf']: - blob['plugin_conf'][plugin.plugin_name] = {} + if plugin.plugin_name not in blob['plugin_conf']: + blob['plugin_conf'][plugin.plugin_name] = {} - blob['plugin_conf'][plugin.plugin_name]['enabled'] = enabled + blob['plugin_conf'][plugin.plugin_name]['enabled'] = enabled - conf_save(blob) - set_conf('persistent_locked', False) + conf_save(blob) + set_conf('persistent_locked', False) - return True + return True def pluginfunction(name, desc, plugin_type, ratelimit_class=RATE_GLOBAL, enabled=True): - """A decorator to make a plugin out of a function - :param enabled: - :param ratelimit_class: - :param plugin_type: - :param desc: - :param name: - """ - if plugin_type not in ptypes: - raise TypeError('Illegal plugin_type: %s' % plugin_type) + """A decorator to make a plugin out of a function + :param enabled: + :param ratelimit_class: + :param plugin_type: + :param desc: + :param name: + """ + if plugin_type not in ptypes: + raise TypeError('Illegal plugin_type: %s' % plugin_type) - def decorate(f): - f.is_plugin = True - f.is_enabled = enabled - f.plugin_name = name - f.plugin_desc = desc - f.plugin_type = plugin_type - f.ratelimit_class = ratelimit_class - return f + def decorate(f): + f.is_plugin = True + f.is_enabled = enabled + f.plugin_name = name + f.plugin_desc = desc + f.plugin_type = plugin_type + f.ratelimit_class = ratelimit_class + return f - return decorate + return decorate def register_event(t, callback, args): - joblist.append((t, callback, args)) + joblist.append((t, callback, args)) @pluginfunction('mental_ill', 'parse mental illness', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL) def parse_mental_ill(**args): - min_ill = 3 - c = 0 - flag = False + min_ill = 3 + c = 0 + flag = False - # return True for min_ill '!' in a row - for d in args['data']: - if '!' == d or '?' == d: - c += 1 - else: - c = 0 - if min_ill <= c: - flag = True - break + # return True for min_ill '!' in a row + for d in args['data']: + if '!' == d or '?' == d: + c += 1 + else: + c = 0 + if min_ill <= c: + flag = True + break - if flag: - log.info('sent mental illness reply') - return { - 'msg': ( - '''Multiple exclamation/question marks are a sure sign of mental disease, with %s as a living example.''' % - args['reply_user'] - ) - } + if flag: + log.info('sent mental illness reply') + return { + 'msg': ( + '''Multiple exclamation/question marks are a sure sign of mental disease, with %s as a living example.''' % + args['reply_user'] + ) + } @pluginfunction('debbug', 'parse Debian bug numbers', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL) def parse_debbug(**args): - bugs = re.findall(r'#(\d{4,})', args['data']) - if not bugs: - return None + bugs = re.findall(r'#(\d{4,})', args['data']) + if not bugs: + return None - out = [] - for b in bugs: - log.info('detected Debian bug #%s' % b) + out = [] + for b in bugs: + log.info('detected Debian bug #%s' % b) - url = 'https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=%s' % b - status, title = extract_title(url) + url = 'https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=%s' % b + status, title = extract_title(url) - if 0 == status: - out.append('Debian Bug: %s: %s' % (title, url)) - elif 3 == status: - out.append('error for #%s: %s' % (b, title)) - else: - log.info('unknown status %d' % status) + if 0 == status: + out.append('Debian Bug: %s: %s' % (title, url)) + elif 3 == status: + out.append('error for #%s: %s' % (b, title)) + else: + log.info('unknown status %d' % status) - return { - 'msg': out - } + return { + 'msg': out + } @pluginfunction('cve', 'parse a CVE handle', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL) def parse_cve(**args): - cves = re.findall(r'(CVE-\d\d\d\d-\d+)', args['data'].upper()) - if not cves: - return None + cves = re.findall(r'(CVE-\d\d\d\d-\d+)', args['data'].upper()) + if not cves: + return None - log.info('detected CVE handle') - return { - 'msg': ['https://security-tracker.debian.org/tracker/%s' % c for c in cves] - } + log.info('detected CVE handle') + return { + 'msg': ['https://security-tracker.debian.org/tracker/%s' % c for c in cves] + } @pluginfunction('dsa', 'parse a DSA handle', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL) def parse_dsa(**args): - dsas = re.findall(r'(DSA-\d\d\d\d-\d+)', args['data'].upper()) - if not dsas: - return None + dsas = re.findall(r'(DSA-\d\d\d\d-\d+)', args['data'].upper()) + if not dsas: + return None - log.info('detected DSA handle') - return { - 'msg': ['https://security-tracker.debian.org/tracker/%s' % d for d in dsas] - } + log.info('detected DSA handle') + return { + 'msg': ['https://security-tracker.debian.org/tracker/%s' % d for d in dsas] + } @pluginfunction('skynet', 'parse skynet', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL) def parse_skynet(**args): - if 'skynet' in args['data'].lower(): - return { - 'msg': 'I\'ll be back.' - } + if 'skynet' in args['data'].lower(): + return { + 'msg': 'I\'ll be back.' + } @pluginfunction('moin', 'parse hi/bye', ptypes_PARSE) def parse_moin(**args): - for direction in [moin_strings_hi, moin_strings_bye]: - for d in direction: - words = re.split(r'\W+', args['data']) + for direction in [moin_strings_hi, moin_strings_bye]: + for d in direction: + words = re.split(r'\W+', args['data']) - # assumption: longer sentences are not greetings - if 3 < len(args['data'].split()): - continue + # assumption: longer sentences are not greetings + if 3 < len(args['data'].split()): + continue - for w in words: - if d.lower() == w.lower(): - if args['reply_user'] in conf('moin-disabled-user'): - log.info('moin blacklist match') - return + for w in words: + if d.lower() == w.lower(): + if args['reply_user'] in conf('moin-disabled-user'): + log.info('moin blacklist match') + return - if args['reply_user'] in conf('moin-modified-user'): - log.info('being "quiet" for %s' % w) - return { - 'msg': '/me %s' % random.choice([ - "doesn't say anything at all", - 'whistles uninterested', - 'just ignores this incident' - ]) - } + if args['reply_user'] in conf('moin-modified-user'): + log.info('being "quiet" for %s' % w) + return { + 'msg': '/me %s' % random.choice([ + "doesn't say anything at all", + 'whistles uninterested', + 'just ignores this incident' + ]) + } - log.info('sent %s reply for %s' % ( - 'hi' if direction is moin_strings_hi else 'bye', w - )) - return { - 'msg': '''%s, %s''' % ( - random.choice(direction), - args['reply_user'] - ) - } + log.info('sent %s reply for %s' % ( + 'hi' if direction is moin_strings_hi else 'bye', w + )) + return { + 'msg': '''%s, %s''' % ( + random.choice(direction), + args['reply_user'] + ) + } @pluginfunction('latex', r'reacts on \LaTeX', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL) def parse_latex(**args): - if r'\LaTeX' in args['data']: - return { - 'msg': '''LaTeX is way too complex for me, I'm happy with fmt(1)''' - } + if r'\LaTeX' in args['data']: + return { + 'msg': '''LaTeX is way too complex for me, I'm happy with fmt(1)''' + } @pluginfunction('me-action', 'reacts to /me.*%{bot_user}', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL) def parse_slash_me(**args): - if args['data'].lower().startswith('/me') and (conf('bot_user') in args['data'].lower()): - log.info('sent /me reply') + if args['data'].lower().startswith('/me') and (conf('bot_user') in args['data'].lower()): + log.info('sent /me reply') - me_replys = [ - 'are you that rude to everybody?', - 'oh, thank you...', - 'do you really think that was nice?', - 'that sounds very interesting...', - "excuse me, but I'm already late for an appointment" - ] + me_replys = [ + 'are you that rude to everybody?', + 'oh, thank you...', + 'do you really think that was nice?', + 'that sounds very interesting...', + "excuse me, but I'm already late for an appointment" + ] - return { - 'msg': args['reply_user'] + ': %s' % random.choice(me_replys) - } + return { + 'msg': args['reply_user'] + ': %s' % random.choice(me_replys) + } @pluginfunction('help', 'print help for a command or all known commands', ptypes_COMMAND) def command_help(argv, **args): - command = argv[0] - what = argv[1] if len(argv) > 1 else None + command = argv[0] + what = argv[1] if len(argv) > 1 else None - if 'help' != command: - return + if 'help' != command: + return - if not what: - log.info('empty help request, sent all commands') - commands = args['cmd_list'] - commands.sort() - parsers = args['parser_list'] - parsers.sort() - return { - 'msg': [ - '%s: known commands: %s' % ( - args['reply_user'], ', '.join(commands) - ), - 'known parsers: %s' % ', '.join(parsers) - ] - } + if not what: + log.info('empty help request, sent all commands') + commands = args['cmd_list'] + commands.sort() + parsers = args['parser_list'] + parsers.sort() + return { + 'msg': [ + '%s: known commands: %s' % ( + args['reply_user'], ', '.join(commands) + ), + 'known parsers: %s' % ', '.join(parsers) + ] + } - for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]: - if what == p.plugin_name: - log.info('sent help for %s' % what) - return { - 'msg': args['reply_user'] + ': help for %s %s %s: %s' % ( - 'enabled' if plugin_enabled_get(p) else 'disabled', - 'parser' if p.plugin_type == ptypes_PARSE else 'command', - what, p.plugin_desc - ) - } - log.info('no help found for %s' % what) - return { - 'msg': args['reply_user'] + ': no such command: %s' % what - } + for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]: + if what == p.plugin_name: + log.info('sent help for %s' % what) + return { + 'msg': args['reply_user'] + ': help for %s %s %s: %s' % ( + 'enabled' if plugin_enabled_get(p) else 'disabled', + 'parser' if p.plugin_type == ptypes_PARSE else 'command', + what, p.plugin_desc + ) + } + log.info('no help found for %s' % what) + return { + 'msg': args['reply_user'] + ': no such command: %s' % what + } @pluginfunction('version', 'prints version', ptypes_COMMAND) def command_version(argv, **args): - if 'version' != argv[0]: - return + if 'version' != argv[0]: + return - log.info('sent version string') - return { - 'msg': args['reply_user'] + (''': I'm running ''' + VERSION) - } + log.info('sent version string') + return { + 'msg': args['reply_user'] + (''': I'm running ''' + VERSION) + } @pluginfunction('klammer', 'prints an anoying paper clip aka. Karl Klammer', ptypes_COMMAND, - ratelimit_class=RATE_FUN | RATE_GLOBAL) + ratelimit_class=RATE_FUN | RATE_GLOBAL) def command_klammer(argv, **args): - if 'klammer' != argv[0]: - return + if 'klammer' != argv[0]: + return - log.info('sent karl klammer') - return { - 'msg': ( - args['reply_user'] + ',', - r''' _, Was moechten''', - r'''( _\_ Sie tun?''', - r''' \0 O\ ''', - r''' \\ \\ [ ] ja ''', - r''' \`' ) [ ] noe''', - r''' `'' ''' - ) - } + log.info('sent karl klammer') + return { + 'msg': ( + args['reply_user'] + ',', + r''' _, Was moechten''', + r'''( _\_ Sie tun?''', + r''' \0 O\ ''', + r''' \\ \\ [ ] ja ''', + r''' \`' ) [ ] noe''', + r''' `'' ''' + ) + } @pluginfunction('unikot', 'prints an unicode string', ptypes_COMMAND, ratelimit_class=RATE_FUN | RATE_GLOBAL) def command_unicode(argv, **args): - if 'unikot' != argv[0]: - return + if 'unikot' != argv[0]: + return - log.info('sent some unicode') - return { - 'msg': ( - args['reply_user'] + ''', here's some''', - '''┌────────┐''', - '''│Unicode!│''', - '''└────────┘''' - ) - } + log.info('sent some unicode') + return { + 'msg': ( + args['reply_user'] + ''', here's some''', + '''┌────────┐''', + '''│Unicode!│''', + '''└────────┘''' + ) + } @pluginfunction('source', 'prints git URL', ptypes_COMMAND) def command_source(argv, **_): - if argv[0] not in ('source', 'src'): - return + if argv[0] not in ('source', 'src'): + return - log.info('sent source URL') - return { - 'msg': 'My source code can be found at %s' % conf('src-url') - } + log.info('sent source URL') + return { + 'msg': 'My source code can be found at %s' % conf('src-url') + } @pluginfunction('dice', 'rolls a dice, optional N times', ptypes_COMMAND, ratelimit_class=RATE_INTERACTIVE) def command_dice(argv, **args): - if 'dice' != argv[0]: - return - try: - count = 1 if len(argv) < 2 else int(argv[1]) - except ValueError as e: - return { - 'msg': '%s: dice: error when parsing int(%s): %s' % ( - args['reply_user'], argv[1], str(e) - ) - } + if 'dice' != argv[0]: + return + try: + count = 1 if len(argv) < 2 else int(argv[1]) + except ValueError as e: + return { + 'msg': '%s: dice: error when parsing int(%s): %s' % ( + args['reply_user'], argv[1], str(e) + ) + } - if 0 >= count or 5 <= count: - return { - 'msg': '%s: dice: invalid arguments (0 < N < 5)' % args['reply_user'] - } + if 0 >= count or 5 <= count: + return { + 'msg': '%s: dice: invalid arguments (0 < N < 5)' % args['reply_user'] + } - dice_char = ['◇', '⚀', '⚁', '⚂', '⚃', '⚄', '⚅'] + dice_char = ['◇', '⚀', '⚁', '⚂', '⚃', '⚄', '⚅'] - msg = 'rolling %s for %s:' % ( - 'a dice' if 1 == count else '%d dices' % count, args['reply_user'] - ) + msg = 'rolling %s for %s:' % ( + 'a dice' if 1 == count else '%d dices' % count, args['reply_user'] + ) - for i in range(count): - if args['reply_user'] in conf('enhanced-random-user'): - rnd = 0 # this might confuse users. good. - log.info('sent random (enhanced)') - else: - rnd = random.randint(1, 6) - log.info('sent random') + for i in range(count): + if args['reply_user'] in conf('enhanced-random-user'): + rnd = 0 # this might confuse users. good. + log.info('sent random (enhanced)') + else: + rnd = random.randint(1, 6) + log.info('sent random') - # the \u200b chars ('ZERO WIDTH SPACE') avoid interpreting stuff as smileys - # by some strange clients - msg += ' %s (\u200b%d\u200b)' % (dice_char[rnd], rnd) + # the \u200b chars ('ZERO WIDTH SPACE') avoid interpreting stuff as smileys + # by some strange clients + msg += ' %s (\u200b%d\u200b)' % (dice_char[rnd], rnd) - return { - 'msg': msg - } + return { + 'msg': msg + } @pluginfunction('choose', 'chooses randomly between arguments', ptypes_COMMAND, ratelimit_class=RATE_INTERACTIVE) def command_choose(argv, **args): - if 'choose' != argv[0]: - return + if 'choose' != argv[0]: + return - alternatives = argv[1:] + alternatives = argv[1:] - if 2 > len(alternatives): - return { - 'msg': '%s: choosing between one or less things is pointless' % args['reply_user'] - } + if 2 > len(alternatives): + return { + 'msg': '%s: choosing between one or less things is pointless' % args['reply_user'] + } - choice = random.choice(alternatives) + choice = random.choice(alternatives) - log.info('sent random choice') - return { - 'msg': '%s: I prefer %s!' % (args['reply_user'], choice) - } + log.info('sent random choice') + return { + 'msg': '%s: I prefer %s!' % (args['reply_user'], choice) + } @pluginfunction('uptime', 'prints uptime', ptypes_COMMAND) def command_uptime(argv, **args): - if 'uptime' != argv[0]: - return + if 'uptime' != argv[0]: + return - u = int(conf('uptime') + time.time()) - plural_uptime = 's' - plural_request = 's' + u = int(conf('uptime') + time.time()) + plural_uptime = 's' + plural_request = 's' - if 1 == u: - plural_uptime = '' - if 1 == conf('request_counter'): - plural_request = '' + if 1 == u: + plural_uptime = '' + if 1 == conf('request_counter'): + plural_request = '' - log.info('sent statistics') - return { - 'msg': args['reply_user'] + (''': happily serving for %d second%s, %d request%s so far.''' % ( - u, plural_uptime, int(conf('request_counter')), plural_request)) - } + log.info('sent statistics') + return { + 'msg': args['reply_user'] + (''': happily serving for %d second%s, %d request%s so far.''' % ( + u, plural_uptime, int(conf('request_counter')), plural_request)) + } @pluginfunction('ping', 'sends pong', ptypes_COMMAND, ratelimit_class=RATE_INTERACTIVE) def command_ping(argv, **args): - if 'ping' != argv[0]: - return + if 'ping' != argv[0]: + return - rnd = random.randint(0, 3) # 1:4 - if 0 == rnd: - msg = args['reply_user'] + ''': peng (You're dead now.)''' - log.info('sent pong (variant)') - elif 1 == rnd: - msg = args['reply_user'] + ''': I don't like you, leave me alone.''' - log.info('sent pong (dontlike)') - else: - msg = args['reply_user'] + ''': pong''' - log.info('sent pong') + rnd = random.randint(0, 3) # 1:4 + if 0 == rnd: + msg = args['reply_user'] + ''': peng (You're dead now.)''' + log.info('sent pong (variant)') + elif 1 == rnd: + msg = args['reply_user'] + ''': I don't like you, leave me alone.''' + log.info('sent pong (dontlike)') + else: + msg = args['reply_user'] + ''': pong''' + log.info('sent pong') - return { - 'msg': msg - } + return { + 'msg': msg + } @pluginfunction('info', 'prints info message', ptypes_COMMAND) def command_info(argv, **args): - if 'info' != argv[0]: - return + if 'info' != argv[0]: + return - log.info('sent long info') - return { - 'msg': args['reply_user'] + ( - ''': I'm a bot, my job is to extract tags from posted URLs. In case I'm annoying or for further - questions, please talk to my master %s. I'm rate limited. - To make me exit immediately, highlight me with 'hangup' in the message - (emergency only, please). For other commands, highlight me with 'help'.''' % ( - conf('bot_owner'))) - } + log.info('sent long info') + return { + 'msg': args['reply_user'] + ( + ''': I'm a bot, my job is to extract <title> tags from posted URLs. In case I'm annoying or for further + questions, please talk to my master %s. I'm rate limited. + To make me exit immediately, highlight me with 'hangup' in the message + (emergency only, please). For other commands, highlight me with 'help'.''' % ( + conf('bot_owner'))) + } @pluginfunction('teatimer', 'sets a tea timer to $1 or currently %d seconds' % conf('tea_steep_time'), ptypes_COMMAND) def command_teatimer(argv, **args): - if 'teatimer' != argv[0]: - return + if 'teatimer' != argv[0]: + return - steep = conf('tea_steep_time') + steep = conf('tea_steep_time') - if len(argv) > 1: - try: - steep = int(argv[1]) - except Exception as e: - return { - 'msg': args['reply_user'] + ': error when parsing int(%s): %s' % ( - argv[1], str(e) - ) - } + if len(argv) > 1: + try: + steep = int(argv[1]) + except Exception as e: + return { + 'msg': args['reply_user'] + ': error when parsing int(%s): %s' % ( + argv[1], str(e) + ) + } - ready = time.time() + steep + ready = time.time() + steep - try: - log.info('tea timer set to %s' % time.strftime('%F.%T', time.localtime(ready))) - except (ValueError, OverflowError) as e: - return { - 'msg': args['reply_user'] + ': time format error: ' + str(e) - } + try: + log.info('tea timer set to %s' % time.strftime('%F.%T', time.localtime(ready))) + except (ValueError, OverflowError) as e: + return { + 'msg': args['reply_user'] + ': time format error: ' + str(e) + } - return { - 'msg': args['reply_user'] + ': Tea timer set to %s' % time.strftime( - '%F.%T', time.localtime(ready) - ), - 'event': { - 'time': ready, - 'msg': (args['reply_user'] + ': Your tea is ready!') - } - } + return { + 'msg': args['reply_user'] + ': Tea timer set to %s' % time.strftime( + '%F.%T', time.localtime(ready) + ), + 'event': { + 'time': ready, + 'msg': (args['reply_user'] + ': Your tea is ready!') + } + } @pluginfunction('decode', 'prints the long description of an unicode character', ptypes_COMMAND, - ratelimit_class=RATE_INTERACTIVE) + ratelimit_class=RATE_INTERACTIVE) def command_decode(argv, **args): - if 'decode' != argv[0]: - return + if 'decode' != argv[0]: + return - if len(argv) <= 1: - return { - 'msg': args['reply_user'] + ': usage: decode {single character}' - } + if len(argv) <= 1: + return { + 'msg': args['reply_user'] + ': usage: decode {single character}' + } - log.info('decode called for %s' % argv[1]) + log.info('decode called for %s' % argv[1]) - out = [] - for i, char in enumerate(argv[1]): - if i > 9: - out.append('... limit reached.') - break + out = [] + for i, char in enumerate(argv[1]): + if i > 9: + out.append('... limit reached.') + break - char_esc = str(char.encode('unicode_escape'))[3:-1] + char_esc = str(char.encode('unicode_escape'))[3:-1] - if 0 == len(char_esc): - char_esc = '' - else: - char_esc = ' (%s)' % char_esc + if 0 == len(char_esc): + char_esc = '' + else: + char_esc = ' (%s)' % char_esc - try: - uni_name = unicodedata.name(char) - except Exception as e: - log.info('decode(%s) failed: %s' % (char, e)) - out.append("can't decode %s%s: %s" % (char, char_esc, e)) - continue + try: + uni_name = unicodedata.name(char) + except Exception as e: + log.info('decode(%s) failed: %s' % (char, e)) + out.append("can't decode %s%s: %s" % (char, char_esc, e)) + continue - out.append('%s%s is called "%s"' % (char, char_esc, uni_name)) + out.append('%s%s is called "%s"' % (char, char_esc, uni_name)) - if 1 == len(out): - return { - 'msg': args['reply_user'] + ': %s' % out[0] - } - else: - return { - 'msg': [args['reply_user'] + ': decoding %s:' % argv[1]] + out - } + if 1 == len(out): + return { + 'msg': args['reply_user'] + ': %s' % out[0] + } + else: + return { + 'msg': [args['reply_user'] + ': decoding %s:' % argv[1]] + out + } @pluginfunction('show-blacklist', 'show the current URL blacklist, optionally filtered', ptypes_COMMAND) def command_show_blacklist(argv, **args): - if 'show-blacklist' != argv[0]: - return + if 'show-blacklist' != argv[0]: + return - log.info('sent URL blacklist') + log.info('sent URL blacklist') - argv1 = None if len(argv) < 2 else argv[1] + argv1 = None if len(argv) < 2 else argv[1] - return { - 'msg': [ - args['reply_user'] + ': URL blacklist%s: ' % ( - '' if not argv1 else ' (limited to %s)' % argv1 - ) - ] + [ - b for b in conf('url_blacklist') if not argv1 or argv1 in b - ] - } + return { + 'msg': [ + args['reply_user'] + ': URL blacklist%s: ' % ( + '' if not argv1 else ' (limited to %s)' % argv1 + ) + ] + [ + b for b in conf('url_blacklist') if not argv1 or argv1 in b + ] + } def usersetting_get(argv, args): - blob = conf_load() + blob = conf_load() - arg_user = args['reply_user'] - arg_key = argv[1] + arg_user = args['reply_user'] + arg_key = argv[1] - if arg_user not in blob['user_pref']: - return { - 'msg': args['reply_user'] + ': user key not found' - } + if arg_user not in blob['user_pref']: + return { + 'msg': args['reply_user'] + ': user key not found' + } - return { - 'msg': args['reply_user'] + ': %s == %s' % ( - arg_key, - 'on' if blob['user_pref'][arg_user][arg_key] else 'off' - ) - } + return { + 'msg': args['reply_user'] + ': %s == %s' % ( + arg_key, + 'on' if blob['user_pref'][arg_user][arg_key] else 'off' + ) + } @pluginfunction('set', 'modify a user setting', ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT) def command_usersetting(argv, **args): - if 'set' != argv[0]: - return + if 'set' != argv[0]: + return - settings = ['spoiler'] - arg_user = args['reply_user'] - arg_key = argv[1] if len(argv) > 1 else None - arg_val = argv[2] if len(argv) > 2 else None + settings = ['spoiler'] + arg_user = args['reply_user'] + arg_key = argv[1] if len(argv) > 1 else None + arg_val = argv[2] if len(argv) > 2 else None - if arg_key not in settings: - return { - 'msg': args['reply_user'] + ': known settings: ' + (', '.join(settings)) - } + if arg_key not in settings: + return { + 'msg': args['reply_user'] + ': known settings: ' + (', '.join(settings)) + } - if arg_val not in ['on', 'off', None]: - return { - 'msg': args['reply_user'] + ': possible values for %s: on, off' % arg_key - } + if arg_val not in ['on', 'off', None]: + return { + 'msg': args['reply_user'] + ': possible values for %s: on, off' % arg_key + } - if not arg_val: - # display current value - return usersetting_get(argv, args) + if not arg_val: + # display current value + return usersetting_get(argv, args) - if conf('persistent_locked'): - return { - 'msg': args['reply_user'] + ''': couldn't get exclusive lock''' - } + if conf('persistent_locked'): + return { + 'msg': args['reply_user'] + ''': couldn't get exclusive lock''' + } - set_conf('persistent_locked', True) - blob = conf_load() + set_conf('persistent_locked', True) + blob = conf_load() - if 'user_pref' not in blob: - blob['user_pref'] = {} + if 'user_pref' not in blob: + blob['user_pref'] = {} - if arg_user not in blob['user_pref']: - blob['user_pref'][arg_user] = {} + if arg_user not in blob['user_pref']: + blob['user_pref'][arg_user] = {} - blob['user_pref'][arg_user][arg_key] = 'on' == arg_val + blob['user_pref'][arg_user][arg_key] = 'on' == arg_val - conf_save(blob) - set_conf('persistent_locked', False) + conf_save(blob) + set_conf('persistent_locked', False) - # display value written to db - return usersetting_get(argv, args) + # display value written to db + return usersetting_get(argv, args) @pluginfunction('cake', 'displays a cake ASCII art', ptypes_COMMAND, ratelimit_class=RATE_FUN | RATE_GLOBAL) def command_cake(argv, **args): - if 'cake' != argv[0]: - return + if 'cake' != argv[0]: + return - return { - 'msg': args['reply_user'] + ': %s' % (random.sample(cakes, 1)[0]) - } + return { + 'msg': args['reply_user'] + ': %s' % (random.sample(cakes, 1)[0]) + } # TODO: send a hint if someone types plugin as command @pluginfunction('plugin', "'disable' or 'enable' plugins", ptypes_COMMAND) def command_plugin_activation(argv, **args): - command = argv[0] - plugin = argv[1] if len(argv) > 1 else None + command = argv[0] + plugin = argv[1] if len(argv) > 1 else None - if command not in ('enable', 'disable'): - return + if command not in ('enable', 'disable'): + return - log.info('plugin activation plugin called') + log.info('plugin activation plugin called') - if not plugin: - return { - 'msg': args['reply_user'] + ': no plugin given' - } - elif command_plugin_activation.plugin_name == plugin: - return { - 'msg': args['reply_user'] + ': not allowed' - } + if not plugin: + return { + 'msg': args['reply_user'] + ': no plugin given' + } + elif command_plugin_activation.plugin_name == plugin: + return { + 'msg': args['reply_user'] + ': not allowed' + } - for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]: - if p.plugin_name == plugin: - plugin_enabled_set(p, 'enable' == command) + for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]: + if p.plugin_name == plugin: + plugin_enabled_set(p, 'enable' == command) - return { - 'msg': args['reply_user'] + ': %sd %s' % ( - command, plugin - ) - } + return { + 'msg': args['reply_user'] + ': %sd %s' % ( + command, plugin + ) + } - return { - 'msg': args['reply_user'] + ': unknown plugin %s' % plugin - } + return { + 'msg': args['reply_user'] + ': unknown plugin %s' % plugin + } @pluginfunction('wp-en', 'crawl the english Wikipedia', ptypes_COMMAND) def command_wp_en(argv, **args): - if 'wp-en' != argv[0]: - return + if 'wp-en' != argv[0]: + return - if argv[0]: - argv[0] = 'wp' + if argv[0]: + argv[0] = 'wp' - return command_wp(argv, lang='en', **args) + return command_wp(argv, lang='en', **args) @pluginfunction('wp', 'crawl the german Wikipedia', ptypes_COMMAND) def command_wp(argv, lang='de', **args): - if 'wp' != argv[0]: - return + if 'wp' != argv[0]: + return - query = ' '.join(argv[1:]) + query = ' '.join(argv[1:]) - if query == '': - return { - 'msg': args['reply_user'] + ': no query given' - } + if query == '': + return { + 'msg': args['reply_user'] + ': no query given' + } - api = { - 'action': 'query', - 'prop': 'extracts', - 'explaintext': '', - 'redirects': '', - 'exsentences': 2, - 'continue': '', - 'format': 'json', - 'titles': query - } - apiurl = 'https://%s.wikipedia.org/w/api.php?%s' % ( - lang, urllib.parse.urlencode(api) - ) + api = { + 'action': 'query', + 'prop': 'extracts', + 'explaintext': '', + 'redirects': '', + 'exsentences': 2, + 'continue': '', + 'format': 'json', + 'titles': query + } + apiurl = 'https://%s.wikipedia.org/w/api.php?%s' % ( + lang, urllib.parse.urlencode(api) + ) - log.info('fetching %s' % apiurl) + log.info('fetching %s' % apiurl) - try: - response = urllib.request.urlopen(apiurl) - buf = response.read(BUFSIZ) - j = json.loads(buf.decode('utf8')) + try: + response = urllib.request.urlopen(apiurl) + buf = response.read(BUFSIZ) + j = json.loads(buf.decode('utf8')) - page = next(iter(j['query']['pages'].values())) - short = page.get('extract', None) - linktitle = page.get('title', query).replace(' ', '_') - link = 'https://%s.wikipedia.org/wiki/%s' % ( - lang, urllib.parse.quote(linktitle) - ) - except Exception as e: - log.info('wp(%s) failed: %s, %s' % (query, e, traceback.format_exc())) - return { - 'msg': args['reply_user'] + ': something failed: %s' % e - } + page = next(iter(j['query']['pages'].values())) + short = page.get('extract', None) + linktitle = page.get('title', query).replace(' ', '_') + link = 'https://%s.wikipedia.org/wiki/%s' % ( + lang, urllib.parse.quote(linktitle) + ) + except Exception as e: + log.info('wp(%s) failed: %s, %s' % (query, e, traceback.format_exc())) + return { + 'msg': args['reply_user'] + ': something failed: %s' % e + } - if short is not None: - return { - 'msg': args['reply_user'] + ': %s (<%s>)' % ( - short if short.strip() else '(nix)', link - ) - } - elif 'missing' in page: - return { - 'msg': 'Article "%s" not found' % page.get('title', query) - } - else: - return { - 'msg': 'json data seem to be broken' - } + if short is not None: + return { + 'msg': args['reply_user'] + ': %s (<%s>)' % ( + short if short.strip() else '(nix)', link + ) + } + elif 'missing' in page: + return { + 'msg': 'Article "%s" not found' % page.get('title', query) + } + else: + return { + 'msg': 'json data seem to be broken' + } @pluginfunction('excuse', 'prints BOFH style excuses', ptypes_COMMAND) def command_excuse(argv, **args): - if 'excuse' != argv[0]: - return + if 'excuse' != argv[0]: + return - log.info('BOFH plugin called') + log.info('BOFH plugin called') - excuse = random.sample(excuses, 1)[0] + excuse = random.sample(excuses, 1)[0] - return { - 'msg': args['reply_user'] + ': ' + excuse - } + return { + 'msg': args['reply_user'] + ': ' + excuse + } @pluginfunction('show-moinlist', 'show the current moin reply list, optionally filtered', ptypes_COMMAND) def command_show_moinlist(argv, **args): - if 'show-moinlist' != argv[0]: - return + if 'show-moinlist' != argv[0]: + return - log.info('sent moin reply list') + log.info('sent moin reply list') - argv1 = None if len(argv) < 2 else argv[1] + argv1 = None if len(argv) < 2 else argv[1] - return { - 'msg': - '%s: moin reply list%s: %s' % ( - args['reply_user'], - '' if not argv1 else ' (limited to %s)' % argv1, - ', '.join([ - b for b in moin_strings_hi + moin_strings_bye - if not argv1 or argv1.lower() in b.lower() - ]) - ) - } + return { + 'msg': + '%s: moin reply list%s: %s' % ( + args['reply_user'], + '' if not argv1 else ' (limited to %s)' % argv1, + ', '.join([ + b for b in moin_strings_hi + moin_strings_bye + if not argv1 or argv1.lower() in b.lower() + ]) + ) + } @pluginfunction('list', 'list plugin and parser status', ptypes_COMMAND) def command_list(argv, **args): - if 'list' != argv[0]: - return + if 'list' != argv[0]: + return - log.info('list plugin called') + log.info('list plugin called') - if 'enabled' in argv and 'disabled' in argv: - return { - 'msg': args['reply_user'] + ": both 'enabled' and 'disabled' makes no sense" - } + if 'enabled' in argv and 'disabled' in argv: + return { + 'msg': args['reply_user'] + ": both 'enabled' and 'disabled' makes no sense" + } - # if not given, asume both - if 'command' not in argv and 'parser' not in argv: - argv.append('command') - argv.append('parser') + # if not given, asume both + if 'command' not in argv and 'parser' not in argv: + argv.append('command') + argv.append('parser') - out_command = [] - out_parser = [] - if 'command' in argv: - out_command = plugins[ptypes_COMMAND] - if 'parser' in argv: - out_parser = plugins[ptypes_PARSE] - if 'enabled' in argv: - out_command = [p for p in out_command if plugin_enabled_get(p)] - out_parser = [p for p in out_parser if plugin_enabled_get(p)] - if 'disabled' in argv: - out_command = [p for p in out_command if not plugin_enabled_get(p)] - out_parser = [p for p in out_parser if not plugin_enabled_get(p)] + out_command = [] + out_parser = [] + if 'command' in argv: + out_command = plugins[ptypes_COMMAND] + if 'parser' in argv: + out_parser = plugins[ptypes_PARSE] + if 'enabled' in argv: + out_command = [p for p in out_command if plugin_enabled_get(p)] + out_parser = [p for p in out_parser if plugin_enabled_get(p)] + if 'disabled' in argv: + out_command = [p for p in out_command if not plugin_enabled_get(p)] + out_parser = [p for p in out_parser if not plugin_enabled_get(p)] - msg = [args['reply_user'] + ': list of plugins:'] + msg = [args['reply_user'] + ': list of plugins:'] - if out_command: - msg.append('commands: %s' % ', '.join([p.plugin_name for p in out_command])) - if out_parser: - msg.append('parsers: %s' % ', '.join([p.plugin_name for p in out_parser])) - return {'msg': msg} + if out_command: + msg.append('commands: %s' % ', '.join([p.plugin_name for p in out_command])) + if out_parser: + msg.append('parsers: %s' % ', '.join([p.plugin_name for p in out_parser])) + return {'msg': msg} @pluginfunction( - 'record', 'record a message for a now offline user (usage: record {user} {some message})', ptypes_COMMAND) + 'record', 'record a message for a now offline user (usage: record {user} {some message})', ptypes_COMMAND) def command_record(argv, **args): - if 'record' != argv[0]: - return + if 'record' != argv[0]: + return - if 3 > len(argv): - return { - 'msg': '%s: usage: record {user} {some message}' % args['reply_user'] - } + if 3 > len(argv): + return { + 'msg': '%s: usage: record {user} {some message}' % args['reply_user'] + } - target_user = argv[1].lower() - message = '%s (%s): ' % (args['reply_user'], time.strftime('%F.%T')) - message += ' '.join(argv[2:]) + target_user = argv[1].lower() + message = '%s (%s): ' % (args['reply_user'], time.strftime('%F.%T')) + message += ' '.join(argv[2:]) - if conf('persistent_locked'): - return { - 'msg': "%s: couldn't get exclusive lock" % args['reply_user'] - } + if conf('persistent_locked'): + return { + 'msg': "%s: couldn't get exclusive lock" % args['reply_user'] + } - set_conf('persistent_locked', True) - blob = conf_load() + set_conf('persistent_locked', True) + blob = conf_load() - if 'user_records' not in blob: - blob['user_records'] = {} + if 'user_records' not in blob: + blob['user_records'] = {} - if target_user not in blob['user_records']: - blob['user_records'][target_user] = [] + if target_user not in blob['user_records']: + blob['user_records'][target_user] = [] - blob['user_records'][target_user].append(message) + blob['user_records'][target_user].append(message) - conf_save(blob) - set_conf('persistent_locked', False) + conf_save(blob) + set_conf('persistent_locked', False) - return { - 'msg': '%s: message saved for %s' % (args['reply_user'], target_user) - } + return { + 'msg': '%s: message saved for %s' % (args['reply_user'], target_user) + } @pluginfunction('show-records', 'show current offline records', ptypes_COMMAND) def command_show_recordlist(argv, **args): - if 'show-records' != argv[0]: - return + if 'show-records' != argv[0]: + return - log.info('sent offline records list') + log.info('sent offline records list') - argv1 = None if len(argv) < 2 else argv[1] + argv1 = None if len(argv) < 2 else argv[1] - return { - 'msg': - '%s: offline records%s: %s' % ( - args['reply_user'], - '' if not argv1 else ' (limited to %s)' % argv1, - ', '.join([ - '%s (%d)' % (key, len(val)) for key, val in conf_load().get('user_records').items() - if not argv1 or argv1.lower() in key.lower() - ]) - ) - } + return { + 'msg': + '%s: offline records%s: %s' % ( + args['reply_user'], + '' if not argv1 else ' (limited to %s)' % argv1, + ', '.join([ + '%s (%d)' % (key, len(val)) for key, val in conf_load().get('user_records').items() + if not argv1 or argv1.lower() in key.lower() + ]) + ) + } @pluginfunction('dsa-watcher', 'automatically crawls for newly published Debian Security Announces', ptypes_COMMAND, - ratelimit_class=RATE_NO_SILENCE) + ratelimit_class=RATE_NO_SILENCE) def command_dsa_watcher(argv, **_): - """ - TODO: rewrite so that a last_dsa_date is used instead, then all DSAs since then printed and the date set to now() - """ - if 'dsa-watcher' != argv[0]: - return + """ + TODO: rewrite so that a last_dsa_date is used instead, then all DSAs since then printed and the date set to now() + """ + if 'dsa-watcher' != argv[0]: + return - if 2 != len(argv): - msg = 'wrong number of arguments' - log.warn(msg) - return {'msg': msg} + if 2 != len(argv): + msg = 'wrong number of arguments' + log.warn(msg) + return {'msg': msg} - if 'crawl' == argv[1]: - out = [] - dsa = conf_load().get('plugin_conf', {}).get('last_dsa', 1000) + if 'crawl' == argv[1]: + out = [] + dsa = conf_load().get('plugin_conf', {}).get('last_dsa', 1000) - url = 'https://security-tracker.debian.org/tracker/DSA-%d-1' % dsa + url = 'https://security-tracker.debian.org/tracker/DSA-%d-1' % dsa - try: - request = urllib.request.Request(url) - request.add_header('User-Agent', USER_AGENT) - response = urllib.request.urlopen(request) - html_text = response.read(BUFSIZ) # ignore more than BUFSIZ - except Exception as e: - err = e - if '404' not in str(err): - msg = 'error for %s: %s' % (url, err) - log.warn(msg) - out.append(msg) - else: - if str != type(html_text): - html_text = str(html_text) + try: + request = urllib.request.Request(url) + request.add_header('User-Agent', USER_AGENT) + response = urllib.request.urlopen(request) + html_text = response.read(BUFSIZ) # ignore more than BUFSIZ + except Exception as e: + err = e + if '404' not in str(err): + msg = 'error for %s: %s' % (url, err) + log.warn(msg) + out.append(msg) + else: + if str != type(html_text): + html_text = str(html_text) - result = re.match(r'.*?Description</b></td><td>(.*?)</td>.*?', html_text, re.S | re.M | re.IGNORECASE) + result = re.match(r'.*?Description</b></td><td>(.*?)</td>.*?', html_text, re.S | re.M | re.IGNORECASE) - package = 'error extracting package name' - if result: - package = result.groups()[0] + package = 'error extracting package name' + if result: + package = result.groups()[0] - if conf('persistent_locked'): - msg = "couldn't get exclusive lock" - log.warn(msg) - out.append(msg) - else: - set_conf('persistent_locked', True) - blob = conf_load() + if conf('persistent_locked'): + msg = "couldn't get exclusive lock" + log.warn(msg) + out.append(msg) + else: + set_conf('persistent_locked', True) + blob = conf_load() - if 'plugin_conf' not in blob: - blob['plugin_conf'] = {} + if 'plugin_conf' not in blob: + blob['plugin_conf'] = {} - if 'last_dsa' not in blob['plugin_conf']: - blob['plugin_conf']['last_dsa'] = 3308 # FIXME: fixed value + if 'last_dsa' not in blob['plugin_conf']: + blob['plugin_conf']['last_dsa'] = 3308 # FIXME: fixed value - blob['plugin_conf']['last_dsa'] += 1 + blob['plugin_conf']['last_dsa'] += 1 - conf_save(blob) - set_conf('persistent_locked', False) + conf_save(blob) + set_conf('persistent_locked', False) - msg = ( - 'new Debian Security Announce found (%s): %s' % (str(package).replace(' - security update', ''), url)) - out.append(msg) + msg = ( + 'new Debian Security Announce found (%s): %s' % (str(package).replace(' - security update', ''), url)) + out.append(msg) - log.info('no dsa for %d, trying again...' % dsa) - # that's good, no error, just 404 -> DSA not released yet + log.info('no dsa for %d, trying again...' % dsa) + # that's good, no error, just 404 -> DSA not released yet - crawl_at = time.time() + conf('dsa_watcher_interval') - # register_event(crawl_at, command_dsa_watcher, (['dsa-watcher', 'crawl'],)) + crawl_at = time.time() + conf('dsa_watcher_interval') + # register_event(crawl_at, command_dsa_watcher, (['dsa-watcher', 'crawl'],)) - msg = 'next crawl set to %s' % time.strftime('%F.%T', time.localtime(crawl_at)) - out.append(msg) - return { - 'msg': out, - 'event': { - 'time': crawl_at, - 'command': (command_dsa_watcher, (['dsa-watcher', 'crawl'],)) - } - } - else: - msg = 'wrong argument' - log.warn(msg) - return {'msg': msg} + msg = 'next crawl set to %s' % time.strftime('%F.%T', time.localtime(crawl_at)) + out.append(msg) + return { + 'msg': out, + 'event': { + 'time': crawl_at, + 'command': (command_dsa_watcher, (['dsa-watcher', 'crawl'],)) + } + } + else: + msg = 'wrong argument' + log.warn(msg) + return {'msg': msg} @pluginfunction("provoke-bots", "search for other bots", ptypes_COMMAND) def provoke_bots(argv, **args): - if 'provoke-bots' == argv[0]: - return { - 'msg': 'Searching for other less intelligent lifeforms... skynet? You here?' - } + if 'provoke-bots' == argv[0]: + return { + 'msg': 'Searching for other less intelligent lifeforms... skynet? You here?' + } @pluginfunction("recognize_bots", "got ya", ptypes_PARSE) def recognize_bots(**args): - unique_standard_phrases = ( - 'independent bot and have nothing to do with other artificial intelligence systems', - 'new Debian Security Announce', - 'I\'m a bot (highlight me', - ) + unique_standard_phrases = ( + 'independent bot and have nothing to do with other artificial intelligence systems', + 'new Debian Security Announce', + 'I\'m a bot (highlight me', + ) - def _add_to_list(username, message): - blob = conf_load() + def _add_to_list(username, message): + blob = conf_load() - if 'other_bots' not in blob: - blob['other_bots'] = [] - if username not in blob['other_bots']: - blob['other_bots'].append(username) - conf_save(blob) - return { - 'event': { - 'time': time.time() + 3, - 'msg': message - } - } + if 'other_bots' not in blob: + blob['other_bots'] = [] + if username not in blob['other_bots']: + blob['other_bots'].append(username) + conf_save(blob) + return { + 'event': { + 'time': time.time() + 3, + 'msg': message + } + } - if any([phrase in args['data'] for phrase in unique_standard_phrases]): - _add_to_list(args['reply_user'], 'Making notes...') - elif 'I\'ll be back' in args['data']: - _add_to_list(args['reply_user'], 'Hey there, buddy!') + if any([phrase in args['data'] for phrase in unique_standard_phrases]): + _add_to_list(args['reply_user'], 'Making notes...') + elif 'I\'ll be back' in args['data']: + _add_to_list(args['reply_user'], 'Hey there, buddy!') @pluginfunction("remove-from-botlist", "remove a user from the botlist", ptypes_COMMAND) def remove_from_botlist(argv, **args): - if 'remove-from-botlist' != argv[0]: - return + if 'remove-from-botlist' != argv[0]: + return - if len(argv) != 2: - return {'msg': "wrong number of arguments!"} + if len(argv) != 2: + return {'msg': "wrong number of arguments!"} - blob = conf_load() + blob = conf_load() - if args['reply_user'] != conf('bot_owner'): - return {'msg': "only %s may do this!" % conf('bot_owner')} + if args['reply_user'] != conf('bot_owner'): + return {'msg': "only %s may do this!" % conf('bot_owner')} - if argv[1] in blob.get('other_bots', ()): - blob['other_bots'].pop(blob['other_bots'].index(argv[1])) + if argv[1] in blob.get('other_bots', ()): + blob['other_bots'].pop(blob['other_bots'].index(argv[1])) - conf_save(blob) - return {'msg': '%s was removed from the botlist.' % argv[1]} - else: - return False + conf_save(blob) + return {'msg': '%s was removed from the botlist.' % argv[1]} + else: + return False @pluginfunction("set-status", "set bot status", ptypes_COMMAND) def set_status(argv, **args): - if 'set-status' != argv[0] or len(argv) != 2: - return + if 'set-status' != argv[0] or len(argv) != 2: + return - if argv[1] == 'mute' and args['reply_user'] == conf('bot_owner'): - return { - 'presence': { - 'status': 'xa', - 'msg': 'I\'m muted now. You can unmute me with "%s: set_status unmute"' % conf("bot_user") - } - } - elif argv[1] == 'unmute' and args['reply_user'] == conf('bot_owner'): - return { - 'presence': { - 'status': None, - 'msg': '' - } - } + if argv[1] == 'mute' and args['reply_user'] == conf('bot_owner'): + return { + 'presence': { + 'status': 'xa', + 'msg': 'I\'m muted now. You can unmute me with "%s: set_status unmute"' % conf("bot_user") + } + } + elif argv[1] == 'unmute' and args['reply_user'] == conf('bot_owner'): + return { + 'presence': { + 'status': None, + 'msg': '' + } + } @pluginfunction('reset-jobs', "reset joblist", ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT) def reset_jobs(argv, **args): - if 'reset-jobs' != argv[0] or args['reply_user'] != conf('bot_owner'): - return - else: - joblist.clear() - return {'msg': 'done.'} + if 'reset-jobs' != argv[0] or args['reply_user'] != conf('bot_owner'): + return + else: + joblist.clear() + return {'msg': 'done.'} @pluginfunction('resolve-url-title', 'extract titles from urls', ptypes_PARSE, ratelimit_class=RATE_URL) def resolve_url_title(**args): - user = args['reply_user'] - user_pref_nospoiler = conf_get('user_pref', {}).get(user, {}).get('spoiler', False) - if user_pref_nospoiler: - log.info('nospoiler in userconf') - return + user = args['reply_user'] + user_pref_nospoiler = conf_get('user_pref', {}).get(user, {}).get('spoiler', False) + if user_pref_nospoiler: + log.info('nospoiler in userconf') + return - result = re.findall(r'(https?://[^\s>]+)', args['data']) - if not result: - return + result = re.findall(r'(https?://[^\s>]+)', args['data']) + if not result: + return - out = [] - for url in result: - if any([re.match(b, url) for b in conf('url_blacklist')]): - log.info('url blacklist match for ' + url) - break + out = [] + for url in result: + if any([re.match(b, url) for b in conf('url_blacklist')]): + log.info('url blacklist match for ' + url) + break - # urllib.request is broken: - # >>> '.'.encode('idna') - # .... - # UnicodeError: label empty or too long - # >>> '.a.'.encode('idna') - # .... - # UnicodeError: label empty or too long - # >>> 'a.a.'.encode('idna') - # b'a.a.' + # urllib.request is broken: + # >>> '.'.encode('idna') + # .... + # UnicodeError: label empty or too long + # >>> '.a.'.encode('idna') + # .... + # UnicodeError: label empty or too long + # >>> 'a.a.'.encode('idna') + # b'a.a.' - try: - (status, title) = extract_title(url) - except UnicodeError as e: - (status, title) = (4, str(e)) + try: + (status, title) = extract_title(url) + except UnicodeError as e: + (status, title) = (4, str(e)) - if 0 == status: - title = title.strip() - message = 'Title: %s' % title - elif 1 == status: - if conf('image_preview'): - # of course it's fake, but it looks interesting at least - char = r""",._-+=\|/*`~"'""" - message = 'No text but %s, 1-bit ASCII art preview: [%c]' % ( - title, random.choice(char) - ) - else: - log.info('no message sent for non-text %s (%s)' % (url, title)) - continue - elif 2 == status: - message = '(No title)' - elif 3 == status: - message = title - elif 4 == status: - message = 'Bug triggered (%s), invalid URL/domain part: %s' % (title, url) - log.warn(message) - else: - message = 'some error occurred when fetching %s' % url + if 0 == status: + title = title.strip() + message = 'Title: %s' % title + elif 1 == status: + if conf('image_preview'): + # of course it's fake, but it looks interesting at least + char = r""",._-+=\|/*`~"'""" + message = 'No text but %s, 1-bit ASCII art preview: [%c]' % ( + title, random.choice(char) + ) + else: + log.info('no message sent for non-text %s (%s)' % (url, title)) + continue + elif 2 == status: + message = '(No title)' + elif 3 == status: + message = title + elif 4 == status: + message = 'Bug triggered (%s), invalid URL/domain part: %s' % (title, url) + log.warn(message) + else: + message = 'some error occurred when fetching %s' % url - message = message.replace('\n', '\\n') + message = message.replace('\n', '\\n') - log.info('adding to out buf: ' + message) - out.append(message) + log.info('adding to out buf: ' + message) + out.append(message) - return { - 'msg': out - } + return { + 'msg': out + } def else_command(args): - log.info('sent short info') - return { - 'msg': args['reply_user'] + ''': I'm a bot (highlight me with 'info' for more information).''' - } + log.info('sent short info') + return { + 'msg': args['reply_user'] + ''': I'm a bot (highlight me with 'info' for more information).''' + } def register(func_type): - """ - Register plugins. + """ + Register plugins. - :param func_type: plugin functions with this type (ptypes) will be loaded - """ + :param func_type: plugin functions with this type (ptypes) will be loaded + """ - functions = [ - f for ignored, f in globals().items() if - isinstance(f, types.FunctionType) and - all([ - f.__dict__.get('is_plugin', False), - getattr(f, 'plugin_type', None) == func_type - ]) - ] + functions = [ + f for ignored, f in globals().items() if + isinstance(f, types.FunctionType) and + all([ + f.__dict__.get('is_plugin', False), + getattr(f, 'plugin_type', None) == func_type + ]) + ] - log.info('auto-reg %s: %s' % (func_type, ', '.join( - f.plugin_name for f in functions - ))) + log.info('auto-reg %s: %s' % (func_type, ', '.join( + f.plugin_name for f in functions + ))) - for f in functions: - register_plugin(f, func_type) + for f in functions: + register_plugin(f, func_type) def register_plugin(function, func_type): - try: - plugins[func_type].append(function) - except Exception as e: - log.warn('registering %s failed: %s, %s' % (function, e, traceback.format_exc())) + try: + plugins[func_type].append(function) + except Exception as e: + log.warn('registering %s failed: %s, %s' % (function, e, traceback.format_exc())) def register_all(): - register(ptypes_PARSE) - register(ptypes_COMMAND) + register(ptypes_PARSE) + register(ptypes_COMMAND) def event_trigger(): - if 0 == len(joblist): - return True + if 0 == len(joblist): + return True - now = time.time() + now = time.time() - for (i, (t, callback, args)) in enumerate(joblist): - if t < now: - callback(*args) - del (joblist[i]) - return True + for (i, (t, callback, args)) in enumerate(joblist): + if t < now: + callback(*args) + del (joblist[i]) + return True diff --git a/string_constants.py b/string_constants.py index cb30fa3..42490b8 100644 --- a/string_constants.py +++ b/string_constants.py @@ -470,31 +470,31 @@ DNS server drank too much and had a hiccup '''.split('\n')[1:-1] moin_strings_hi = [ - 'Hi', - 'Guten Morgen', 'Morgen', - 'Moin', - 'Tag', 'Tach', - 'NAbend', 'Abend', - 'Hallo', 'Hello' + 'Hi', + 'Guten Morgen', 'Morgen', + 'Moin', + 'Tag', 'Tach', + 'NAbend', 'Abend', + 'Hallo', 'Hello' ] moin_strings_bye = [ - 'Nacht', 'gN8', 'N8', - 'bye', + 'Nacht', 'gN8', 'N8', + 'bye', ] cakes = [ - "No cake for you!", - ("The Enrichment Center is required to remind you " - "that you will be baked, and then there will be cake."), - "The cake is a lie!", - ("This is your fault. I'm going to kill you. " - "And all the cake is gone. You don't even care, do you?"), - "Quit now and cake will be served immediately.", - ("Enrichment Center regulations require both hands to be " - "empty before any cake..."), - ("Uh oh. Somebody cut the cake. I told them to wait for " - "you, but they did it anyway. There is still some left, " - "though, if you hurry back."), - "I'm going to kill you, and all the cake is gone.", - "Who's gonna make the cake when I'm gone? You?" + "No cake for you!", + ("The Enrichment Center is required to remind you " + "that you will be baked, and then there will be cake."), + "The cake is a lie!", + ("This is your fault. I'm going to kill you. " + "And all the cake is gone. You don't even care, do you?"), + "Quit now and cake will be served immediately.", + ("Enrichment Center regulations require both hands to be " + "empty before any cake..."), + ("Uh oh. Somebody cut the cake. I told them to wait for " + "you, but they did it anyway. There is still some left, " + "though, if you hurry back."), + "I'm going to kill you, and all the cake is gone.", + "Who's gonna make the cake when I'm gone? You?" ] diff --git a/test_urlbot.py b/test_urlbot.py index 75f6323..f92bc94 100644 --- a/test_urlbot.py +++ b/test_urlbot.py @@ -3,19 +3,18 @@ To be executed with nose """ import unittest import time - from common import buckets, rate_limit, RATE_GLOBAL class TestEventlooper(unittest.TestCase): - def test_broken_url(self): - """ - Test that broken socket calls are not breaking - """ - from common import fetch_page - broken_url = 'http://foo' - result = fetch_page(url=broken_url) - self.assertEqual(result, (None, None)) + def test_broken_url(self): + """ + Test that broken socket calls are not breaking + """ + from common import fetch_page + broken_url = 'http://foo' + result = fetch_page(url=broken_url) + self.assertEqual(result, (None, None)) from collections import namedtuple @@ -24,34 +23,33 @@ Bucket = namedtuple("BucketConfig", ["history", "period", "max_hist_len"]) class TestRateLimiting(unittest.TestCase): + def setUp(self): + # just for assertions + self.called = { + RATE_GLOBAL: [], + } - def setUp(self): - # just for assertions - self.called = { - RATE_GLOBAL: [], - } + def say(self, msg, rate_class=RATE_GLOBAL): + if rate_limit(rate_class): + self.called[rate_class].append(msg) + # print(msg) + time.sleep(0.1) - def say(self, msg, rate_class=RATE_GLOBAL): - if rate_limit(rate_class): - self.called[rate_class].append(msg) - # print(msg) - time.sleep(0.1) + def test_simple_burst(self): + messages = ["x_%d" % i for i in range(1, 9)] + for m in messages: + self.say(msg=m) + self.assertEqual(messages[:buckets[RATE_GLOBAL].max_hist_len], self.called[RATE_GLOBAL]) - def test_simple_burst(self): - messages = ["x_%d" % i for i in range(1, 9)] - for m in messages: - self.say(msg=m) - self.assertEqual(messages[:buckets[RATE_GLOBAL].max_hist_len], self.called[RATE_GLOBAL]) + def test_msg_two_bursts(self): + # custom bucket, just for testing + buckets[0x42] = Bucket(history=[], period=1, max_hist_len=5) + self.called[0x42] = [] - def test_msg_two_bursts(self): - # custom bucket, just for testing - buckets[0x42] = Bucket(history=[], period=1, max_hist_len=5) - self.called[0x42] = [] - - bucket = buckets[0x42] - messages = ["x_%d" % i for i in range(0, 15)] - for i, m in enumerate(messages): - if i % bucket.max_hist_len == 0: - time.sleep(bucket.period) - self.say(msg=m, rate_class=0x42) - self.assertEqual(messages, self.called[0x42]) + bucket = buckets[0x42] + messages = ["x_%d" % i for i in range(0, 15)] + for i, m in enumerate(messages): + if i % bucket.max_hist_len == 0: + time.sleep(bucket.period) + self.say(msg=m, rate_class=0x42) + self.assertEqual(messages, self.called[0x42]) diff --git a/urlbot.py b/urlbot.py index 0b983a5..9a554ed 100755 --- a/urlbot.py +++ b/urlbot.py @@ -6,276 +6,277 @@ import re import sys from common import ( - conf_load, conf_save, - extract_title, - rate_limit_classes, - RATE_GLOBAL, - RATE_CHAT, - RATE_NO_SILENCE, - RATE_EVENT, - # rate_limited, - rate_limit, - RATE_URL, conf_set) + conf_load, conf_save, + extract_title, + rate_limit_classes, + RATE_GLOBAL, + RATE_CHAT, + RATE_NO_SILENCE, + RATE_EVENT, + # rate_limited, + rate_limit, + RATE_URL, conf_set) from idlebot import IdleBot, start from plugins import ( - plugins as plugin_storage, - ptypes_COMMAND, - plugin_enabled_get, - ptypes_PARSE, - register_event, - else_command + plugins as plugin_storage, + ptypes_COMMAND, + plugin_enabled_get, + ptypes_PARSE, + register_event, + else_command ) try: - from local_config import conf, set_conf + from local_config import conf, set_conf except ImportError: - sys.stderr.write(''' + sys.stderr.write(''' %s: E: local_config.py isn't tracked because of included secrets and %s site specific configurations. Rename local_config.py.skel and %s adjust to your needs. '''[1:] % ( - sys.argv[0], - ' ' * len(sys.argv[0]), - ' ' * len(sys.argv[0]) - )) - sys.exit(1) + sys.argv[0], + ' ' * len(sys.argv[0]), + ' ' * len(sys.argv[0]) + )) + sys.exit(1) class UrlBot(IdleBot): - def __init__(self, jid, password, rooms, nick): - super(UrlBot, self).__init__(jid, password, rooms, nick) + def __init__(self, jid, password, rooms, nick): + super(UrlBot, self).__init__(jid, password, rooms, nick) - self.hist_ts = {p: [] for p in rate_limit_classes} - self.hist_flag = {p: True for p in rate_limit_classes} + self.hist_ts = {p: [] for p in rate_limit_classes} + self.hist_flag = {p: True for p in rate_limit_classes} - self.add_event_handler('message', self.message) - self.priority = 100 + self.add_event_handler('message', self.message) + self.priority = 100 - for r in self.rooms: - self.add_event_handler('muc::%s::got_online' % r, self.muc_online) + for r in self.rooms: + self.add_event_handler('muc::%s::got_online' % r, self.muc_online) - def muc_message(self, msg_obj): - return super(UrlBot, self).muc_message(msg_obj) and self.handle_msg(msg_obj) + def muc_message(self, msg_obj): + return super(UrlBot, self).muc_message(msg_obj) and self.handle_msg(msg_obj) - def message(self, msg_obj): - if 'groupchat' == msg_obj['type']: - return - else: - self.logger.info("Got the following PM: %s" % str(msg_obj)) + def message(self, msg_obj): + if 'groupchat' == msg_obj['type']: + return + else: + self.logger.info("Got the following PM: %s" % str(msg_obj)) - def muc_online(self, msg_obj): - """ - Hook for muc event "user joins" - """ - # don't react to yourself - if msg_obj['muc']['nick'] == self.nick: - return + def muc_online(self, msg_obj): + """ + Hook for muc event "user joins" + """ + # don't react to yourself + if msg_obj['muc']['nick'] == self.nick: + return - # TODO: move this to a undirected plugin, maybe new plugin type - arg_user = msg_obj['muc']['nick'] - arg_user_key = arg_user.lower() - blob_userrecords = conf_load().get('user_records', {}) + # TODO: move this to a undirected plugin, maybe new plugin type + arg_user = msg_obj['muc']['nick'] + arg_user_key = arg_user.lower() + blob_userrecords = conf_load().get('user_records', {}) - if arg_user_key in blob_userrecords: - records = blob_userrecords[arg_user_key] + if arg_user_key in blob_userrecords: + records = blob_userrecords[arg_user_key] - if not records: - return + if not records: + return - self.send_message( - mto=msg_obj['from'].bare, - mbody='%s, there %s %d message%s for you:\n%s' % ( - arg_user, - 'is' if 1 == len(records) else 'are', - len(records), - '' if 1 == len(records) else 's', - '\n'.join(records) - ), - mtype='groupchat' - ) - self.logger.info('sent %d offline records to room %s' % ( - len(records), msg_obj['from'].bare - )) + self.send_message( + mto=msg_obj['from'].bare, + mbody='%s, there %s %d message%s for you:\n%s' % ( + arg_user, + 'is' if 1 == len(records) else 'are', + len(records), + '' if 1 == len(records) else 's', + '\n'.join(records) + ), + mtype='groupchat' + ) + self.logger.info('sent %d offline records to room %s' % ( + len(records), msg_obj['from'].bare + )) - if conf('persistent_locked'): - self.logger.warn("couldn't get exclusive lock") - return False + if conf('persistent_locked'): + self.logger.warn("couldn't get exclusive lock") + return False - set_conf('persistent_locked', True) - blob = conf_load() + set_conf('persistent_locked', True) + blob = conf_load() - if 'user_records' not in blob: - blob['user_records'] = {} + if 'user_records' not in blob: + blob['user_records'] = {} - if arg_user_key in blob['user_records']: - blob['user_records'].pop(arg_user_key) + if arg_user_key in blob['user_records']: + blob['user_records'].pop(arg_user_key) - conf_save(blob) - set_conf('persistent_locked', False) + conf_save(blob) + set_conf('persistent_locked', False) - # @rate_limited(10) - def send_reply(self, message, msg_obj=None): - """ - Send a reply to a message - """ - if self.show: - self.logger.warn("I'm muted! (status: %s)" % self.show) - return + # @rate_limited(10) + def send_reply(self, message, msg_obj=None): + """ + Send a reply to a message + """ + if self.show: + self.logger.warn("I'm muted! (status: %s)" % self.show) + return - set_conf('request_counter', conf('request_counter') + 1) + set_conf('request_counter', conf('request_counter') + 1) - if str is not type(message): - message = '\n'.join(message) + if str is not type(message): + message = '\n'.join(message) - # check other bots, add nospoiler with urls - def _prevent_panic(message, room): - if 'http' in message: - other_bots = conf_load().get("other_bots", ()) - users = self.plugin['xep_0045'].getRoster(room) - if set(users).intersection(set(other_bots)): - message = '(nospoiler) %s' % message - return message + # check other bots, add nospoiler with urls + def _prevent_panic(message, room): + if 'http' in message: + other_bots = conf_load().get("other_bots", ()) + users = self.plugin['xep_0045'].getRoster(room) + if set(users).intersection(set(other_bots)): + message = '(nospoiler) %s' % message + return message - if conf('debug_mode', False): - print(message) - else: - if msg_obj: - message = _prevent_panic(message, msg_obj['from'].bare) - self.send_message( - mto=msg_obj['from'].bare, - mbody=message, - mtype='groupchat' - ) - else: # unset msg_obj == broadcast - for room in self.rooms: - message = _prevent_panic(message, room) - self.send_message( - mto=room, - mbody=message, - mtype='groupchat' - ) + if conf('debug_mode', False): + print(message) + else: + if msg_obj: + message = _prevent_panic(message, msg_obj['from'].bare) + self.send_message( + mto=msg_obj['from'].bare, + mbody=message, + mtype='groupchat' + ) + else: # unset msg_obj == broadcast + for room in self.rooms: + message = _prevent_panic(message, room) + self.send_message( + mto=room, + mbody=message, + mtype='groupchat' + ) - def handle_msg(self, msg_obj): - """ - called for incoming messages - :param msg_obj: - :returns nothing - """ - content = msg_obj['body'] + def handle_msg(self, msg_obj): + """ + called for incoming messages + :param msg_obj: + :returns nothing + """ + content = msg_obj['body'] - if 'has set the subject to:' in content: - return + if 'has set the subject to:' in content: + return - if sys.argv[0] in content: - self.logger.info('silenced, this is my own log') - return + if sys.argv[0] in content: + self.logger.info('silenced, this is my own log') + return - if 'nospoiler' in content: - self.logger.info('no spoiler for: ' + content) - return + if 'nospoiler' in content: + self.logger.info('no spoiler for: ' + content) + return - self.data_parse_commands(msg_obj) - self.data_parse_other(msg_obj) + self.data_parse_commands(msg_obj) + self.data_parse_other(msg_obj) - def data_parse_commands(self, msg_obj): - """ - react to a message with the bots nick - :param msg_obj: dictionary with incoming message parameters + def data_parse_commands(self, msg_obj): + """ + react to a message with the bots nick + :param msg_obj: dictionary with incoming message parameters - :returns: nothing - """ - global got_hangup + :returns: nothing + """ + global got_hangup - data = msg_obj['body'] - words = data.split() + data = msg_obj['body'] + words = data.split() - if 2 > len(words): # need at least two words - return None + if 2 > len(words): # need at least two words + return None - # don't reply if beginning of the text matches bot_user - if not data.startswith(conf('bot_user')): - return None + # don't reply if beginning of the text matches bot_user + if not data.startswith(conf('bot_user')): + return None - if 'hangup' in data: - self.logger.warn('received hangup: ' + data) - got_hangup = True - sys.exit(1) + if 'hangup' in data: + self.logger.warn('received hangup: ' + data) + got_hangup = True + sys.exit(1) - reply_user = msg_obj['mucnick'] + reply_user = msg_obj['mucnick'] - # TODO: check how several commands/plugins in a single message behave (also with rate limiting) - reacted = False - for plugin in plugin_storage[ptypes_COMMAND]: + # TODO: check how several commands/plugins in a single message behave (also with rate limiting) + reacted = False + for plugin in plugin_storage[ptypes_COMMAND]: - if not plugin_enabled_get(plugin): - continue + if not plugin_enabled_get(plugin): + continue - ret = plugin( - data=data, - cmd_list=[pl.plugin_name for pl in plugin_storage[ptypes_COMMAND]], - parser_list=[pl.plugin_name for pl in plugin_storage[ptypes_PARSE]], - reply_user=reply_user, - msg_obj=msg_obj, - argv=words[1:] - ) + ret = plugin( + data=data, + cmd_list=[pl.plugin_name for pl in plugin_storage[ptypes_COMMAND]], + parser_list=[pl.plugin_name for pl in plugin_storage[ptypes_PARSE]], + reply_user=reply_user, + msg_obj=msg_obj, + argv=words[1:] + ) - if ret: - self._run_action(ret, plugin, msg_obj) - reacted = True + if ret: + self._run_action(ret, plugin, msg_obj) + reacted = True - if not reacted and rate_limit(RATE_GLOBAL): - ret = else_command({'reply_user': reply_user}) - if ret: - if 'msg' in ret: - self.send_reply(ret['msg'], msg_obj) + if not reacted and rate_limit(RATE_GLOBAL): + ret = else_command({'reply_user': reply_user}) + if ret: + if 'msg' in ret: + self.send_reply(ret['msg'], msg_obj) - def data_parse_other(self, msg_obj): - """ - react to any message + def data_parse_other(self, msg_obj): + """ + react to any message - :param msg_obj: incoming message parameters - :return: - """ - data = msg_obj['body'] - reply_user = msg_obj['mucnick'] + :param msg_obj: incoming message parameters + :return: + """ + data = msg_obj['body'] + reply_user = msg_obj['mucnick'] - for plugin in plugin_storage[ptypes_PARSE]: - if not plugin_enabled_get(plugin): - continue + for plugin in plugin_storage[ptypes_PARSE]: + if not plugin_enabled_get(plugin): + continue - ret = plugin(reply_user=reply_user, data=data) + ret = plugin(reply_user=reply_user, data=data) - if ret: - self._run_action(ret, plugin, msg_obj) + if ret: + self._run_action(ret, plugin, msg_obj) - def _run_action(self, action, plugin, msg_obj): - """ - Execute the plugin's execution plan - :param action: dict with event and/or msg - :param plugin: plugin obj - :param msg_obj: xmpp message obj - """ - if 'event' in action: - event = action["event"] - if 'msg' in event: - register_event(event["time"], self.send_reply, [event['msg']]) - elif 'command' in event: - command = event["command"] - if rate_limit(RATE_EVENT): - register_event(event["time"], command[0], command[1]) + def _run_action(self, action, plugin, msg_obj): + """ + Execute the plugin's execution plan + :param action: dict with event and/or msg + :param plugin: plugin obj + :param msg_obj: xmpp message obj + """ + if 'event' in action: + event = action["event"] + if 'msg' in event: + register_event(event["time"], self.send_reply, [event['msg']]) + elif 'command' in event: + command = event["command"] + if rate_limit(RATE_EVENT): + register_event(event["time"], command[0], command[1]) - if 'msg' in action and rate_limit(RATE_CHAT | plugin.ratelimit_class): - self.send_reply(action['msg'], msg_obj) + if 'msg' in action and rate_limit(RATE_CHAT | plugin.ratelimit_class): + self.send_reply(action['msg'], msg_obj) - if 'presence' in action: - presence = action['presence'] - conf_set('presence', presence) + if 'presence' in action: + presence = action['presence'] + conf_set('presence', presence) - self.status = presence.get('msg') - self.show = presence.get('status') + self.status = presence.get('msg') + self.show = presence.get('status') + + self.send_presence(pstatus=self.status, pshow=self.show) + # self.reconnect(wait=True) - self.send_presence(pstatus=self.status, pshow=self.show) - # self.reconnect(wait=True) if '__main__' == __name__: - start(UrlBot, True) + start(UrlBot, True)