1
0
mirror of http://aero2k.de/t/repos/urlbot-native.git synced 2017-09-06 15:25:38 +02:00

refactor plugins

This commit is contained in:
Thorsten S
2015-12-26 13:50:21 +01:00
parent bd12a4caf7
commit 12d9222ddf
6 changed files with 495 additions and 479 deletions

View File

@@ -205,3 +205,31 @@ def giphy(subject, api_key):
except: except:
pass pass
return giphy_url return giphy_url
def pluginfunction(name, desc, plugin_type, ratelimit_class=RATE_GLOBAL, enabled=True):
"""A decorator to make a plugin out of a function
:param enabled:
:param ratelimit_class:
:param plugin_type:
:param desc:
:param name:
"""
if plugin_type not in ptypes:
raise TypeError('Illegal plugin_type: %s' % plugin_type)
def decorate(f):
f.is_plugin = True
f.is_enabled = enabled
f.plugin_name = name
f.plugin_desc = desc
f.plugin_type = plugin_type
f.ratelimit_class = ratelimit_class
return f
return decorate
ptypes_PARSE = 'parser'
ptypes_COMMAND = 'command'
ptypes = [ptypes_PARSE, ptypes_COMMAND]

225
plugins/__init__.py Normal file
View File

@@ -0,0 +1,225 @@
# -*- coding: utf-8 -*-
import logging
import time
import traceback
import types
import config
from common import RATE_NO_LIMIT, pluginfunction, ptypes_PARSE, ptypes_COMMAND, ptypes
from plugins import commands, parsers
joblist = []
plugins = {p: [] for p in ptypes}
log = logging.getLogger(__name__)
def plugin_enabled_get(urlbot_plugin):
plugin_section = config.runtimeconf_deepget('plugins.{}'.format(urlbot_plugin.plugin_name))
if plugin_section and "enabled" in plugin_section:
return plugin_section.as_bool("enabled")
else:
return urlbot_plugin.is_enabled
def plugin_enabled_set(plugin, enabled):
if config.conf_get('persistent_locked'):
log.warn("couldn't get exclusive lock")
config.conf_set('persistent_locked', True)
# blob = conf_load()
if plugin.plugin_name not in config.runtime_config_store['plugins']:
config.runtime_config_store['plugins'][plugin.plugin_name] = {}
config.runtime_config_store['plugins'][plugin.plugin_name]['enabled'] = enabled
config.runtimeconf_persist()
config.conf_set('persistent_locked', False)
def register_event(t, callback, args):
joblist.append((t, callback, args))
def else_command(args):
log.info('sent short info')
return {
'msg': args['reply_user'] + ''': I'm a bot (highlight me with 'info' for more information).'''
}
def register(func_type):
"""
Register plugins.
:param func_type: plugin functions with this type (ptypes) will be loaded
"""
if func_type == ptypes_COMMAND:
local_commands = [command_plugin_activation + command_list + command_help + reset_jobs]
plugin_funcs = commands.__dict__.items() + local_commands
elif func_type == ptypes_PARSE:
plugin_funcs = parsers.__dict__.items()
else:
raise RuntimeError("invalid func type: {}".format(func_type))
functions = [
f for ignored, f in plugin_funcs if
isinstance(f, types.FunctionType) and
all([
f.__dict__.get('is_plugin', False),
getattr(f, 'plugin_type', None) == func_type
])
]
log.info('auto-reg %s: %s' % (func_type, ', '.join(
f.plugin_name for f in functions
)))
for f in functions:
register_plugin(f, func_type)
def register_plugin(function, func_type):
try:
plugins[func_type].append(function)
except Exception as e:
log.warn('registering %s failed: %s, %s' % (function, e, traceback.format_exc()))
def register_all():
register(ptypes_PARSE)
register(ptypes_COMMAND)
def event_trigger():
if 0 == len(joblist):
return True
now = time.time()
for (i, (t, callback, args)) in enumerate(joblist):
if t < now:
callback(*args)
del (joblist[i])
return True
@pluginfunction('help', 'print help for a command or all known commands', ptypes_COMMAND)
def command_help(argv, **args):
what = argv[1] if len(argv) > 1 else None
logger = logging.getLogger(__name__)
if not what:
logger.info('empty help request, sent all commands')
commands = args['cmd_list']
commands.sort()
parsers = args['parser_list']
parsers.sort()
return {
'msg': [
'%s: known commands: %s' % (
args['reply_user'], ', '.join(commands)
),
'known parsers: %s' % ', '.join(parsers)
]
}
for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]:
if what == p.plugin_name:
logger.info('sent help for %s' % what)
return {
'msg': args['reply_user'] + ': help for %s %s %s: %s' % (
'enabled' if plugin_enabled_get(p) else 'disabled',
'parser' if p.plugin_type == ptypes_PARSE else 'command',
what, p.plugin_desc
)
}
logger.info('no help found for %s' % what)
return {
'msg': args['reply_user'] + ': no such command: %s' % what
}
@pluginfunction('plugin', "'disable' or 'enable' plugins", ptypes_COMMAND)
def command_plugin_activation(argv, **args):
if argv[0] != 'plugin' or len(argv) == 1:
return
command = argv[1]
plugin = argv[2] if len(argv) > 2 else None
if command not in ('enable', 'disable'):
return
log.info('plugin activation plugin called')
if not plugin:
return {
'msg': args['reply_user'] + ': no plugin given'
}
elif command_plugin_activation.plugin_name == plugin:
return {
'msg': args['reply_user'] + ': not allowed'
}
for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]:
if p.plugin_name == plugin:
plugin_enabled_set(p, 'enable' == command)
return {
'msg': args['reply_user'] + ': %sd %s' % (
command, plugin
)
}
return {
'msg': args['reply_user'] + ': unknown plugin %s' % plugin
}
@pluginfunction('list', 'list plugin and parser status', ptypes_COMMAND)
def command_list(argv, **args):
if 'list' != argv[0]:
return
log.info('list plugin called')
if 'enabled' in argv and 'disabled' in argv:
return {
'msg': args['reply_user'] + ": both 'enabled' and 'disabled' makes no sense"
}
# if not given, asume both
if 'command' not in argv and 'parser' not in argv:
argv.append('command')
argv.append('parser')
out_command = []
out_parser = []
if 'command' in argv:
out_command = plugins[ptypes_COMMAND]
if 'parser' in argv:
out_parser = plugins[ptypes_PARSE]
if 'enabled' in argv:
out_command = [p for p in out_command if plugin_enabled_get(p)]
out_parser = [p for p in out_parser if plugin_enabled_get(p)]
if 'disabled' in argv:
out_command = [p for p in out_command if not plugin_enabled_get(p)]
out_parser = [p for p in out_parser if not plugin_enabled_get(p)]
msg = [args['reply_user'] + ': list of plugins:']
if out_command:
msg.append('commands: %s' % ', '.join([p.plugin_name for p in out_command]))
if out_parser:
msg.append('parsers: %s' % ', '.join([p.plugin_name for p in out_parser]))
return {'msg': msg}
@pluginfunction('reset-jobs', "reset joblist", ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT)
def reset_jobs(argv, **args):
if args['reply_user'] != config.conf_get('bot_owner'):
return
else:
joblist.clear()
return {'msg': 'done.'}

