diff --git a/persistent_config.ini.spec b/persistent_config.ini.spec index 6171efe..84d7439 100644 --- a/persistent_config.ini.spec +++ b/persistent_config.ini.spec @@ -9,3 +9,14 @@ other_bots = string_list(default=list()) [user_pref] [user_records] + +[url_blacklist] +heise = string(default='^.*heise\.de/.*-[0-9]+\.html$') +wikipedia = string(default='^.*wikipedia\.org/wiki/.*$') +blog = string(default=string(default='^.*blog\.fefe\.de/\?ts=[0-9a-f]+$')) +ibash = string(default='^.*ibash\.de/zitat.*$') +golem = string(default='^.*golem\.de/news/.*$') +paste_debian = string(default='^.*paste\.debian\.net/((hidden|plainh?)/)?[0-9a-f]+/?$') +example = string(default='^.*example\.(org|net|com).*$') +sprunge = string(default='^.*sprunge\.us/.*$') +ftp_debian = string(default='^.*ftp\...\.debian\.org.*$') diff --git a/plugins.py b/plugins.py index 0560ba9..f82187b 100644 --- a/plugins.py +++ b/plugins.py @@ -459,7 +459,7 @@ def command_teatimer(argv, **args): ready = time.time() + steep try: - log.info('tea timer set to %s' % time.strftime('%F.%T', time.localtime(ready))) + log.info('tea timer set to %s' % time.strftime('%Y-%m-%d %H:%M', time.localtime(ready))) except (ValueError, OverflowError) as e: return { 'msg': args['reply_user'] + ': time format error: ' + str(e) @@ -467,7 +467,7 @@ def command_teatimer(argv, **args): return { 'msg': args['reply_user'] + ': Tea timer set to %s' % time.strftime( - '%F.%T', time.localtime(ready) + '%Y-%m-%d %H:%M', time.localtime(ready) ), 'event': { 'time': ready, @@ -536,7 +536,7 @@ def command_show_blacklist(argv, **args): '' if not argv1 else ' (limited to %s)' % argv1 ) ] + [ - b for b in config.conf_get('url_blacklist') if not argv1 or argv1 in b + b for b in config.runtime_config_store['url_blacklist'].values() if not argv1 or argv1 in b ] } @@ -807,7 +807,7 @@ def command_record(argv, **args): } target_user = argv[1].lower() - message = '%s (%s): ' % (args['reply_user'], time.strftime('%F.%T')) + message = '{} ({}): '.format(args['reply_user'], time.strftime('%Y-%m-%d %H:%M')) message += ' '.join(argv[2:]) if config.conf_get('persistent_locked'): @@ -927,7 +927,7 @@ def command_show_recordlist(argv, **args): # crawl_at = time.time() + config.get('dsa_watcher_interval') # # register_event(crawl_at, command_dsa_watcher, (['dsa-watcher', 'crawl'],)) # -# msg = 'next crawl set to %s' % time.strftime('%F.%T', time.localtime(crawl_at)) +# msg = 'next crawl set to %s' % time.strftime('%Y-%m-%d %H:%M', time.localtime(crawl_at)) # out.append(msg) # return { # 'msg': out, @@ -1024,6 +1024,14 @@ def reset_jobs(argv, **args): return {'msg': 'done.'} +@pluginfunction('flausch', "make people flauschig", ptypes_COMMAND, ratelimit_class=RATE_FUN) +def flausch(argv, **args): + if len(argv) != 2: + return + return { + 'msg': '{}: *flausch*'.format(argv[1]) + } + @pluginfunction('resolve-url-title', 'extract titles from urls', ptypes_PARSE, ratelimit_class=RATE_URL) def resolve_url_title(**args): user = args['reply_user'] @@ -1036,17 +1044,7 @@ def resolve_url_title(**args): if not result: return - url_blacklist = [ - r'^.*heise\.de/.*-[0-9]+\.html$', - r'^.*wikipedia\.org/wiki/.*$', - r'^.*blog\.fefe\.de/\?ts=[0-9a-f]+$', - r'^.*ibash\.de/zitat.*$', - r'^.*golem\.de/news/.*$' - r'^.*paste\.debian\.net/((hidden|plainh?)/)?[0-9a-f]+/?$', - r'^.*example\.(org|net|com).*$', - r'^.*sprunge\.us/.*$', - r'^.*ftp\...\.debian\.org.*$' - ] + url_blacklist = config.runtime_config_store['url_blacklist'].values() out = [] for url in result: diff --git a/test_urlbot.py b/test_urlbot.py index 1ba1ee0..28b1423 100644 --- a/test_urlbot.py +++ b/test_urlbot.py @@ -55,3 +55,180 @@ class TestRateLimiting(unittest.TestCase): time.sleep(bucket.period) self.say(msg=m, rate_class=0x42) self.assertEqual(messages, self.called[0x42]) + + +class TestPlugins(unittest.TestCase): + def setUp(self): + import config + self.bot_nickname = config.conf_get("bot_nickname") + pass + + def shortDescription(self): + return None + + def test_all_commands_no_message(self): + """ + By specification, currently not possible! should be revisited after settings words[2:]. + Using '' as empty first word as workaround + """ + import plugins + plugin_functions = filter( + lambda x: x[0].startswith("command") and x[1].plugin_type == 'command', + plugins.__dict__.items() + ) + + msg_obj = { + 'body': '', + 'mucnick': 'hans' + } + + plugin_arguments = { + 'msg_obj': msg_obj, + 'reply_user': msg_obj['mucnick'], + 'data': msg_obj['body'], + 'cmd_list': [], + 'parser_list': [], + } + + for p in plugin_functions: + func = p[1] + result = func([''], **plugin_arguments) + self.assertEqual(result, None) + + def test_all_commands_garbage(self): + """ + message is rubbish + :return: + """ + import plugins + plugin_functions = filter( + lambda x: x[0].startswith("command") and x[1].plugin_type == 'command', + plugins.__dict__.items() + ) + + msg_obj = { + 'body': 'urlbot: dewzd hweufweufgw ufzgwufgw ezfgweuf guwegf', + 'mucnick': 'hans' + } + + plugin_arguments = { + 'msg_obj': msg_obj, + 'reply_user': msg_obj['mucnick'], + 'data': msg_obj['body'], + 'cmd_list': [], + 'parser_list': [], + } + words = msg_obj['body'].split()[1:] + + for p in plugin_functions: + func = p[1] + result = func(words, **plugin_arguments) + self.assertEqual(result, None) + + def test_all_commands_with_command(self): + """ + Call plugins with their name, expect None or some action dict + """ + import plugins + plugin_functions = filter( + lambda x: x[0].startswith("command") and x[1].plugin_type == 'command', + plugins.__dict__.items() + ) + + for p in plugin_functions: + func = p[1] + msg_obj = { + 'body': '{}: {}'.format(self.bot_nickname, func.plugin_name), + 'mucnick': 'hans' + } + + plugin_arguments = { + 'msg_obj': msg_obj, + 'reply_user': msg_obj['mucnick'], + 'data': msg_obj['body'], + 'cmd_list': [], + 'parser_list': [], + } + words = msg_obj['body'].split()[1:] + + result = func(words, **plugin_arguments) + + import inspect + source = inspect.getsourcelines(func)[0][2:10] + + # assert that check on the right name as long as we're doing that. + self.assertTrue(func.plugin_name in ''.join(source), '{} not in {}'.format(func.plugin_name, source)) + + self.assertTrue(result is None or isinstance(result, dict)) + if result: + self.assertTrue(any(['msg' in result, 'event' in result, 'presence' in result])) + if 'event' in result: + self.assertTrue(any(['msg' in result['event'], 'command' in result['event']])) + if 'presence' in result: + self.assertTrue(all(['msg' in result['presence'], 'status' in result['presence']])) + + def test_commands_with_params(self): + """ + Test known commands with params so that + they don't return immidiately + """ + + import plugins + plugin_functions = filter( + lambda x: x[0].startswith("command") and x[1].plugin_type == 'command', + plugins.__dict__.items() + ) + + for p in plugin_functions: + func = p[1] + msg_obj = { + 'body': '{}: {}'.format(self.bot_nickname, func.plugin_name), + 'mucnick': 'hans' + } + + plugin_arguments = { + 'msg_obj': msg_obj, + 'reply_user': msg_obj['mucnick'], + 'data': msg_obj['body'], + 'cmd_list': [], + 'parser_list': [], + } + words = msg_obj['body'].split()[1:] + param_mapping = { + 'plugin': ['disable', 'cake'], + 'wp': ['Wikipedia'], + 'wp-en': ['Wikipedia'], + 'choose': ['A', 'B'], + 'decode': ['@'], + 'record': ['urlbug', 'this', 'is', 'sparta'], + # 'teatimer' invalid format string + + } + + if func.plugin_name in param_mapping: + words.extend(param_mapping[func.plugin_name]) + + result = func(words, **plugin_arguments) + + import inspect + source = inspect.getsourcelines(func)[0][2:10] + + # assert that check on the right name as long as we're doing that. + self.assertTrue(func.plugin_name in ''.join(source), '{} not in {}'.format(func.plugin_name, source)) + + print(func.plugin_name, result) + self.assertIsInstance(result, dict, msg=func.plugin_name) + if result: + self.assertTrue(any(['msg' in result, 'event' in result, 'presence' in result])) + if 'event' in result: + self.assertTrue(any(['msg' in result['event'], 'command' in result['event']])) + if 'presence' in result: + self.assertTrue(all(['msg' in result['presence'], 'status' in result['presence']])) + + def test_teatimer(self): + from plugins import command_teatimer + result = command_teatimer(['teatimer'], reply_user='hans') + self.assertIn('event', result) + self.assertIn('time', result['event']) + self.assertIn('msg', result['event']) + self.assertIn('msg', result) diff --git a/urlbot.py b/urlbot.py index 3f19f7c..20806b0 100755 --- a/urlbot.py +++ b/urlbot.py @@ -228,7 +228,7 @@ class UrlBot(IdleBot): # TODO: check how several commands/plugins # in a single message behave (also with rate limiting) reacted = False - for plugin in plugin_storage[ptypes_COMMAND]: + for plugin in filter(lambda p: p.plugin_name == words[1], plugin_storage[ptypes_COMMAND]): if not plugin_enabled_get(plugin): continue