import argparse
import socket
from random import randint
from sys import exit
from time import sleep

import yaml


class Util:
    """Utility functions."""

    def yml(self, yml_file):
        """Open a YAML file and return its parsed contents.

        The file is parsed with ``yaml.safe_load``. If the file cannot be
        opened (missing path, unreadable file, or ``yml_file`` is None) the
        process exits with an error message.
        """
        try:
            # The context manager closes the handle even if parsing fails.
            with open(yml_file, "r") as fh:
                data = yaml.safe_load(fh)
        except (OSError, TypeError):
            exit("[debug][err] Cannot load YML file. Please check it exists.")
        return data

    def rand(self, lst):
        """Return a random item from a given list."""
        return lst[randint(0, len(lst) - 1)]

    def cli_flags(self):
        """Parse command line flags.

        Only ``-c``/``--config`` is defined; its value is None when the flag
        is not given on the command line.
        """
        self.argp = argparse.ArgumentParser()
        self.argp.add_argument("-c", "--config", help="Config file")
        return self.argp.parse_args()


class IRC:
    """Methods for basic IRC communication."""

    def config(self, def_conf):
        """Load runtime settings from a YAML config file and return them.

        The path given with the ``--config`` command line flag is used when
        present; otherwise ``def_conf`` is loaded. The connection settings
        are also copied onto the instance (server, channels, bot_nick,
        req_prefix, admin_user, admin_code, debug).
        """
        self.util = Util()

        # Check for runtime config location. argparse leaves the value as
        # None when the flag is omitted, so test truthiness rather than
        # comparing against an empty string.
        flags = self.util.cli_flags()
        cfg = self.util.yml(flags.config if flags.config else def_conf)

        self.server = (cfg["server"]["host"], cfg["server"]["port"])
        self.channels = cfg["channels"]
        self.bot_nick = cfg["bot_nick"]
        self.req_prefix = cfg["req_prefix"]
        self.admin_user = cfg["admin"]["user"]
        self.admin_code = cfg["admin"]["code"]
        self.debug = cfg["debug"]

        return cfg

    def run(self, listen_hook):
        """Connect, join the configured channels and dispatch requests.

        ``listen_hook`` is called with a context dict holding the parsed
        message (``msg``) and the channel being listened on
        (``listen_chan``). This method loops until the process exits.
        """
        self.connect(self.server, self.bot_nick)

        # Wait for the server welcome reply before joining channels. Pings
        # received in the meantime are answered so that servers which ping
        # during registration do not drop the connection.
        welcome = "001 " + self.bot_nick
        svr_greet = self.receive()
        while welcome not in svr_greet:
            self.keepalive(svr_greet, self.bot_nick)
            sleep(1)
            svr_greet = self.receive()

        self.join_channels(self.channels)

        while True:
            data = self.receive()
            self.keepalive(data, self.bot_nick)
            self.msg = self.parse(data, self.req_prefix)

            # A private message is not tied to any channel; calling the hook
            # once per channel would run query handlers several times for a
            # single request, so it is dispatched once only.
            if self.msg["req_chan"] == self.bot_nick:
                listen_chans = self.channels[:1]
            else:
                listen_chans = self.channels

            for c in listen_chans:
                # Pass in a context dict for handlers
                listen_hook({"msg": self.msg, "listen_chan": c})

    def connect(self, server, bot_nick):
        """Connect to the server and send user/nick information.

        ``server`` is passed straight to ``socket.connect``. The process
        exits with an error message if the connection cannot be made.
        """
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect(server)
        except OSError as err:
            # OSError also covers name resolution failures and timeouts,
            # not only ConnectionError.
            exit("[debug][err] " + str(err))

        self.send("USER", " ".join([bot_nick] * 4))
        self.send("NICK", bot_nick)

    def disconnect(self, resp_msg, quit_msg):
        """Notify the admin user and disconnect from the server."""
        self.send("PRIVMSG", resp_msg, recvr=self.admin_user)
        self.send("QUIT", ":" + quit_msg)

        # Currently only one server per app instance is supported, so
        # disconnect also exits the app
        exit("Shutting down ...")

    def keepalive(self, line, bot_nick):
        """Stay connected to a server by responding to server pings.

        ``line`` may hold several messages separated by newlines (a single
        socket read can return more than one); each is checked on its own.
        The process exits if the bot's own ping-timeout QUIT is seen.
        """
        for msg_line in line.split("\n"):
            msg_line = msg_line.rstrip("\r")
            first_word = msg_line.split(" ")[0]

            if first_word == "PING":
                resp = msg_line.replace("PING", "PONG", 1)
                if self.debug:
                    print("[debug][send] " + resp)
                self.sock.sendall(bytes(resp + "\r\n", "utf-8"))

            # Fallback to ensure process exits if timeout occurs
            elif (bot_nick in first_word) and \
                    ("QUIT :Ping timeout:" in msg_line):
                exit("[debug][err] Ping timeout, exited.")

    def join_channels(self, channels):
        """Join channels given a list of channel names.

        Names that are blank or consist only of "#" are skipped.
        """
        for c in channels:
            if c.strip() not in ("", "#"):
                self.send("JOIN", c)

    def send(self, command, text, *args, **kwargs):
        """Send messages given the IRC command and text.

        Optionally specify a message recipient with `recvr=user`; the text
        is then sent as ``<command> <recvr> :<text>``. A broken pipe while
        sending is not raised; it is only printed when debug is enabled.
        """
        recvr = kwargs.get("recvr", "")
        if recvr != "":
            recvr += " :"

        payload = command + " " + recvr + text
        if self.debug:
            print("[debug][send] " + payload)

        bs = bytes(payload + "\r\n", "utf-8")
        try:
            self.sock.sendall(bs)
        except BrokenPipeError as err:
            if self.debug:
                print("[debug] " + str(err) + " at `" +
                      bs.decode("utf-8").strip() + "`")

    def receive(self):
        """Get messages from the connected socket.

        Returns the decoded text of one socket read (up to 4096 bytes) with
        leading/trailing CR/LF removed. Exits the process if the peer has
        closed the connection.
        """
        raw = self.sock.recv(4096)

        # recv() returns empty bytes once the peer has closed the socket;
        # without this check callers would spin forever on empty reads.
        if not raw:
            exit("[debug][err] Connection closed by server.")

        # Undecodable bytes are replaced instead of raising, since other
        # clients are not guaranteed to send valid UTF-8.
        data = raw.decode("utf-8", errors="replace").strip("\r\n")
        if self.debug:
            print("[debug][recv] " + data)

        return data

    def parse(self, line, req_prefix):
        """Extract a request from a received line.

        Returns a dictionary with the request text (``req``), where it was
        made (``req_chan``) and the requester (``nick``, ``user``). All
        values are empty strings when ``line`` is not a PRIVMSG whose text
        starts with ``req_prefix``.
        """
        data = {"req": "", "req_chan": "", "nick": "", "user": ""}
        marker = ":" + req_prefix

        # Guard against lines that contain the marker but are not PRIVMSGs
        # (these used to raise IndexError).
        if marker not in line or "PRIVMSG " not in line:
            return data

        after_cmd = line.split("PRIVMSG", 1)[1]
        if marker not in after_cmd:
            return data

        data["req"] = after_cmd.split(marker, 1)[1].strip()
        data["req_chan"] = line.split("PRIVMSG ", 1)[1].split(" :", 1)[0]

        # The nick is the part of the leading ":nick!user@host" prefix
        # before "!". Only the first word is inspected so that a "!" in the
        # message text is never mistaken for the prefix separator.
        data["nick"] = line.split(" ", 1)[0].split("!", 1)[0][1:]

        # NOTE(review): this value is kept exactly as before. For a
        # ":nick!~user@host" prefix it evaluates to ":nick" (leading colon
        # included), not the username - confirm what callers expect before
        # changing it, since listen() uses it as recipient and admin check.
        data["user"] = line.split("!~", 1)[0].split("@", 1)[0]

        return data

    def listen(self, context, trigger, handler, *args, **kwargs):
        """Listen for a trigger and call the handler.

        It takes a context dictionary (to determine the channel and
        recipient), trigger string and corresponding handler function.
        Optional flags (chan, query, admin) can be used to specify whether a
        trigger is active in channels or by /msg. e.g.
        `chan=False, query=True` to make a trigger query-only. By default,
        it will listen in both channels and private messages.

        ``context["recvr"]`` is set before the handler is called.
        """
        in_chan = kwargs.get("chan", True)
        in_query = kwargs.get("query", True)
        in_admin = kwargs.get("admin", False)

        # Admin requests are query/pm only
        if in_admin:
            in_chan = False

        msg = context["msg"]
        channel = context["listen_chan"]

        # Responses are sent via pm to the user by default, while requests
        # made in a channel are sent to the channel. While it's possible to
        # override the recvr, it is usually easier to enable a trigger in
        # the same location where the response will be sent.
        context["recvr"] = msg["user"]

        if msg["req"] != trigger:
            return

        # NOTE(review): because `query` defaults to True, the query branch
        # below is reached before the admin branch for private messages, so
        # the admin user check only applies when query=False is passed or
        # the request was not a private message. Order left unchanged -
        # confirm intended behavior.

        # Respond only in the channel where the request was made
        if in_chan and channel == msg["req_chan"]:
            context["recvr"] = msg["req_chan"]
            handler(context)

        # Respond to query/pm
        elif in_query and msg["req_chan"] == self.bot_nick:
            handler(context)

        # Respond only to the admin user
        elif in_admin and \
                msg["user"].lower() == self.admin_user.lower() and \
                self.admin_code in msg["req"]:
            handler(context)

    def reply(self, cxt, text):
        """Alias of send() with PRIVMSG command preset."""
        self.send("PRIVMSG", text, recvr=cxt["recvr"])