pinhook/pinhook/plugin.py

176 lines
4.8 KiB
Python

from enum import Enum
from functools import wraps
import importlib
import os
from .log import logger
plugins = {}
cmds = {}
lstnrs = {}
class OutputType(Enum):
Message = 'message'
Action = 'action'
class Output:
def __init__(self, msg_type, msg):
self.msg_type = msg_type
self.msg = self.sanitize(msg)
def sanitize(self, msg):
try:
return msg.splitlines()
except AttributeError:
return msg
class _BasePlugin:
enabled = True
logger = logger
def enable(self):
self.enabled = True
def disable(self):
self.enabled = False
class Listener(_BasePlugin):
def __init__(self, name, run=None):
self.name = name
if run:
self.run = run
self._add_listener()
def __str__(self):
return self.name
def run(self):
pass
def _add_listener(self):
lstnrs[self.name] = self
plugins[self.name] = self
class Command(_BasePlugin):
def __init__(self, cmd, **kwargs):
self.cmd = cmd
self.help_text = kwargs.get('help_text', 'N/A')
self.ops = kwargs.get('ops', False)
self.ops_msg = kwargs.get('ops_msg', '')
self.run = kwargs.get('run', self.run)
self._add_command()
def __str__(self):
return self.cmd
def run(self, msg):
pass
def _enable_ops(self, ops_msg):
self.ops = True
self.ops_msg = ops_msg
def _update_plugin(self, **kwargs):
self.help_text = kwargs.get('help_text', 'N/A')
self.run = kwargs.get('run', self.run)
def _add_command(self):
cmds[self.cmd] = self
plugins[self.cmd] = self
def action(msg):
return Output(OutputType.Action, msg)
def message(msg):
return Output(OutputType.Message, msg)
def _add_command(command, help_text, func, ops=False, ops_msg=''):
if command not in cmds:
Command(command, help_text=help_text, ops=ops, ops_msg=ops_msg, run=func)
else:
cmds[command]._update_plugin(help_text=help_text, run=func)
def _ops_plugin(command, ops_msg, func):
if command not in cmds:
Command(command, ops=True, ops_msg=ops_msg)
else:
cmds[command]._enable_ops(ops_msg)
def _add_listener(name, func):
Listener(name, run=func)
def clear_plugins():
cmds.clear()
lstnrs.clear()
def load_plugins(plugin_dir, use_prefix=False, cmd_prefix='!'):
# i'm not sure why i need this but i do
global cmds
global plugins
global lstnrs
#check for all the disabled plugins so that we don't re-enable them
disabled_plugins = [i for i in plugins if not plugins[i].enabled]
logger.debug(disabled_plugins)
# clear plugin list to ensure no old plugins remain
logger.info('clearing plugin cache')
clear_plugins()
# ensure plugin folder exists
logger.info('checking plugin directory')
if not os.path.exists(plugin_dir):
logger.info('plugin directory {} not found, creating'.format(plugin_dir))
os.makedirs(plugin_dir)
# load all plugins
for m in os.listdir(plugin_dir):
if m.endswith('.py'):
try:
name = m[:-3]
logger.info('loading plugin {}'.format(name))
spec = importlib.machinery.PathFinder().find_spec(name, [plugin_dir])
spec.loader.load_module()
except Exception:
logger.exception('could not load plugin')
# gather all commands and listeners
if use_prefix: # use prefixes if needed
cmds = {cmd_prefix + k: v for k,v in cmds.items()}
for p in plugins:
if p in disabled_plugins:
plugins[p].disable()
for cmd in cmds:
logger.debug('adding command {}'.format(cmd))
for lstnr in lstnrs:
logger.debug('adding listener {}'.format(lstnr))
def command(command, help_text='N/A', ops=False, ops_msg=''):
@wraps(command)
def register_for_command(func):
_add_command(command, help_text, func, ops=ops, ops_msg=ops_msg)
return func
return register_for_command
def register(command, help_text='N/A'):
logger.warn('@register decorator has been deprecated in favor of @command. This will cause errors in future versions.')
@wraps(command)
def register_for_command(func):
_add_command(command, help_text, func)
return func
return register_for_command
def listener(name):
def register_as_listener(func):
_add_listener(name, func)
return func
return register_as_listener
def ops(command, msg=None):
logger.warn('use of the @ops decorator has been deprecated in favor of using the @command decorator with the ops and ops_msg options. Use will cause errors in future versions.')
@wraps(command)
def register_ops_command(func):
_ops_plugin(command, msg, func)
return func
return register_ops_command