- Add cli flag for specifying config file (this allows running multiple instances) - Fix channel joins by waiting for server reply (thanks benharri) - Remove toot commands from help message (unannounced feature) - Update readme
		
			
				
	
	
		
			186 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			186 lines
		
	
	
		
			7.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import argparse
 | |
| import socket
 | |
| import yaml
 | |
| from time import sleep
 | |
| from random import randint
 | |
| from sys import exit
 | |
| 
 | |
| 
 | |
class Util:
    """Utility functions."""

    def yml(self, yml_file):
        """Open a YAML file and return a dictionary of values.

        `yml_file` is the path of the file to read. The process exits with
        an error message when the path is not usable (e.g. `None`) or the
        file cannot be opened (missing, unreadable, is a directory, ...).
        Errors raised by the YAML parser itself are not caught here.
        """
        try:
            # The context manager closes the handle even if parsing fails.
            with open(yml_file, "r") as fh:
                return yaml.safe_load(fh)
        except (TypeError, OSError):
            # TypeError: path is None or another unsupported type.
            # OSError: covers FileNotFoundError, PermissionError, etc.
            exit("[debug][err] Cannot load YML file. Please check it exists.")

    def rand(self, lst):
        """Return a random item from a given list.

        Raises ValueError (from `randint`) when the list is empty.
        """
        return lst[randint(0, len(lst) - 1)]

    def cli_flags(self):
        """Parse command line flags.

        Returns the argparse namespace. Its `config` attribute holds the
        value passed with `-c`/`--config`, or `None` when the flag is not
        given. The parser is kept on the instance as `self.argp`.
        """
        self.argp = argparse.ArgumentParser()
        self.argp.add_argument("-c", "--config", help="Config file")
        return self.argp.parse_args()
 | |
| 
 | |
| 
 | |
