354 lines
11 KiB
Python
354 lines
11 KiB
Python
from random import choice
|
|
import requests
|
|
from time import sleep
|
|
import socket
|
|
import re
|
|
from json import dump, load, dumps
|
|
|
|
try:
|
|
with open("config.json", "r") as f:
|
|
config = load(f)
|
|
except FileNotFoundError:
|
|
exit("Please create config.json with api key(s) and channels")
|
|
|
|
host = "localhost"
|
|
port = 6667
|
|
nick = config["nick"]
|
|
realname = "a bot by ~nebula"
|
|
helptext = "!trivia, !trscores, !aitrivia, !aiscores for trivia game. contact ~nebula for help, feedback or problem reports. https://git.tilde.town/nebula/mysterious_cube"
|
|
channels = config["channels"]
|
|
|
|
channel_re = re.compile(r"PRIVMSG (#*\w+)")
|
|
name_re = re.compile(r"^:([^!]*)!")
|
|
llm_answer_re = re.compile(r"^(true|false)")
|
|
|
|
llama_url = "https://llama.mcopp.com/v1/chat/completions"
|
|
llama_headers = {
|
|
"Content-Type": "application/json",
|
|
"Authorization": config["llama_key"]
|
|
}
|
|
|
|
trivia_questions_file = "trivia.questions"
|
|
trivia_state_file = "trivia.state"
|
|
trivia_score_file = "trivia.scores"
|
|
trivia_unselected_file = "trivia.unselected"
|
|
ai_state_file = "trivia.aistate"
|
|
ai_score_file = "trivia.aiscores"
|
|
|
|
try:
|
|
with open(trivia_questions_file, "r") as f:
|
|
trivia_questions = load(f)
|
|
except FileNotFoundError:
|
|
trivia_questions = []
|
|
|
|
try:
|
|
with open(trivia_state_file, "r") as f:
|
|
trivia_state = load(f)
|
|
except FileNotFoundError:
|
|
trivia_state = {}
|
|
|
|
try:
|
|
with open(trivia_score_file, "r") as f:
|
|
trivia_scores = load(f)
|
|
except FileNotFoundError:
|
|
trivia_scores = {}
|
|
|
|
try:
|
|
with open(trivia_unselected_file, "r") as f:
|
|
trivia_unselected = load(f)
|
|
except FileNotFoundError:
|
|
trivia_unselected = []
|
|
|
|
try:
|
|
with open(ai_state_file, "r") as f:
|
|
ai_state = load(f)
|
|
except FileNotFoundError:
|
|
ai_state = {}
|
|
|
|
try:
|
|
with open(ai_score_file, "r") as f:
|
|
ai_scores = load(f)
|
|
except FileNotFoundError:
|
|
ai_scores = {}
|
|
|
|
def write_state():
|
|
with open(trivia_state_file, "w") as f:
|
|
dump(trivia_state, f)
|
|
with open(trivia_score_file, "w") as f:
|
|
dump(trivia_scores, f)
|
|
with open(trivia_unselected_file, "w") as f:
|
|
dump(trivia_unselected, f)
|
|
with open(ai_score_file, "w") as f:
|
|
dump(ai_scores, f)
|
|
with open(ai_state_file, "w") as f:
|
|
dump(ai_state, f)
|
|
|
|
def get_question(ai_enabled=False):
|
|
global trivia_questions
|
|
global trivia_unselected
|
|
if trivia_questions:
|
|
if not trivia_unselected:
|
|
trivia_unselected = trivia_questions.copy()
|
|
question = choice(trivia_unselected)
|
|
trivia_unselected.remove(question)
|
|
question.append(ai_enabled)
|
|
# print(len(unselected))
|
|
return question
|
|
else:
|
|
return False
|
|
|
|
def post_question(channel, username):
|
|
global trivia_state
|
|
question = get_question()
|
|
if question:
|
|
trivia_state[channel] = question
|
|
write_state()
|
|
return f"Answer 'true' or 'false': {question[0]}"
|
|
else:
|
|
return "internal error"
|
|
|
|
def post_ai_question(channel, username):
|
|
global ai_state
|
|
question = get_question(ai_enabled=True)
|
|
if question:
|
|
ai_state[channel] = question
|
|
write_state()
|
|
return f"Will AI answer this true/false statement 'right' or 'wrong': {question[0]}"
|
|
else:
|
|
return "internal error"
|
|
|
|
def ai_answer(choice, channel, name):
|
|
global ai_state
|
|
if channel not in ai_state.keys():
|
|
return None
|
|
question_text , answer, ai_enabled = ai_state[channel]
|
|
user_correct = False
|
|
try:
|
|
llm_response = llama_response(question_text)
|
|
llm_answer = llm_answer_re.search(llm_response.lower())
|
|
except Exception as e:
|
|
print(e)
|
|
return "internal error"
|
|
del ai_state[channel]
|
|
write_state()
|
|
if llm_answer:
|
|
llm_answer = llm_answer.group(1)
|
|
if llm_answer.lower() == answer:
|
|
line = "The AI was (at least kind of) right! "
|
|
user_correct = choice == "right"
|
|
else:
|
|
line = "The AI was wrong! "
|
|
user_correct = choice == "wrong"
|
|
else:
|
|
return [
|
|
f"Cannot automatically determine if AI is right or wrong.",
|
|
f"AI Response: {llm_response}",
|
|
f"The right answer is {answer}!"
|
|
]
|
|
# print(f"{answer}; {choice}; {user_correct}")
|
|
if name:
|
|
if name not in ai_scores.keys():
|
|
ai_scores[name] = 0
|
|
if user_correct:
|
|
ai_scores[name] += 1
|
|
write_state()
|
|
return [
|
|
f"AI response: {llm_response}",
|
|
line + f"{name} scores 1 AI point! Total AI score for {name}: {ai_scores[name]}pts. See top AI scores with !aiscores"
|
|
|
|
]
|
|
else:
|
|
ai_scores[name] -= 1
|
|
write_state()
|
|
return [
|
|
f"AI response: {llm_response}",
|
|
line + f"{name} loses 1 AI point! Total AI score for {name}: {ai_scores[name]}pts. See top AI scores with !aiscores"
|
|
]
|
|
return [
|
|
f"AI response: {llm_response}",
|
|
f"The right answer is {answer}!"
|
|
]
|
|
|
|
def answer(choice, channel, name):
|
|
global trivia_state
|
|
if channel not in trivia_state.keys():
|
|
return None
|
|
_, answer, ai_enabled = trivia_state[channel]
|
|
del trivia_state[channel]
|
|
write_state()
|
|
line = f"The answer is {answer}!"
|
|
if not ai_enabled and name:
|
|
if name not in trivia_scores.keys():
|
|
trivia_scores[name] = 0
|
|
if choice == answer:
|
|
trivia_scores[name] += 1
|
|
line += f" {name} scores 1 point! Total score for {name}: {trivia_scores[name]}pts."
|
|
else:
|
|
trivia_scores[name] -= 1
|
|
line += f" {name} loses 1 point! Total score for {name}: {trivia_scores[name]}pts."
|
|
write_state()
|
|
line += " See top scores with !trscores"
|
|
return line
|
|
|
|
def post_top_scores(channel, name):
|
|
global trivia_scores
|
|
score_list = [(name, score) for name, score in trivia_scores.items()]
|
|
if not score_list:
|
|
return "No current scores."
|
|
sorted_scores = sorted(score_list, key=lambda x: x[1], reverse=True)
|
|
line = "Top scores: "
|
|
count = 1
|
|
for name, score in sorted_scores:
|
|
if count > 10:
|
|
break
|
|
line += f"[{count}. {make_no_ping_username(name)}: {score}pts], "
|
|
count += 1
|
|
return line[:-2]
|
|
|
|
def post_top_ai_scores(channel, name):
|
|
global ai_scores
|
|
score_list = [(name, score) for name, score in ai_scores.items()]
|
|
if not score_list:
|
|
return "No current AI scores."
|
|
sorted_scores = sorted(score_list, key=lambda x: x[1], reverse=True)
|
|
line = "Top AI scores: "
|
|
count = 1
|
|
for name, score in sorted_scores:
|
|
if count > 10:
|
|
break
|
|
line += f"[{count}. {make_no_ping_username(name)}: {score}pts], "
|
|
count += 1
|
|
return line[:-2]
|
|
|
|
def post_help(channel, name):
|
|
return helptext
|
|
|
|
def answer_true(channel, name):
|
|
return answer("true", channel, name)
|
|
|
|
def answer_false(channel, name):
|
|
return answer("false", channel, name)
|
|
|
|
def answer_right(channel, name):
|
|
return ai_answer("right", channel, name)
|
|
|
|
def answer_wrong(channel, name):
|
|
return ai_answer("wrong", channel, name)
|
|
|
|
def make_no_ping_username(name):
|
|
return name[0] + "\u200b" + name[1:]
|
|
|
|
def llama_response(question):
|
|
content = {
|
|
"n_predict": 64,
|
|
"temperature": 0.6,
|
|
"min_p": 0.05,
|
|
"messages": [
|
|
{
|
|
"role": "system",
|
|
"content": "You are an entertaining bot in an IRC server. Your responses are brief."
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": f"{question} True, or false? Briefly explain why."
|
|
}
|
|
]
|
|
}
|
|
r = requests.post(llama_url, headers=llama_headers, data=dumps(content))
|
|
response = r.json()
|
|
return response["choices"][0]["message"]["content"]
|
|
|
|
class IRCBot():
|
|
def __init__(self, nick, realname, helptext, commands, searchers, channels):
|
|
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.s.connect((host, port))
|
|
self.nick = nick
|
|
self.realname = realname
|
|
self.helptext = helptext
|
|
self.commands = commands
|
|
self.searchers = searchers
|
|
self.channels = channels
|
|
self.sendline(f"NICK {self.nick}")
|
|
self.sendline(f"USER {self.nick} 0 * :{self.realname}")
|
|
for channel in self.channels:
|
|
self.sendline(f"JOIN {channel}")
|
|
|
|
def sendline(self, line):
|
|
return self.s.send(bytes(f"{line}\r\n", "UTF-8"))
|
|
|
|
def send(self, channel, content):
|
|
if isinstance(content, list):
|
|
for line in content:
|
|
self.sendline(f"PRIVMSG {channel} :{line}")
|
|
sleep(0.5)
|
|
elif isinstance(content, str):
|
|
self.sendline(f"PRIVMSG {channel} :{content}")
|
|
|
|
def ping_pong(self):
|
|
while True:
|
|
sleep(2)
|
|
response = self.s.recv(8192).decode("UTF-8")
|
|
if not response:
|
|
exit(f"{self.nick}: no response from IRC server")
|
|
split = response.split("\r\n")
|
|
for line in split:
|
|
line = line.strip()
|
|
if line.startswith("PING"):
|
|
pong = "PONG " + line[5:]
|
|
self.sendline(pong)
|
|
continue
|
|
channel_search = channel_re.search(line)
|
|
if not channel_search:
|
|
continue
|
|
name_search = name_re.search(line)
|
|
channel = channel_search.group(1)
|
|
if name_search:
|
|
name = name_search.group(1)
|
|
else:
|
|
name = None
|
|
if name and not channel.startswith("#"):
|
|
channel = name
|
|
try:
|
|
message_body = line[line.index(" :") + 2:]
|
|
except Exception as e:
|
|
print(e)
|
|
message_body = ""
|
|
for command, callback in self.commands:
|
|
if line.lower().endswith(command):
|
|
result = callback(channel, name)
|
|
if result:
|
|
self.send(channel, result)
|
|
if message_body:
|
|
for callback in self.searchers:
|
|
callback(message_body)
|
|
|
|
def run():
|
|
bot = IRCBot(
|
|
nick,
|
|
realname,
|
|
helptext,
|
|
[ # endswith commands
|
|
("!help", post_help),
|
|
("!rollcall", post_help)
|
|
("!trivia", post_question),
|
|
("!aitrivia", post_ai_question),
|
|
("!trscores", post_top_scores),
|
|
("!aiscores", post_top_ai_scores),
|
|
("true", answer_true),
|
|
("false", answer_false),
|
|
("right", answer_right),
|
|
("wrong", answer_wrong)
|
|
],
|
|
[ # message searchers
|
|
# empty
|
|
],
|
|
channels
|
|
)
|
|
|
|
while True:
|
|
sleep(2)
|
|
bot.ping_pong()
|
|
|
|
if __name__ == "__main__":
|
|
run()
|