itte/itte.py

165 lines
6.5 KiB
Python

import socket
import yaml
from time import sleep
from random import randint
from sys import exit
class Util:
    """Utility functions."""

    def yml(self, yml_file):
        """Open a YAML file and return a dictionary of values.

        `yml_file` is the path of the file to read. The file is parsed with
        `yaml.safe_load` and the parsed result is returned unchanged.
        """
        # The context manager closes the handle even when parsing raises.
        with open(yml_file, "r") as fh:
            return yaml.safe_load(fh)

    def rand(self, lst):
        """Return a random item from a given list.

        Raises ValueError (from `randint`) when `lst` is empty.
        """
        return lst[randint(0, len(lst) - 1)]
class IRC:
    """Methods for basic IRC communication.

    An instance holds one socket connection (`self.sock`) plus the settings
    loaded by `config()`. `run()` drives the receive loop and hands every
    parsed message to a caller-supplied hook, which normally calls `listen()`
    once per trigger.
    """

    def config(self, conf):
        """Load runtime settings from a YAML config file, and return a
        dictionary of config values.

        `conf` is the path of the config file. The server address, channel
        list, bot nick, request prefix, admin user/code and debug flag are
        copied onto the instance; the whole parsed dictionary is returned.
        """
        self.util = Util()
        cfg = self.util.yml(conf)
        self.server = (cfg["server"]["host"], cfg["server"]["port"])
        self.channels = cfg["channels"]
        self.bot_nick = cfg["bot_nick"]
        self.req_prefix = cfg["req_prefix"]
        self.admin_user = cfg["admin"]["user"]
        self.admin_code = cfg["admin"]["code"]
        self.debug = cfg["debug"]
        return cfg

    def run(self, listen_hook):
        """Connect to the server, join channels, and attach the request
        listener hook to an endless receive loop.

        `listen_hook` is called with a context dictionary holding the parsed
        message (`"msg"`) and the channel being listened on (`"listen_chan"`).
        """
        self.connect(self.server, self.bot_nick)
        self.join_channels(self.channels)
        while True:
            data = self.receive()
            self.keepalive(data, self.bot_nick)
            self.msg = self.parse(data, self.req_prefix)
            # NOTE(review): the hook runs once per configured channel, so a
            # query (/msg) request reaches query-enabled handlers once for
            # every channel in the list.
            for c in self.channels:
                # Pass in a context dict for handlers
                listen_hook({"msg": self.msg, "listen_chan": c})

    def connect(self, server, bot_nick):
        """Connect to the server and send user/nick information.

        `server` is a `(host, port)` tuple. The process exits with a debug
        message when the connection attempt raises ConnectionError.
        """
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect(server)
        except ConnectionError as err:
            exit("[debug][err] " + str(err))
        self.send("USER", " ".join([bot_nick] * 4))
        self.send("NICK", bot_nick)

    def disconnect(self, resp_msg, quit_msg):
        """Notify the admin user and disconnect from the server.

        `resp_msg` is sent to the admin user as a private message and
        `quit_msg` is sent with the QUIT command.
        """
        self.send("PRIVMSG", resp_msg, recvr=self.admin_user)
        self.send("QUIT", ":" + quit_msg)
        # Currently only one server per app instance is supported, so
        # disconnect also exits the app
        exit("Shutting down ...")

    def keepalive(self, line, bot_nick):
        """Stay connected to a server by responding to server pings.

        A line whose first word is PING is answered with the matching PONG.
        A ping-timeout QUIT whose first word contains `bot_nick` exits the
        process.
        """
        first_word = line.split(" ")[0]
        if first_word == "PING":
            resp = line.replace("PING", "PONG", 1)
            if self.debug:
                print("[debug][send] " + resp)
            self.sock.sendall(bytes(resp + "\r\n", "utf-8"))
        # Fallback to ensure process exits if timeout occurs
        elif bot_nick in first_word and "QUIT :Ping timeout:" in line:
            exit("[debug][err] Ping timeout, exited.")

    def join_channels(self, channels):
        """Join channels given a list of channel names.

        Entries that are blank or consist only of "#" once surrounding
        whitespace is removed are skipped.
        """
        for c in channels:
            # Both conditions must hold; the earlier `or` test was always
            # true and let empty names through.
            if c.strip() not in ("", "#"):
                self.send("JOIN", c)

    def send(self, command, text, *args, **kwargs):
        """Send messages given the IRC command and text. Optionally specify a
        message recipient with `recvr=user`.

        A BrokenPipeError from the socket is reported in debug mode and
        otherwise ignored.
        """
        recvr = kwargs.get("recvr", "")
        if recvr != "":
            recvr += " :"
        if self.debug:
            print("[debug][send] " + command + " " + recvr + text)
        bs = bytes(command + " " + recvr + text + "\r\n", "utf-8")
        try:
            self.sock.sendall(bs)
        except BrokenPipeError as err:
            if self.debug:
                print("[debug] " + str(err) + " at `" +
                      bs.decode("utf-8").strip() + "`")

    def receive(self):
        """Get messages from the connected socket.

        Reads up to 4096 bytes and returns them as text with leading and
        trailing CR/LF characters removed.
        """
        raw = self.sock.recv(4096)
        # NOTE(review): recv() yields b"" once the peer has closed the
        # connection; this method then returns "" and run() keeps looping.
        # Undecodable bytes (non-UTF-8 traffic, or a multi-byte character
        # split across two reads) become U+FFFD instead of raising.
        data = raw.decode("utf-8", errors="replace").strip("\r\n")
        if self.debug:
            print("[debug][recv] " + data)
        return data

    def parse(self, line, req_prefix):
        """Using received data from a socket, extract the request, the nick
        and username of the requester, the channel where the request
        originated and return a dictionary of values.

        All values are empty strings when `line` is not a PRIVMSG carrying
        the `:` + `req_prefix` marker.
        """
        data = {"req": "", "req_chan": "", "nick": "", "user": ""}
        marker = ":" + req_prefix
        # The marker can also show up in other server text; without this
        # guard the PRIVMSG splits below would raise IndexError.
        if marker not in line or "PRIVMSG " not in line:
            return data
        after_cmd = line.split("PRIVMSG", 1)[1]
        if marker not in after_cmd:
            return data
        data["req"] = after_cmd.split(marker, 1)[1].strip()
        data["req_chan"] = line.split("PRIVMSG ", 1)[1].split(" :", 1)[0]
        # The first word is the sender, ":nick!user@host" or
        # ":nick!~user@host"; the "~" is not always present.
        source = line.split(" ", 1)[0][1:]
        nick, _, userhost = source.partition("!")
        user = userhost.split("@", 1)[0]
        if user.startswith("~"):
            user = user[1:]
        data["nick"] = nick
        data["user"] = user
        return data

    def listen(self, context, trigger, handler, *args, **kwargs):
        """Listen for a trigger and call the handler. It takes a context
        dictionary (to determine the channel and recipient), trigger string
        and corresponding handler function. Optional flags (chan, query,
        admin) can be used to specify whether a trigger is active in channels
        or by /msg. e.g. `chan=False, query=True` to make a trigger
        query-only. By default, it will listen in both channels and private
        messages.

        With `admin=True` the handler only runs for a /msg from the admin
        user whose request contains the admin code; the `chan` and `query`
        flags are ignored for such triggers.
        """
        in_chan = kwargs.get("chan", True)
        in_query = kwargs.get("query", True)
        in_admin = kwargs.get("admin", False)
        msg = context["msg"]
        channel = context["listen_chan"]
        # Responses are sent via pm to the user by default, while requests
        # made in a channel are sent to the channel. While it's possible to
        # override the recvr, it's usually easier to enable a trigger in the
        # same location where the response will be sent.
        context["recvr"] = msg["user"]
        if msg["req"] != trigger:
            return
        is_query = msg["req_chan"] == self.bot_nick
        # Admin requests are query/pm only, and are checked before the
        # ordinary branches so the default query=True cannot bypass the
        # user and code check.
        if in_admin:
            is_admin = msg["user"].lower() == self.admin_user.lower()
            if is_query and is_admin and self.admin_code in msg["req"]:
                handler(context)
            return
        # Respond only in the channel where the request was made
        if in_chan and channel == msg["req_chan"]:
            context["recvr"] = msg["req_chan"]
            handler(context)
        # Respond to query/pm
        elif in_query and is_query:
            handler(context)

    def reply(self, cxt, text):
        """Alias of send() with PRIVMSG command preset.

        The recipient is taken from `cxt["recvr"]`.
        """
        self.send("PRIVMSG", text, recvr=cxt["recvr"])