class IRC:
    """Methods for basic IRC communication."""

    def config(self, def_conf):
        """Load runtime settings from a YAML config file, and returns a
        dictionary of config values. Looks for the file in a runtime path
        (the `-c`/`--config` command line flag) or in the default location
        `def_conf`.

        The values are also stored on the instance: `server`, `channels`,
        `bot_nick`, `req_prefix`, `admin_user`, `admin_code` and `debug`.
        """
        self.util = Util()
        # Check for runtime config location. argparse leaves the value as
        # None when the flag is omitted, so test truthiness rather than
        # comparing against an empty string.
        flags = self.util.cli_flags()
        if flags.config:
            cfg = self.util.yml(flags.config)
        else:
            cfg = self.util.yml(def_conf)
        self.server = (cfg["server"]["host"], cfg["server"]["port"])
        self.channels = cfg["channels"]
        self.bot_nick = cfg["bot_nick"]
        self.req_prefix = cfg["req_prefix"]
        self.admin_user = cfg["admin"]["user"]
        self.admin_code = cfg["admin"]["code"]
        self.debug = cfg["debug"]
        return cfg

    @staticmethod
    def _split_lines(data):
        """Split a received chunk into individual IRC lines."""
        # str.splitlines() is avoided on purpose: it also breaks on control
        # characters such as \x1d, which IRC clients use for formatting.
        return [ln.rstrip("\r") for ln in data.split("\n")]

    def run(self, listen_hook):
        """A routine that connects to a server, joins channels, and attaches
        the request listener hook to a loop.

        `listen_hook` is called with a context dictionary holding the parsed
        message (`msg`) and the channel being listened on (`listen_chan`).
        This method does not return; the process ends through `exit()`.
        """
        self.connect(self.server, self.bot_nick)
        # Wait for server to reply before joining channels
        welcome = "001 " + self.bot_nick
        while True:
            svr_greet = self.receive()
            # Some servers ping during registration and expect an answer
            # before they send the welcome reply.
            for line in self._split_lines(svr_greet):
                self.keepalive(line, self.bot_nick)
            if welcome in svr_greet:
                break
            sleep(1)
        self.join_channels(self.channels)
        while 1:
            data = self.receive()
            # One read may carry several lines; handle each on its own so a
            # PING or request in the middle of a chunk is not missed.
            for line in self._split_lines(data):
                self.keepalive(line, self.bot_nick)
                self.msg = self.parse(line, self.req_prefix)
                if self.msg["req_chan"] in self.channels:
                    listen_chans = self.channels
                else:
                    # Not addressed to a listened channel (e.g. a private
                    # message): call the hook once so the handler does not
                    # reply once per configured channel.
                    listen_chans = self.channels[:1]
                for c in listen_chans:
                    # Pass in a context dict for handlers
                    listen_hook({"msg": self.msg, "listen_chan": c})

    def connect(self, server, bot_nick):
        """Connect to the server and sends user/nick information.

        `server` is a `(host, port)` tuple. The process exits with an error
        message if the connection cannot be established.
        """
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect(server)
        except OSError as err:
            # OSError also covers name resolution failures and timeouts,
            # which are not ConnectionError subclasses.
            exit("[debug][err] " + str(err))
        self.send("USER", bot_nick + " " + bot_nick + " " +
                  bot_nick + " " + bot_nick)
        self.send("NICK", bot_nick)

    def disconnect(self, resp_msg, quit_msg):
        """Notify the admin user and disconnect from the server."""
        self.send("PRIVMSG", resp_msg, recvr=self.admin_user)
        self.send("QUIT", ":" + quit_msg)
        # Currently only one server per app instance is supported, so
        # disconnect also exits the app
        exit("Shutting down ...")

    def keepalive(self, line, bot_nick):
        """Stay connected to a server by responding to server pings.

        `line` is a single received line. A PING is answered with the
        matching PONG; a ping timeout QUIT for `bot_nick` exits the process.
        """
        first_word = line.split(" ")[0]
        if first_word == "PING":
            resp = line.replace("PING", "PONG", 1)
            if self.debug:
                print("[debug][send] " + resp)
            self.sock.sendall(bytes(resp + "\r\n", "utf-8"))
        # Fallback to ensure process exits if timeout occurs
        elif (bot_nick in first_word) and \
                ("QUIT :Ping timeout:" in line):
            exit("[debug][err] Ping timeout, exited.")

    def join_channels(self, channels):
        """Join channels given a list of channel names.

        Blank entries and a bare "#" are skipped.
        """
        for c in channels:
            if c.strip() not in ("", "#"):
                self.send("JOIN", c)

    def send(self, command, text, *args, **kwargs):
        """Send messages given the IRC command and text. Optionally specify a
        message recipient with `recvr=user`.

        A broken pipe is reported in debug mode and otherwise ignored.
        """
        recvr = kwargs.get("recvr", "")
        if recvr != "":
            recvr += " :"
        if self.debug:
            print("[debug][send] " + command + " " + recvr + text)
        bs = bytes(command + " " + recvr + text + "\r\n", "utf-8")
        try:
            self.sock.sendall(bs)
        except BrokenPipeError as err:
            # Best effort: a lost connection is surfaced by the read loop.
            if self.debug:
                print("[debug] " + str(err) + " at `" +
                      bs.decode("utf-8").strip() + "`")

    def receive(self):
        """Get messages from the connected socket.

        Returns the decoded text of one read with surrounding CR/LF
        characters removed. Exits the process when the peer has closed the
        connection.
        """
        raw = self.sock.recv(4096)
        if not raw:
            # recv() returning no bytes means the connection is closed;
            # without this check the callers would loop forever.
            exit("[debug][err] Connection closed by server.")
        # IRC does not guarantee UTF-8, so undecodable bytes are replaced
        # instead of raising UnicodeDecodeError.
        data = raw.decode("utf-8", errors="replace").strip("\r\n")
        if self.debug:
            print("[debug][recv] " + data)
        return data

    def parse(self, line, req_prefix):
        """Using received data from a socket, extract the request, the nick and
        username of the requester, the channel where the request originated and
        return a dictionary of values.

        All values are empty strings when `line` is not a PRIVMSG carrying a
        request that starts with `req_prefix`.
        """
        data = {"req": "", "req_chan": "", "nick": "", "user": ""}
        marker = ":" + req_prefix
        head, found, body = line.partition("PRIVMSG ")
        # Other message types can contain the marker too (topics, notices),
        # so require an actual PRIVMSG before splitting.
        if not found or marker not in body:
            return data
        data["req"] = body.split(marker, 1)[1].strip()
        data["req_chan"] = body.split(" :", 1)[0]
        # The sender prefix is the last token before the command, in the
        # form `:nick!user@host`; the user part may start with "~".
        tokens = head.split()
        sender = tokens[-1] if tokens else ""
        if sender.startswith(":"):
            sender = sender[1:]
        nick, _, ident = sender.partition("!")
        user = ident.split("@", 1)[0]
        if user.startswith("~"):
            user = user[1:]
        data["nick"] = nick
        data["user"] = user
        return data

    def listen(self, context, trigger, handler, *args, **kwargs):
        """Listen for a trigger and call the handler. It takes a context
        dictionary (to determine the channel and recipient), trigger string and
        corresponding handler function. Optional flags (chan, query, admin) can
        be used to specify whether a trigger is active in channels or by
        /msg. e.g. `chan=False, query=True` to make a trigger query-only. By
        default, it will listen in both channels and private messages.

        With `admin=True` the handler only runs when the requester is the
        configured admin user and the request contains the admin code.
        """
        in_chan = kwargs.get("chan", True)
        in_query = kwargs.get("query", True)
        in_admin = kwargs.get("admin", False)
        # Admin requests are query/pm only
        if in_admin:
            in_chan = False
        msg = context["msg"]
        channel = context["listen_chan"]
        # Responses are sent via pm to the user by default, while requests made
        # in a channel are sent to the channel. While it's possible to override
        # the recvr, it usually easier to enable a trigger in the same location
        # where the response will be sent.
        context["recvr"] = msg["user"]

        if msg["req"] != trigger:
            return
        if in_admin:
            # Checked before the query branch, otherwise any user could run
            # an admin trigger simply by sending it in a private message.
            is_admin = msg["user"].lower() == self.admin_user.lower()
            if is_admin and self.admin_code in msg["req"]:
                handler(context)
            return
        # Respond only in the channel where the request was made
        if in_chan and channel == msg["req_chan"]:
            context["recvr"] = msg["req_chan"]
            handler(context)
        # Respond to query/pm
        elif in_query and msg["req_chan"] == self.bot_nick:
            handler(context)

    def reply(self, cxt, text):
        """Alias of send() with PRIVMSG command preset."""
        self.send("PRIVMSG", text, recvr=cxt["recvr"])
 |