chatterbot/bot.py

176 lines
6.3 KiB
Python

import socket
from time import sleep, time
from json import load, dump
import re
privmsg_channel_re = re.compile(r"PRIVMSG (#*\w+)")
nick_re = re.compile(r"^:([^!]*)!")
default_timeout = 43200 # 12 hours in seconds
host = "localhost"
port = 6667
helptext = "i am a bot by ~nebula. i try to make it easier for users to discover new and more obscure channels on irc. instructions for use (or to block me from showing messages in your client) are available at https://git.tilde.town/nebula/chatterbot"
helptext_short = "see https://git.tilde.town/nebula/chatterbot for instructions"
class IRCBot():
def __init__(self):
try:
with open("state.json", "r") as f:
self.state = load(f)
except FileNotFoundError:
exit("no state.json")
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((host, port))
self.nick = self.state["nick"]
self.send_raw_line(f"NICK {self.nick}")
self.send_raw_line(f"USER {self.nick} 0 * :{self.state['realname']}")
for channel in self.state["channels"]:
self.send_raw_line(f"JOIN {channel}")
self.commands = [
("invite", self.invite),
("kick", self.kick)
]
def write_state(self):
with open("state.json", "w") as f:
dump(self.state, f, indent=2)
def join_channel(self, channel):
self.send_raw_line(f"JOIN {channel}")
self.state["channels"].append(channel)
self.write_state()
def part_channel(self, channel):
if channel in ("#tildetown", "#bots"):
self.send_raw_line(f"i will not leave {channel}. want to block me? see https://git.tilde.town/nebula/chatterbot")
return
self.send_raw_line(f"PART {channel}")
self.state["channels"].remove(channel)
del self.state["times"][channel]
del self.state["counts"][channel]
self.write_state()
def send_raw_line(self, line):
if line:
return self.s.send(bytes(f"{line}\r\n", "UTF-8"))
return
def send(self, channel, content):
if isinstance(content, list):
for line in content:
self.send_raw_line(f"PRIVMSG {channel} :{line}")
sleep(0.5)
elif isinstance(content, str):
self.send_raw_line(f"PRIVMSG {channel} :{content}")
def invite(self, _, arguments):
if not arguments:
return helptext_short
lines = []
for channel in arguments:
if not channel.startswith("#"):
lines.append("channel name must start with #")
continue
elif channel in self.state["channels"]:
lines.append(f"i am already in {channel}!")
else:
self.join_channel(channel)
lines.append(f"i have (allegedly) joined {channel}")
return lines
def kick(self, channel, arguments):
if not arguments:
self.part_channel(channel)
return
for channel in arguments:
self.part_channel(channel)
def check_time(self, channel):
try:
this_time = self.state["times"][channel]
except KeyError:
this_time = self.state["times"][channel] = time()
self.write_state()
return this_time
def set_time(self, channel, this_time):
self.state["times"][channel] = this_time
def set_timeout(self, channel, arguments):
timeout = int(timeout_hours * 60 * 60)
self.state["timeouts"][channel] = timeout
self.write_state()
return f"channel timeout set to {timeout_hours} hours"
def get_timeout(self, channel):
try:
self.state["timeouts"][channel]
except KeyError:
self.state["timeouts"][channel] = default_timeout
self.write_state()
def command_loop(self):
while True:
char = self.s.recv(1)
if not char:
exit(f"{self.nick}: no response from IRC server")
line = b""
while char != b"\n":
if char != b"\r":
line += char
char = self.s.recv(1)
line = line.decode("UTF-8").strip()
if line.startswith("PING"):
pong = "PONG " + line[5:]
self.send_raw_line(pong)
continue
privmsg_channel_search = privmsg_channel_re.search(line)
if not privmsg_channel_search:
continue
channel = privmsg_channel_search.group(1)
nick_search = nick_re.search(line)
if nick_search:
nick = nick_search.group(1)
else:
nick = None
if nick and not channel.startswith("#"):
channel = nick
try:
message_body = line[line.index(" :") + 2:]
except (IndexError, ValueError):
message_body = ""
if message_body:
if message_body.startswith("!rollcall") or message_body.startswith("!help"):
self.send(channel, helptext)
continue
elif message_body.startswith("!chatterbot"):
arguments = message_body.strip().lower()[11:]
if not arguments:
self.send(channel, helptext)
continue
arguments = arguments.split()
for command, callback in self.commands:
if command not in arguments:
continue
self.send(channel, callback(channel, arguments[1:]))
else:
if channel in ("#tildetown", "#bots"):
continue
channel_time = self.check_time(channel)
now = time()
channel_timeout = self.get_timeout(channel)
delta = now - channel_time
if delta > channel_timeout:
self.send("#bots", f"i hear activity in {channel}...")
self.set_time(channel, now)
if __name__ == "__main__":
bot = IRCBot()
try:
bot.command_loop()
except KeyboardInterrupt:
exit()