From 5f0c0b21829f4b9803d9de217483c9f03687730d Mon Sep 17 00:00:00 2001 From: = <=> Date: Fri, 4 Apr 2025 06:54:31 +0000 Subject: [PATCH] bunch of refactoring i guess --- bot.py | 329 ++++++++++++++++++++++---------------------- config.json.example | 8 -- 2 files changed, 165 insertions(+), 172 deletions(-) delete mode 100644 config.json.example diff --git a/bot.py b/bot.py index bc2523b..63a451f 100644 --- a/bot.py +++ b/bot.py @@ -3,8 +3,8 @@ from time import sleep, time from json import load, dump import re -channel_re = re.compile(r"PRIVMSG (#*\w+)") -name_re = re.compile(r"^:([^!]*)!") +privmsg_channel_re = re.compile(r"PRIVMSG (#*\w+)") +nick_re = re.compile(r"^:([^!]*)!") # timeout = 86400 # 24 hours timeout = 15 @@ -15,171 +15,172 @@ helptext = "i am a bot by ~nebula. i try to make it easier for users to discover helptext_short = "see https://git.tilde.town/nebula/chatterbot for instructions" class IRCBot(): - def __init__(self): - try: - with open("config.json", "r") as f: - self.config = load(f) - except FileNotFoundError: - exit("no config.json") - self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.s.connect((host, port)) - self.nick = self.config["nick"] - self.sendline(f"NICK {self.nick}") - self.sendline(f"USER {self.nick} 0 * :{self.config['realname']}") - for channel in self.config["channels"]: - self.sendline(f"JOIN {channel}") - self.commands = [ - ("invite", self.invite), - ("kick", self.kick) - ] - - def write_config(self): - with open("config.json", "w") as f: - dump(self.config, f, indent=2) - - def join_channel(self, channel): - self.sendline(f"JOIN {channel}") - self.config["channels"].append(channel) - self.write_config() - - def part_channel(self, channel): - if channel == "#tildetown": - self.sendline("i will not leave #tildetown. want to block me? see https://git.tilde.town/nebula/chatterbot") - return - elif channel == "#bots": - self.sendline("i will not leave #bots. want to block me? see https://git.tilde.town/nebula/chatterbot") - return - self.sendline(f"PART {channel}") - self.config["channels"].remove(channel) - del self.config["times"][channel] - del self.config["counts"][channel] - self.write_config() - - def sendline(self, line): - if line: - return self.s.send(bytes(f"{line}\r\n", "UTF-8")) - return None - - def send(self, channel, content): - if isinstance(content, list): - for line in content: - self.sendline(f"PRIVMSG {channel} :{line}") - sleep(0.5) - elif isinstance(content, str): - self.sendline(f"PRIVMSG {channel} :{content}") - - def help(_, __): - return helptext - - def invite(self, _, arguments): - if not arguments: - return helptext_short - lines = [] - for channel in arguments: - if not channel.startswith("#"): - lines.append("channel name must start with #") - continue - elif channel in self.config["channels"]: - lines.append(f"i am already in {channel}!") - else: - self.join_channel(channel) - lines.append(f"i have (allegedly) joined {channel}") - return lines - - def kick(self, channel, arguments): - if not arguments: - self.part_channel(channel) - return None - for channel in arguments: - self.part_channel(channel) - - def check_time(self, channel): - try: - this_time = self.config["times"][channel] - except KeyError: - this_time = self.config["times"][channel] = time() - self.write_config() - return this_time + def __init__(self): + try: + with open("config.json", "r") as f: + self.state = load(f) + except FileNotFoundError: + exit("no config.json") + self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.s.connect((host, port)) + self.nick = self.state["nick"] + self.send_raw_line(f"NICK {self.nick}") + self.send_raw_line(f"USER {self.nick} 0 * :{self.state['realname']}") + for channel in self.state["channels"]: + self.send_raw_line(f"JOIN {channel}") + self.commands = [ + ("invite", self.invite), + ("kick", self.kick) + ] - def set_time(self, channel, this_time): - self.config["times"][channel] = this_time + def write_state(self): + with open("state.json", "w") as f: + dump(self.state, f, indent=2) - def counter(self, channel): - try: - self.config["counts"][channel] += 1 - value = self.config["counts"][channel] - except KeyError: - value = self.config["counts"][channel] = 1 - self.write_config() - return value - - def reset_count(self, channel): - self.config["counts"][channel] = 0 - self.write_config() + def join_channel(self, channel): + self.send_raw_line(f"JOIN {channel}") + self.state["channels"].append(channel) + self.write_state() - def command_loop(self): - while True: - char = self.s.recv(1) - if not char: - exit(f"{self.nick}: no response from IRC server") - line = b"" - while char != b"\n": - if char != b"\r": - line += char - char = self.s.recv(1) - line = line.decode("UTF-8").strip() - if line.startswith("PING"): - pong = "PONG " + line[5:] - self.sendline(pong) - continue - channel_search = channel_re.search(line) - if not channel_search: - continue - channel = channel_search.group(1) - name_search = name_re.search(line) - if name_search: - name = name_search.group(1) - else: - name = None - if name and not channel.startswith("#"): - channel = name - try: - message_body = line[line.index(" :") + 2:] - except (IndexError, ValueError): - message_body = "" - if message_body: - if message_body.startswith("!rollcall"): - self.send(channel, helptext) - continue - elif message_body.startswith("!chatterbot"): - arguments = message_body.strip().lower()[11:] - if not arguments: - self.send(channel, helptext) - continue - arguments = arguments.split() - for command, callback in self.commands: - if command not in arguments: - continue - self.send(channel, callback(channel, arguments[1:])) - else: - if channel in ("#tildetown", "#bots"): - continue - channel_time = self.check_time(channel) - now = time() - count = self.counter(channel) - delta = now - channel_time - if delta > timeout and count < messages_within_timeout: - self.reset_count(channel) - self.send("#bots", f"i hear activity in {channel}...") - elif count < messages_within_timeout and delta > timeout: - self.reset_count - self.set_time(channel, now) + def part_channel(self, channel): + if channel in ("#tildetown", "#bots"): + self.send_raw_line(f"i will not leave {channel}. want to block me? see https://git.tilde.town/nebula/chatterbot") + return + self.send_raw_line(f"PART {channel}") + self.state["channels"].remove(channel) + del self.state["times"][channel] + del self.state["counts"][channel] + self.write_state() + + def send_raw_line(self, line): + if line: + return self.s.send(bytes(f"{line}\r\n", "UTF-8")) + return + + def send(self, channel, content): + if isinstance(content, list): + for line in content: + self.send_raw_line(f"PRIVMSG {channel} :{line}") + sleep(0.5) + elif isinstance(content, str): + self.send_raw_line(f"PRIVMSG {channel} :{content}") + + def help(_, __): + return helptext + + def invite(self, _, arguments): + if not arguments: + return helptext_short + lines = [] + for channel in arguments: + if not channel.startswith("#"): + lines.append("channel name must start with #") + continue + elif channel in self.state["channels"]: + lines.append(f"i am already in {channel}!") + else: + self.join_channel(channel) + lines.append(f"i have (allegedly) joined {channel}") + return lines + + def kick(self, channel, arguments): + if not arguments: + self.part_channel(channel) + return + for channel in arguments: + self.part_channel(channel) + + def check_time(self, channel): + try: + this_time = self.state["times"][channel] + except KeyError: + this_time = self.state["times"][channel] = time() + self.write_state() + return this_time + + def set_time(self, channel, this_time): + self.state["times"][channel] = this_time + + def counter(self, channel): + try: + self.state["counts"][channel] += 1 + value = self.state["counts"][channel] + except KeyError: + value = self.state["counts"][channel] = 1 + self.write_state() + return value + + def reset_count(self, channel): + self.state["counts"][channel] = 0 + self.write_state() + + def command_loop(self): + while True: + char = self.s.recv(1) + if not char: + exit(f"{self.nick}: no response from IRC server") + line = b"" + while char != b"\n": + if char != b"\r": + line += char + char = self.s.recv(1) + line = line.decode("UTF-8").strip() + if line.startswith("PING"): + pong = "PONG " + line[5:] + self.send_raw_line(pong) + continue + privmsg_channel_search = privmsg_channel_re.search(line) + if not privmsg_channel_search: + continue + channel = privmsg_channel_search.group(1) + nick_search = nick_re.search(line) + if nick_search: + nick = nick_search.group(1) + else: + nick = None + if nick and not channel.startswith("#"): + channel = nick + try: + message_body = line[line.index(" :") + 2:] + except (IndexError, ValueError): + message_body = "" + if message_body: + if message_body.startswith("!rollcall") or message_body.startswith("!help"): + self.send(channel, helptext) + continue + elif message_body.startswith("!chatterbot"): + arguments = message_body.strip().lower()[11:] + if not arguments: + self.send(channel, helptext) + continue + arguments = arguments.split() + for command, callback in self.commands: + if command not in arguments: + continue + self.send(channel, callback(channel, arguments[1:])) + else: + if channel in ("#tildetown", "#bots"): + continue + # i have not figured out this part yet + + # channel_time = self.check_time(channel) + # now = time() + # count = self.counter(channel) + # delta = now - channel_time + # if delta > timeout: + # if count < messages_within_timeout: + # self.send("#bots", f"i hear activity in {channel}...") + # self.reset_count(channel) + + # elif and channel_time : + # self.reset_count + # self.set_time(channel, now) if __name__ == "__main__": - bot = IRCBot() - try: - bot.command_loop() - except KeyboardInterrupt: - exit() - + bot = IRCBot() + try: + bot.command_loop() + except KeyboardInterrupt: + exit() + diff --git a/config.json.example b/config.json.example deleted file mode 100644 index 775ad11..0000000 --- a/config.json.example +++ /dev/null @@ -1,8 +0,0 @@ -{ - "nick": "chatterbot", - "realname": "a bot by ~nebula", - "channels": ["#bots"], - "times": {}, - "counts": {} -} - \ No newline at end of file