View File

@@ -1,267 +1,20 @@
# -*- coding: utf-8 -*-
import json import json
import logging import logging
import random import random
import re
import time import time
import traceback import traceback
import types
import unicodedata import unicodedata
import urllib.parse import urllib.parse
import urllib.request import urllib.request
from common import RATE_GLOBAL, RATE_NO_SILENCE, VERSION, RATE_INTERACTIVE, BUFSIZ, \
USER_AGENT, extract_title, RATE_FUN, RATE_NO_LIMIT, RATE_URL, giphy
from config import runtimeconf_get
import config import config
from string_constants import excuses, moin_strings_hi, moin_strings_bye, cakes from common import VERSION, RATE_FUN, RATE_GLOBAL, RATE_INTERACTIVE, RATE_NO_LIMIT, giphy, BUFSIZ, pluginfunction, \
ptypes_COMMAND
ptypes_PARSE = 'parser' from plugins import ptypes_COMMAND
ptypes_COMMAND = 'command' from string_constants import cakes, excuses, moin_strings_hi, moin_strings_bye
ptypes = [ptypes_PARSE, ptypes_COMMAND]
joblist = []
plugins = {p: [] for p in ptypes}
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def plugin_enabled_get(urlbot_plugin):
plugin_section = config.runtimeconf_deepget('plugins.{}'.format(urlbot_plugin.plugin_name))
if plugin_section and "enabled" in plugin_section:
return plugin_section.as_bool("enabled")
else:
return urlbot_plugin.is_enabled
def plugin_enabled_set(plugin, enabled):
if config.conf_get('persistent_locked'):
log.warn("couldn't get exclusive lock")
config.conf_set('persistent_locked', True)
# blob = conf_load()
if plugin.plugin_name not in config.runtime_config_store['plugins']:
config.runtime_config_store['plugins'][plugin.plugin_name] = {}
config.runtime_config_store['plugins'][plugin.plugin_name]['enabled'] = enabled
config.runtimeconf_persist()
config.conf_set('persistent_locked', False)
def pluginfunction(name, desc, plugin_type, ratelimit_class=RATE_GLOBAL, enabled=True):
"""A decorator to make a plugin out of a function
:param enabled:
:param ratelimit_class:
:param plugin_type:
:param desc:
:param name:
"""
if plugin_type not in ptypes:
raise TypeError('Illegal plugin_type: %s' % plugin_type)
def decorate(f):
f.is_plugin = True
f.is_enabled = enabled
f.plugin_name = name
f.plugin_desc = desc
f.plugin_type = plugin_type
f.ratelimit_class = ratelimit_class
return f
return decorate
def register_event(t, callback, args):
joblist.append((t, callback, args))
@pluginfunction('mental_ill', 'parse mental illness', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_mental_ill(**args):
min_ill = 3
c = 0
flag = False
# return True for min_ill '!' in a row
for d in args['data']:
if '!' == d or '?' == d:
c += 1
else:
c = 0
if min_ill <= c:
flag = True
break
if flag:
log.info('sent mental illness reply')
return {
'msg': (
'Multiple exclamation/question marks are a sure sign of mental disease, with %s as a living example.' %
args['reply_user']
)
}
@pluginfunction('debbug', 'parse Debian bug numbers', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_debbug(**args):
bugs = re.findall(r'#(\d{4,})', args['data'])
if not bugs:
return None
out = []
for b in bugs:
log.info('detected Debian bug #%s' % b)
url = 'https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=%s' % b
status, title = extract_title(url)
if 0 == status:
out.append('Debian Bug: %s: %s' % (title, url))
elif 3 == status:
out.append('error for #%s: %s' % (b, title))
else:
log.info('unknown status %d' % status)
return {
'msg': out
}
@pluginfunction('cve', 'parse a CVE handle', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_cve(**args):
cves = re.findall(r'(CVE-\d\d\d\d-\d+)', args['data'].upper())
if not cves:
return None
log.info('detected CVE handle')
return {
'msg': ['https://security-tracker.debian.org/tracker/%s' % c for c in cves]
}
@pluginfunction('dsa', 'parse a DSA handle', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_dsa(**args):
dsas = re.findall(r'(DSA-\d\d\d\d-\d+)', args['data'].upper())
if not dsas:
return None
log.info('detected DSA handle')
return {
'msg': ['https://security-tracker.debian.org/tracker/%s' % d for d in dsas]
}
@pluginfunction('skynet', 'parse skynet', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL)
def parse_skynet(**args):
if 'skynet' in args['data'].lower():
return {
'msg': 'I\'ll be back.'
}
@pluginfunction('moin', 'parse hi/bye', ptypes_PARSE)
def parse_moin(**args):
for direction in [moin_strings_hi, moin_strings_bye]:
for d in direction:
words = re.split(r'\W+', args['data'])
# assumption: longer sentences are not greetings
if 3 < len(args['data'].split()):
continue
for w in words:
if d.lower() == w.lower():
if args['reply_user'] in config.conf_get('moin-disabled-user'):
log.info('moin blacklist match')
return
if args['reply_user'] in config.conf_get('moin-modified-user'):
log.info('being "quiet" for %s' % w)
return {
'msg': '/me %s' % random.choice([
"doesn't say anything at all",
'whistles uninterested',
'just ignores this incident'
])
}
log.info('sent %s reply for %s' % (
'hi' if direction is moin_strings_hi else 'bye', w
))
return {
'msg': '''%s, %s''' % (
random.choice(direction),
args['reply_user']
)
}
@pluginfunction('latex', r'reacts on \LaTeX', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL)
def parse_latex(**args):
if r'\LaTeX' in args['data']:
return {
'msg': '''LaTeX is way too complex for me, I'm happy with fmt(1)'''
}
@pluginfunction('me-action', 'reacts to /me.*%{bot_nickname}', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL)
def parse_slash_me(**args):
if args['data'].lower().startswith('/me') and (config.conf_get('bot_nickname') in args['data'].lower()):
log.info('sent /me reply')
me_replys = [
'are you that rude to everybody?',
'oh, thank you...',
'do you really think that was nice?',
'that sounds very interesting...',
"excuse me, but I'm already late for an appointment"
]
return {
'msg': args['reply_user'] + ': %s' % random.choice(me_replys)
}
@pluginfunction('help', 'print help for a command or all known commands', ptypes_COMMAND)
def command_help(argv, **args):
command = argv[0]
what = argv[1] if len(argv) > 1 else None
if 'help' != command:
return
if not what:
log.info('empty help request, sent all commands')
commands = args['cmd_list']
commands.sort()
parsers = args['parser_list']
parsers.sort()
return {
'msg': [
'%s: known commands: %s' % (
args['reply_user'], ', '.join(commands)
),
'known parsers: %s' % ', '.join(parsers)
]
}
for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]:
if what == p.plugin_name:
log.info('sent help for %s' % what)
return {
'msg': args['reply_user'] + ': help for %s %s %s: %s' % (
'enabled' if plugin_enabled_get(p) else 'disabled',
'parser' if p.plugin_type == ptypes_PARSE else 'command',
what, p.plugin_desc
)
}
log.info('no help found for %s' % what)
return {
'msg': args['reply_user'] + ': no such command: %s' % what
}
@pluginfunction('version', 'prints version', ptypes_COMMAND) @pluginfunction('version', 'prints version', ptypes_COMMAND)
def command_version(argv, **args): def command_version(argv, **args):
if 'version' != argv[0]: if 'version' != argv[0]:
@@ -439,7 +192,8 @@ def command_info(argv, **args):
} }
@pluginfunction('teatimer', 'sets a tea timer to $1 or currently %d seconds' % config.conf_get('tea_steep_time'), ptypes_COMMAND) @pluginfunction('teatimer', 'sets a tea timer to $1 or currently %d seconds' % config.conf_get('tea_steep_time'),
ptypes_COMMAND)
def command_teatimer(argv, **args): def command_teatimer(argv, **args):
if 'teatimer' != argv[0]: if 'teatimer' != argv[0]:
return return
@@ -658,43 +412,6 @@ def command_terminate(argv, **args):
} }
@pluginfunction('plugin', "'disable' or 'enable' plugins", ptypes_COMMAND)
def command_plugin_activation(argv, **args):
if argv[0] != 'plugin' or len(argv) == 1:
return
command = argv[1]
plugin = argv[2] if len(argv) > 2 else None
if command not in ('enable', 'disable'):
return
log.info('plugin activation plugin called')
if not plugin:
return {
'msg': args['reply_user'] + ': no plugin given'
}
elif command_plugin_activation.plugin_name == plugin:
return {
'msg': args['reply_user'] + ': not allowed'
}
for p in plugins[ptypes_COMMAND] + plugins[ptypes_PARSE]:
if p.plugin_name == plugin:
plugin_enabled_set(p, 'enable' == command)
return {
'msg': args['reply_user'] + ': %sd %s' % (
command, plugin
)
}
return {
'msg': args['reply_user'] + ': unknown plugin %s' % plugin
}
@pluginfunction('wp-en', 'crawl the english Wikipedia', ptypes_COMMAND) @pluginfunction('wp-en', 'crawl the english Wikipedia', ptypes_COMMAND)
def command_wp_en(argv, **args): def command_wp_en(argv, **args):
if 'wp-en' != argv[0]: if 'wp-en' != argv[0]:
@@ -803,45 +520,6 @@ def command_show_moinlist(argv, **args):
} }
@pluginfunction('list', 'list plugin and parser status', ptypes_COMMAND)
def command_list(argv, **args):
if 'list' != argv[0]:
return
log.info('list plugin called')
if 'enabled' in argv and 'disabled' in argv:
return {
'msg': args['reply_user'] + ": both 'enabled' and 'disabled' makes no sense"
}
# if not given, asume both
if 'command' not in argv and 'parser' not in argv:
argv.append('command')
argv.append('parser')
out_command = []
out_parser = []
if 'command' in argv:
out_command = plugins[ptypes_COMMAND]
if 'parser' in argv:
out_parser = plugins[ptypes_PARSE]
if 'enabled' in argv:
out_command = [p for p in out_command if plugin_enabled_get(p)]
out_parser = [p for p in out_parser if plugin_enabled_get(p)]
if 'disabled' in argv:
out_command = [p for p in out_command if not plugin_enabled_get(p)]
out_parser = [p for p in out_parser if not plugin_enabled_get(p)]
msg = [args['reply_user'] + ': list of plugins:']
if out_command:
msg.append('commands: %s' % ', '.join([p.plugin_name for p in out_command]))
if out_parser:
msg.append('parsers: %s' % ', '.join([p.plugin_name for p in out_parser]))
return {'msg': msg}
@pluginfunction( @pluginfunction(
'record', 'record a message for a now offline user (usage: record {user} {some message})', ptypes_COMMAND) 'record', 'record a message for a now offline user (usage: record {user} {some message})', ptypes_COMMAND)
def command_record(argv, **args): def command_record(argv, **args):
@@ -895,12 +573,11 @@ def command_show_recordlist(argv, **args):
[ [
'%s (%d)' % (key, len(val)) for key, val in config.runtime_config_store['user_records'].items() '%s (%d)' % (key, len(val)) for key, val in config.runtime_config_store['user_records'].items()
if not argv1 or argv1.lower() in key.lower() if not argv1 or argv1.lower() in key.lower()
] ]
) )
) )
} }
# TODO: disabled until rewrite # TODO: disabled until rewrite
# @pluginfunction('dsa-watcher', 'automatically crawls for newly published Debian Security Announces', ptypes_COMMAND, # @pluginfunction('dsa-watcher', 'automatically crawls for newly published Debian Security Announces', ptypes_COMMAND,
# ratelimit_class=RATE_NO_SILENCE) # ratelimit_class=RATE_NO_SILENCE)
@@ -987,8 +664,6 @@ def command_show_recordlist(argv, **args):
# msg = 'wrong argument' # msg = 'wrong argument'
# log.warn(msg) # log.warn(msg)
# return {'msg': msg} # return {'msg': msg}
@pluginfunction("provoke-bots", "search for other bots", ptypes_COMMAND) @pluginfunction("provoke-bots", "search for other bots", ptypes_COMMAND)
def provoke_bots(argv, **args): def provoke_bots(argv, **args):
if 'provoke-bots' == argv[0]: if 'provoke-bots' == argv[0]:
@@ -997,32 +672,6 @@ def provoke_bots(argv, **args):
} }
@pluginfunction("recognize_bots", "got ya", ptypes_PARSE)
def recognize_bots(**args):
unique_standard_phrases = (
'independent bot and have nothing to do with other artificial intelligence systems',
'new Debian Security Announce',
'I\'m a bot (highlight me',
)
def _add_to_list(username, message):
if username not in config.runtime_config_store['other_bots']:
config.runtime_config_store['other_bots'].append(username)
config.runtimeconf_persist()
log.info("Adding {} to the list of bots (now {})".format(username, config.runtime_config_store['other_bots']))
return {
'event': {
'time': time.time() + 3,
'msg': message
}
}
if any([phrase in args['data'] for phrase in unique_standard_phrases]):
return _add_to_list(args['reply_user'], 'Making notes...')
elif 'I\'ll be back' in args['data']:
return _add_to_list(args['reply_user'], 'Hey there, buddy!')
@pluginfunction("remove-from-botlist", "remove a user from the botlist", ptypes_COMMAND) @pluginfunction("remove-from-botlist", "remove a user from the botlist", ptypes_COMMAND)
def remove_from_botlist(argv, **args): def remove_from_botlist(argv, **args):
if len(argv) != 2: if len(argv) != 2:
@@ -1076,15 +725,6 @@ def set_status(argv, **args):
} }
@pluginfunction('reset-jobs', "reset joblist", ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT)
def reset_jobs(argv, **args):
if args['reply_user'] != config.conf_get('bot_owner'):
return
else:
joblist.clear()
return {'msg': 'done.'}
@pluginfunction('save-config', "save config", ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT) @pluginfunction('save-config', "save config", ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT)
def save_config(argv, **args): def save_config(argv, **args):
if args['reply_user'] != config.conf_get('bot_owner'): if args['reply_user'] != config.conf_get('bot_owner'):
@@ -1103,54 +743,6 @@ def flausch(argv, **args):
} }
@pluginfunction('resolve-url-title', 'extract titles from urls', ptypes_PARSE, ratelimit_class=RATE_URL)
def resolve_url_title(**args):
user = args['reply_user']
user_pref_nospoiler = runtimeconf_get('user_pref', {}).get(user, {}).get('spoiler', False)
if user_pref_nospoiler:
log.info('nospoiler in userconf')
return
result = re.findall(r'(https?://[^\s>]+)', args['data'])
if not result:
return
url_blacklist = config.runtime_config_store['url_blacklist'].values()
out = []
for url in result:
if any([re.match(b, url) for b in url_blacklist]):
log.info('url blacklist match for ' + url)
break
# urllib.request is broken:
# >>> '.'.encode('idna')
# ....
# UnicodeError: label empty or too long
# >>> '.a.'.encode('idna')
# ....
# UnicodeError: label empty or too long
# >>> 'a.a.'.encode('idna')
# b'a.a.'
try:
title = extract_title(url)
except UnicodeError as e:
message = 'Bug triggered (%s), invalid URL/domain part: %s' % (str(e), url)
log.warn(message)
return {'msg': message}
if title:
title = title.strip()
message = 'Title: %s' % title
message = message.replace('\n', '\\n')
out.append(message)
return {
'msg': out
}
@pluginfunction('show-runtimeconfig', "show the current runtimeconfig", ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT) @pluginfunction('show-runtimeconfig', "show the current runtimeconfig", ptypes_COMMAND, ratelimit_class=RATE_NO_LIMIT)
def show_runtimeconfig(argv, **args): def show_runtimeconfig(argv, **args):
if args['reply_user'] != config.conf_get('bot_owner'): if args['reply_user'] != config.conf_get('bot_owner'):
@@ -1167,59 +759,3 @@ def reload_runtimeconfig(argv, **args):
else: else:
config.runtime_config_store.reload() config.runtime_config_store.reload()
return {'msg': 'done'} return {'msg': 'done'}
def else_command(args):
log.info('sent short info')
return {
'msg': args['reply_user'] + ''': I'm a bot (highlight me with 'info' for more information).'''
}
def register(func_type):
"""
Register plugins.
:param func_type: plugin functions with this type (ptypes) will be loaded
"""
functions = [
f for ignored, f in globals().items() if
isinstance(f, types.FunctionType) and
all([
f.__dict__.get('is_plugin', False),
getattr(f, 'plugin_type', None) == func_type
])
]
log.info('auto-reg %s: %s' % (func_type, ', '.join(
f.plugin_name for f in functions
)))
for f in functions:
register_plugin(f, func_type)
def register_plugin(function, func_type):
try:
plugins[func_type].append(function)
except Exception as e:
log.warn('registering %s failed: %s, %s' % (function, e, traceback.format_exc()))
def register_all():
register(ptypes_PARSE)
register(ptypes_COMMAND)
def event_trigger():
if 0 == len(joblist):
return True
now = time.time()
for (i, (t, callback, args)) in enumerate(joblist):
if t < now:
callback(*args)
del (joblist[i])
return True

