bunch of refactoring i guess

This commit is contained in:
= 2025-04-04 06:54:31 +00:00
parent ea79b23b28
commit 5f0c0b2182
2 changed files with 165 additions and 172 deletions

307
bot.py
View File

@ -3,8 +3,8 @@ from time import sleep, time
from json import load, dump
import re
channel_re = re.compile(r"PRIVMSG (#*\w+)")
name_re = re.compile(r"^:([^!]*)!")
privmsg_channel_re = re.compile(r"PRIVMSG (#*\w+)")
nick_re = re.compile(r"^:([^!]*)!")
# timeout = 86400 # 24 hours
timeout = 15
@ -15,171 +15,172 @@ helptext = "i am a bot by ~nebula. i try to make it easier for users to discover
helptext_short = "see https://git.tilde.town/nebula/chatterbot for instructions"
class IRCBot():
def __init__(self):
try:
with open("config.json", "r") as f:
self.config = load(f)
except FileNotFoundError:
exit("no config.json")
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((host, port))
self.nick = self.config["nick"]
self.sendline(f"NICK {self.nick}")
self.sendline(f"USER {self.nick} 0 * :{self.config['realname']}")
for channel in self.config["channels"]:
self.sendline(f"JOIN {channel}")
self.commands = [
("invite", self.invite),
("kick", self.kick)
]
def __init__(self):
try:
with open("config.json", "r") as f:
self.state = load(f)
except FileNotFoundError:
exit("no config.json")
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((host, port))
self.nick = self.state["nick"]
self.send_raw_line(f"NICK {self.nick}")
self.send_raw_line(f"USER {self.nick} 0 * :{self.state['realname']}")
for channel in self.state["channels"]:
self.send_raw_line(f"JOIN {channel}")
self.commands = [
("invite", self.invite),
("kick", self.kick)
]
def write_config(self):
with open("config.json", "w") as f:
dump(self.config, f, indent=2)
def write_state(self):
with open("state.json", "w") as f:
dump(self.state, f, indent=2)
def join_channel(self, channel):
self.sendline(f"JOIN {channel}")
self.config["channels"].append(channel)
self.write_config()
def join_channel(self, channel):
self.send_raw_line(f"JOIN {channel}")
self.state["channels"].append(channel)
self.write_state()
def part_channel(self, channel):
if channel == "#tildetown":
self.sendline("i will not leave #tildetown. want to block me? see https://git.tilde.town/nebula/chatterbot")
return
elif channel == "#bots":
self.sendline("i will not leave #bots. want to block me? see https://git.tilde.town/nebula/chatterbot")
return
self.sendline(f"PART {channel}")
self.config["channels"].remove(channel)
del self.config["times"][channel]
del self.config["counts"][channel]
self.write_config()
def part_channel(self, channel):
if channel in ("#tildetown", "#bots"):
self.send_raw_line(f"i will not leave {channel}. want to block me? see https://git.tilde.town/nebula/chatterbot")
return
self.send_raw_line(f"PART {channel}")
self.state["channels"].remove(channel)
del self.state["times"][channel]
del self.state["counts"][channel]
self.write_state()
def sendline(self, line):
if line:
return self.s.send(bytes(f"{line}\r\n", "UTF-8"))
return None
def send_raw_line(self, line):
if line:
return self.s.send(bytes(f"{line}\r\n", "UTF-8"))
return
def send(self, channel, content):
if isinstance(content, list):
for line in content:
self.sendline(f"PRIVMSG {channel} :{line}")
sleep(0.5)
elif isinstance(content, str):
self.sendline(f"PRIVMSG {channel} :{content}")
def send(self, channel, content):
if isinstance(content, list):
for line in content:
self.send_raw_line(f"PRIVMSG {channel} :{line}")
sleep(0.5)
elif isinstance(content, str):
self.send_raw_line(f"PRIVMSG {channel} :{content}")
def help(_, __):
return helptext
def help(_, __):
return helptext
def invite(self, _, arguments):
if not arguments:
return helptext_short
lines = []
for channel in arguments:
if not channel.startswith("#"):
lines.append("channel name must start with #")
continue
elif channel in self.config["channels"]:
lines.append(f"i am already in {channel}!")
else:
self.join_channel(channel)
lines.append(f"i have (allegedly) joined {channel}")
return lines
def invite(self, _, arguments):
if not arguments:
return helptext_short
lines = []
for channel in arguments:
if not channel.startswith("#"):
lines.append("channel name must start with #")
continue
elif channel in self.state["channels"]:
lines.append(f"i am already in {channel}!")
else:
self.join_channel(channel)
lines.append(f"i have (allegedly) joined {channel}")
return lines
def kick(self, channel, arguments):
if not arguments:
self.part_channel(channel)
return None
for channel in arguments:
self.part_channel(channel)
def kick(self, channel, arguments):
if not arguments:
self.part_channel(channel)
return
for channel in arguments:
self.part_channel(channel)
def check_time(self, channel):
try:
this_time = self.config["times"][channel]
except KeyError:
this_time = self.config["times"][channel] = time()
self.write_config()
return this_time
def check_time(self, channel):
try:
this_time = self.state["times"][channel]
except KeyError:
this_time = self.state["times"][channel] = time()
self.write_state()
return this_time
def set_time(self, channel, this_time):
self.config["times"][channel] = this_time
def set_time(self, channel, this_time):
self.state["times"][channel] = this_time
def counter(self, channel):
try:
self.config["counts"][channel] += 1
value = self.config["counts"][channel]
except KeyError:
value = self.config["counts"][channel] = 1
self.write_config()
return value
def counter(self, channel):
try:
self.state["counts"][channel] += 1
value = self.state["counts"][channel]
except KeyError:
value = self.state["counts"][channel] = 1
self.write_state()
return value
def reset_count(self, channel):
self.config["counts"][channel] = 0
self.write_config()
def reset_count(self, channel):
self.state["counts"][channel] = 0
self.write_state()
def command_loop(self):
while True:
char = self.s.recv(1)
if not char:
exit(f"{self.nick}: no response from IRC server")
line = b""
while char != b"\n":
if char != b"\r":
line += char
char = self.s.recv(1)
line = line.decode("UTF-8").strip()
if line.startswith("PING"):
pong = "PONG " + line[5:]
self.sendline(pong)
continue
channel_search = channel_re.search(line)
if not channel_search:
continue
channel = channel_search.group(1)
name_search = name_re.search(line)
if name_search:
name = name_search.group(1)
else:
name = None
if name and not channel.startswith("#"):
channel = name
try:
message_body = line[line.index(" :") + 2:]
except (IndexError, ValueError):
message_body = ""
if message_body:
if message_body.startswith("!rollcall"):
self.send(channel, helptext)
continue
elif message_body.startswith("!chatterbot"):
arguments = message_body.strip().lower()[11:]
if not arguments:
self.send(channel, helptext)
continue
arguments = arguments.split()
for command, callback in self.commands:
if command not in arguments:
continue
self.send(channel, callback(channel, arguments[1:]))
else:
if channel in ("#tildetown", "#bots"):
continue
channel_time = self.check_time(channel)
now = time()
count = self.counter(channel)
delta = now - channel_time
if delta > timeout and count < messages_within_timeout:
self.reset_count(channel)
self.send("#bots", f"i hear activity in {channel}...")
elif count < messages_within_timeout and delta > timeout:
self.reset_count
self.set_time(channel, now)
def command_loop(self):
while True:
char = self.s.recv(1)
if not char:
exit(f"{self.nick}: no response from IRC server")
line = b""
while char != b"\n":
if char != b"\r":
line += char
char = self.s.recv(1)
line = line.decode("UTF-8").strip()
if line.startswith("PING"):
pong = "PONG " + line[5:]
self.send_raw_line(pong)
continue
privmsg_channel_search = privmsg_channel_re.search(line)
if not privmsg_channel_search:
continue
channel = privmsg_channel_search.group(1)
nick_search = nick_re.search(line)
if nick_search:
nick = nick_search.group(1)
else:
nick = None
if nick and not channel.startswith("#"):
channel = nick
try:
message_body = line[line.index(" :") + 2:]
except (IndexError, ValueError):
message_body = ""
if message_body:
if message_body.startswith("!rollcall") or message_body.startswith("!help"):
self.send(channel, helptext)
continue
elif message_body.startswith("!chatterbot"):
arguments = message_body.strip().lower()[11:]
if not arguments:
self.send(channel, helptext)
continue
arguments = arguments.split()
for command, callback in self.commands:
if command not in arguments:
continue
self.send(channel, callback(channel, arguments[1:]))
else:
if channel in ("#tildetown", "#bots"):
continue
# i have not figured out this part yet
# channel_time = self.check_time(channel)
# now = time()
# count = self.counter(channel)
# delta = now - channel_time
# if delta > timeout:
# if count < messages_within_timeout:
# self.send("#bots", f"i hear activity in {channel}...")
# self.reset_count(channel)
# elif and channel_time :
# self.reset_count
# self.set_time(channel, now)
if __name__ == "__main__":
bot = IRCBot()
try:
bot.command_loop()
except KeyboardInterrupt:
exit()
bot = IRCBot()
try:
bot.command_loop()
except KeyboardInterrupt:
exit()

View File

@ -1,8 +0,0 @@
{
"nick": "chatterbot",
"realname": "a bot by ~nebula",
"channels": ["#bots"],
"times": {},
"counts": {}
}