181 lines
6.4 KiB
Python
181 lines
6.4 KiB
Python
import socket
|
|
from time import sleep, time
|
|
from json import load, dump
|
|
import re
|
|
|
|
channel_re = re.compile(r"PRIVMSG (#*\w+)")
|
|
name_re = re.compile(r"^:([^!]*)!")
|
|
|
|
# timeout = 86400 # 24 hours
|
|
timeout = 15
|
|
messages_within_timeout = 5
|
|
host = "localhost"
|
|
port = 6667
|
|
helptext = "i am a bot by ~nebula. i try to make it easier for users to discover new and more obscure channels on irc. instructions for use (or to block me from showing messages in your client) are available at https://git.tilde.town/nebula/chatterbot"
|
|
helptext_short = "see https://git.tilde.town/nebula/chatterbot for instructions"
|
|
|
|
class IRCBot():
|
|
def __init__(self):
|
|
try:
|
|
with open("config.json", "r") as f:
|
|
self.config = load(f)
|
|
except FileNotFoundError:
|
|
exit("no config.json")
|
|
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.s.connect((host, port))
|
|
self.nick = self.config["nick"]
|
|
self.sendline(f"NICK {self.nick}")
|
|
self.sendline(f"USER {self.nick} 0 * :{self.config['realname']}")
|
|
for channel in self.config["channels"]:
|
|
self.sendline(f"JOIN {channel}")
|
|
self.commands = [
|
|
("invite", self.invite),
|
|
("kick", self.kick)
|
|
]
|
|
|
|
def write_config(self):
|
|
with open("config.json", "w") as f:
|
|
dump(self.config, f, indent=2)
|
|
|
|
def join_channel(self, channel):
|
|
self.sendline(f"JOIN {channel}")
|
|
self.config["channels"].append(channel)
|
|
self.write_config()
|
|
|
|
def part_channel(self, channel):
|
|
if channel == "#tildetown":
|
|
self.sendline("i will not leave #tildetown. want to block me? see https://git.tilde.town/nebula/chatterbot")
|
|
return
|
|
elif channel == "#bots":
|
|
self.sendline("i will not leave #bots. want to block me? see https://git.tilde.town/nebula/chatterbot")
|
|
return
|
|
self.sendline(f"PART {channel}")
|
|
self.config["channels"].remove(channel)
|
|
del self.config["times"][channel]
|
|
del self.config["counts"][channel]
|
|
self.write_config()
|
|
|
|
def sendline(self, line):
|
|
if line:
|
|
return self.s.send(bytes(f"{line}\r\n", "UTF-8"))
|
|
return None
|
|
|
|
def send(self, channel, content):
|
|
if isinstance(content, list):
|
|
for line in content:
|
|
self.sendline(f"PRIVMSG {channel} :{line}")
|
|
sleep(0.5)
|
|
elif isinstance(content, str):
|
|
self.sendline(f"PRIVMSG {channel} :{content}")
|
|
|
|
def help(_, __):
|
|
return helptext
|
|
|
|
def invite(self, _, arguments):
|
|
if not arguments:
|
|
return helptext_short
|
|
lines = []
|
|
for channel in arguments:
|
|
if not channel.startswith("#"):
|
|
lines.append("channel name must start with #")
|
|
continue
|
|
elif channel in self.config["channels"]:
|
|
lines.append(f"i am already in {channel}!")
|
|
else:
|
|
self.join_channel(channel)
|
|
lines.append(f"i have (allegedly) joined {channel}")
|
|
return lines
|
|
|
|
def kick(self, channel, arguments):
|
|
if not arguments:
|
|
self.part_channel(channel)
|
|
return None
|
|
for channel in arguments:
|
|
self.part_channel(channel)
|
|
|
|
def check_time(self, channel):
|
|
try:
|
|
this_time = self.config["times"][channel]
|
|
except KeyError:
|
|
this_time = time()
|
|
self.config["times"][channel] = this_time
|
|
self.write_config()
|
|
return this_time
|
|
|
|
def set_time(self, channel, this_time):
|
|
self.config["times"][channel] = this_time
|
|
|
|
def counter(self, channel):
|
|
try:
|
|
self.config["counts"][channel] += 1
|
|
value = self.config["counts"][channel]
|
|
except KeyError:
|
|
value = self.config["counts"][channel] = 1
|
|
self.write_config()
|
|
return value
|
|
|
|
def command_loop(self):
|
|
while True:
|
|
char = self.s.recv(1)
|
|
if not char:
|
|
exit(f"{self.nick}: no response from IRC server")
|
|
line = b""
|
|
while char != b"\n":
|
|
if char != b"\r":
|
|
line += char
|
|
char = self.s.recv(1)
|
|
line = line.decode("UTF-8").strip()
|
|
if line.startswith("PING"):
|
|
pong = "PONG " + line[5:]
|
|
self.sendline(pong)
|
|
continue
|
|
channel_search = channel_re.search(line)
|
|
if not channel_search:
|
|
continue
|
|
channel = channel_search.group(1)
|
|
name_search = name_re.search(line)
|
|
if name_search:
|
|
name = name_search.group(1)
|
|
else:
|
|
name = None
|
|
if name and not channel.startswith("#"):
|
|
channel = name
|
|
try:
|
|
message_body = line[line.index(" :") + 2:]
|
|
except (IndexError, ValueError):
|
|
message_body = ""
|
|
if message_body:
|
|
if message_body.startswith("!rollcall"):
|
|
self.send(channel, helptext)
|
|
continue
|
|
elif message_body.startswith("!chatterbot"):
|
|
arguments = message_body.strip().lower()[11:]
|
|
if not arguments:
|
|
self.send(channel, helptext)
|
|
continue
|
|
arguments = arguments.split()
|
|
for command, callback in self.commands:
|
|
if command not in arguments:
|
|
continue
|
|
self.send(channel, callback(channel, arguments[1:]))
|
|
else:
|
|
if channel in ("#tildetown", "#bots"):
|
|
continue
|
|
channel_time = self.check_time(channel)
|
|
now = time()
|
|
if now - channel_time > timeout and self.counter(channel) < messages_within_timeout:
|
|
self.send("#bots", f"i hear activity in {channel}...")
|
|
# self.send("#tildetown", f"i hear activity in {channel}...")
|
|
self.config["counts"][channel] = 0
|
|
self.write_config()
|
|
self.set_time(channel, now)
|
|
|
|
if __name__ == "__main__":
|
|
bot = IRCBot()
|
|
try:
|
|
bot.command_loop()
|
|
except KeyboardInterrupt:
|
|
exit()
|
|
|
|
|