232
plugins/parsers.py Normal file
View File

@@ -0,0 +1,232 @@
import logging
import random
import re
import time
import config
from common import RATE_NO_SILENCE, RATE_GLOBAL, extract_title, RATE_FUN, RATE_URL, pluginfunction, ptypes_PARSE
from config import runtimeconf_get
from plugins import ptypes_PARSE
from string_constants import moin_strings_hi, moin_strings_bye
log = logging.getLogger(__name__)
@pluginfunction('mental_ill', 'parse mental illness', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_mental_ill(**args):
min_ill = 3
c = 0
flag = False
# return True for min_ill '!' in a row
for d in args['data']:
if '!' == d or '?' == d:
c += 1
else:
c = 0
if min_ill <= c:
flag = True
break
if flag:
log.info('sent mental illness reply')
return {
'msg': (
'Multiple exclamation/question marks are a sure sign of mental disease, with %s as a living example.' %
args['reply_user']
)
}
@pluginfunction('debbug', 'parse Debian bug numbers', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_debbug(**args):
bugs = re.findall(r'#(\d{4,})', args['data'])
if not bugs:
return None
out = []
for b in bugs:
log.info('detected Debian bug #%s' % b)
url = 'https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=%s' % b
status, title = extract_title(url)
if 0 == status:
out.append('Debian Bug: %s: %s' % (title, url))
elif 3 == status:
out.append('error for #%s: %s' % (b, title))
else:
log.info('unknown status %d' % status)
return {
'msg': out
}
@pluginfunction('cve', 'parse a CVE handle', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_cve(**args):
cves = re.findall(r'(CVE-\d\d\d\d-\d+)', args['data'].upper())
if not cves:
return None
log.info('detected CVE handle')
return {
'msg': ['https://security-tracker.debian.org/tracker/%s' % c for c in cves]
}
@pluginfunction('dsa', 'parse a DSA handle', ptypes_PARSE, ratelimit_class=RATE_NO_SILENCE | RATE_GLOBAL)
def parse_dsa(**args):
dsas = re.findall(r'(DSA-\d\d\d\d-\d+)', args['data'].upper())
if not dsas:
return None
log.info('detected DSA handle')
return {
'msg': ['https://security-tracker.debian.org/tracker/%s' % d for d in dsas]
}
@pluginfunction('skynet', 'parse skynet', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL)
def parse_skynet(**args):
if 'skynet' in args['data'].lower():
return {
'msg': 'I\'ll be back.'
}
@pluginfunction('moin', 'parse hi/bye', ptypes_PARSE)
def parse_moin(**args):
for direction in [moin_strings_hi, moin_strings_bye]:
for d in direction:
words = re.split(r'\W+', args['data'])
# assumption: longer sentences are not greetings
if 3 < len(args['data'].split()):
continue
for w in words:
if d.lower() == w.lower():
if args['reply_user'] in config.conf_get('moin-disabled-user'):
log.info('moin blacklist match')
return
if args['reply_user'] in config.conf_get('moin-modified-user'):
log.info('being "quiet" for %s' % w)
return {
'msg': '/me %s' % random.choice([
"doesn't say anything at all",
'whistles uninterested',
'just ignores this incident'
])
}
log.info('sent %s reply for %s' % (
'hi' if direction is moin_strings_hi else 'bye', w
))
return {
'msg': '''%s, %s''' % (
random.choice(direction),
args['reply_user']
)
}
@pluginfunction('latex', r'reacts on \LaTeX', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL)
def parse_latex(**args):
if r'\LaTeX' in args['data']:
return {
'msg': '''LaTeX is way too complex for me, I'm happy with fmt(1)'''
}
@pluginfunction('me-action', 'reacts to /me.*%{bot_nickname}', ptypes_PARSE, ratelimit_class=RATE_FUN | RATE_GLOBAL)
def parse_slash_me(**args):
if args['data'].lower().startswith('/me') and (config.conf_get('bot_nickname') in args['data'].lower()):
log.info('sent /me reply')
me_replys = [
'are you that rude to everybody?',
'oh, thank you...',
'do you really think that was nice?',
'that sounds very interesting...',
"excuse me, but I'm already late for an appointment"
]
return {
'msg': args['reply_user'] + ': %s' % random.choice(me_replys)
}
@pluginfunction("recognize_bots", "got ya", ptypes_PARSE)
def recognize_bots(**args):
unique_standard_phrases = (
'independent bot and have nothing to do with other artificial intelligence systems',
'new Debian Security Announce',
'I\'m a bot (highlight me',
)
def _add_to_list(username, message):
if username not in config.runtime_config_store['other_bots']:
config.runtime_config_store['other_bots'].append(username)
config.runtimeconf_persist()
log.info("Adding {} to the list of bots (now {})".format(username, config.runtime_config_store['other_bots']))
return {
'event': {
'time': time.time() + 3,
'msg': message
}
}
if any([phrase in args['data'] for phrase in unique_standard_phrases]):
return _add_to_list(args['reply_user'], 'Making notes...')
elif 'I\'ll be back' in args['data']:
return _add_to_list(args['reply_user'], 'Hey there, buddy!')
@pluginfunction('resolve-url-title', 'extract titles from urls', ptypes_PARSE, ratelimit_class=RATE_URL)
def resolve_url_title(**args):
user = args['reply_user']
user_pref_nospoiler = runtimeconf_get('user_pref', {}).get(user, {}).get('spoiler', False)
if user_pref_nospoiler:
log.info('nospoiler in userconf')
return
result = re.findall(r'(https?://[^\s>]+)', args['data'])
if not result:
return
url_blacklist = config.runtime_config_store['url_blacklist'].values()
out = []
for url in result:
if any([re.match(b, url) for b in url_blacklist]):
log.info('url blacklist match for ' + url)
break
# urllib.request is broken:
# >>> '.'.encode('idna')
# ....
# UnicodeError: label empty or too long
# >>> '.a.'.encode('idna')
# ....
# UnicodeError: label empty or too long
# >>> 'a.a.'.encode('idna')
# b'a.a.'
try:
title = extract_title(url)
except UnicodeError as e:
message = 'Bug triggered (%s), invalid URL/domain part: %s' % (str(e), url)
log.warn(message)
return {'msg': message}
if title:
title = title.strip()
message = 'Title: %s' % title
message = message.replace('\n', '\\n')
out.append(message)
return {
'msg': out
}

View File

@@ -6,8 +6,7 @@ TODO: test all plugins, maybe declare their sample input somewhere near the code
import tempfile import tempfile
import time import time
import unittest import unittest
from collections import namedtuple
import mock as mock
from common import buckets, rate_limit, RATE_GLOBAL from common import buckets, rate_limit, RATE_GLOBAL
@@ -23,8 +22,6 @@ class TestEventlooper(unittest.TestCase):
self.assertEqual(result, (None, None)) self.assertEqual(result, (None, None))
from collections import namedtuple
Bucket = namedtuple("BucketConfig", ["history", "period", "max_hist_len"]) Bucket = namedtuple("BucketConfig", ["history", "period", "max_hist_len"])
@@ -235,7 +232,7 @@ class TestPlugins(unittest.TestCase):
self.assertTrue(all(['msg' in result['presence'], 'status' in result['presence']])) self.assertTrue(all(['msg' in result['presence'], 'status' in result['presence']]))
def test_teatimer(self): def test_teatimer(self):
from plugins import command_teatimer from plugins.commands import command_teatimer
result = command_teatimer(['teatimer'], reply_user='hans') result = command_teatimer(['teatimer'], reply_user='hans')
self.assertIn('event', result) self.assertIn('event', result)
self.assertIn('time', result['event']) self.assertIn('time', result['event'])
@@ -303,5 +300,3 @@ class TestPlugins(unittest.TestCase):
if 'DERPDERP' in config.runtime_config_store['other_bots']: if 'DERPDERP' in config.runtime_config_store['other_bots']:
config.runtime_config_store['other_bots'].remove('DERPDERP') config.runtime_config_store['other_bots'].remove('DERPDERP')
config.runtime_config_store.write() config.runtime_config_store.write()

View File

@@ -13,7 +13,7 @@ from common import (
RATE_CHAT, RATE_CHAT,
RATE_EVENT, RATE_EVENT,
rate_limit, rate_limit,
) ptypes_PARSE, ptypes_COMMAND)
from config import runtimeconf_set from config import runtimeconf_set
from idlebot import IdleBot, start from idlebot import IdleBot, start
from plugins import ( from plugins import (