# mysterious_cube/main.py
#
# 459 lines
# 14 KiB
# Python

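# Example observation record (apparently one item of the eBird "recent
# observations" response; get_birds reads 'sciName' and 'comName' from these):
# {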
# 'speciesCode': 'dowwoo',
# 'comName': 'Downy Woodpecker',
# 'sciName': 'Dryobates pubescens',
# 'locId': 'L36986367',
# 'locName': 'Home',
# 'obsDt': '2024-12-14 17:06',
# 'howMany': 1,
# 'lat': 36.0006572,
# 'lng': -95.0865753,
# 'obsValid': True,
# 'obsReviewed': False,
# 'locationPrivate': True,
# 'subId': 'S205413945'
# }
from random import choice
import requests
from time import sleep
import socket
import re
from json import dump, load, dumps
# Load the bot configuration (nick, channels, API keys). Everything below
# reads its settings from this dict, so a missing file is fatal.
try:
    with open("config.json", "r", encoding="utf-8") as f:
        config = load(f)
except FileNotFoundError:
    # Raise SystemExit directly: the bare ``exit`` helper is injected by the
    # ``site`` module and is not available under ``python -S`` or in frozen
    # builds, where calling it would fail with NameError instead.
    raise SystemExit("Please create config.json with api key(s) and channels")
# --- IRC connection settings ------------------------------------------------
host = "localhost"
port = 6667
nick = config["nick"]
realname = "a bot by ~nebula"
helptext = "!birds, !trivia, !trscores, !aitrivia, !aiscores. contact ~nebula for help, feedback or problem reports. https://git.tilde.town/nebula/mysterious_cube"
channels = config["channels"]

# --- Message parsing ----------------------------------------------------------
# Target of a PRIVMSG (a channel, or the bot's own nick for private messages).
# The target is the whole space-delimited token; matching only ``#*\w+`` cut
# names such as "#my-channel" down to "#my" and sent replies to the wrong place.
channel_re = re.compile(r"PRIVMSG (\S+)")
# Sender nick: everything between the leading ':' and the first '!'.
name_re = re.compile(r"^:([^!]*)!")
# Verdict at the very start of a (lower-cased) LLM reply.
llm_answer_re = re.compile(r"^(true|false)")

# --- LLM endpoint -------------------------------------------------------------
llama_url = "https://llama.mcopp.com/v1/chat/completions"
llama_headers = {
    "Content-Type": "application/json",
    "Authorization": config["llama_key"]
}

# --- Bird lookup services -----------------------------------------------------
geonames_url = "http://api.geonames.org/searchJSON"
geonames_user = config["geonames_user"]
ebird_url = "https://api.ebird.org/v2/data/obs/geo/recent"
ebird_key = config["ebird_key"]
google_url = "https://customsearch.googleapis.com/customsearch/v1"
google_key = config["google_key"]
google_cx = config["google_cx"]

# --- On-disk state files (JSON) -----------------------------------------------
trivia_questions_file = "trivia.questions"
trivia_state_file = "trivia.state"
trivia_score_file = "trivia.scores"
trivia_unselected_file = "trivia.unselected"
ai_state_file = "trivia.aistate"
ai_score_file = "trivia.aiscores"
# NOTE: "brids" (sic) is kept as-is so an existing cache file keeps loading.
bird_url_file = "brids.urls"
def _load_json(path, default):
    """Return the parsed JSON content of *path*, or *default* if it is missing.

    Only a missing file falls back to *default*; malformed JSON still raises,
    exactly as the individual loaders did before.
    """
    try:
        with open(path, "r") as f:
            return load(f)
    except FileNotFoundError:
        return default


# Restore persisted state; each falls back to an empty container on first run.
trivia_questions = _load_json(trivia_questions_file, [])
trivia_state = _load_json(trivia_state_file, {})
trivia_scores = _load_json(trivia_score_file, {})
trivia_unselected = _load_json(trivia_unselected_file, [])
ai_state = _load_json(ai_state_file, {})
ai_scores = _load_json(ai_score_file, {})
bird_urls = _load_json(bird_url_file, {})
def write_state():
    """Dump every mutable piece of bot state to its JSON file.

    The files are rewritten in full, in a fixed order: trivia state, trivia
    scores, unselected questions, AI scores, AI state, bird URL cache.
    """
    snapshots = (
        (trivia_state_file, trivia_state),
        (trivia_score_file, trivia_scores),
        (trivia_unselected_file, trivia_unselected),
        (ai_score_file, ai_scores),
        (ai_state_file, ai_state),
        (bird_url_file, bird_urls),
    )
    for path, payload in snapshots:
        with open(path, "w") as handle:
            dump(payload, handle)
def get_location(query):
    """Look up a free-text place name with the GeoNames search API.

    Args:
        query: Place name as typed by the user, e.g. "Dodge City, KS".

    Returns:
        The first search hit (a dict containing at least "lat"), or None
        when there is no usable hit or the lookup itself fails.
    """
    params = {
        "username": geonames_user,
        "q": query,
        "maxRows": 1
    }
    try:
        # requests has no default timeout; without one a stalled server
        # would block the bot's only thread forever.
        response = requests.get(geonames_url, params=params, timeout=10)
        data = response.json()["geonames"][0]
    except (KeyError, IndexError):
        # No "geonames" list in the reply (e.g. an API error payload) or an
        # empty result list: nothing found.
        return None
    except (requests.RequestException, ValueError) as e:
        # Network failure or a non-JSON body: report and treat as "not found"
        # instead of crashing the bot.
        print(e)
        return None
    if "lat" not in data:
        return None
    return data
def cache_bird_url(sciName):
    """Return a web link for a species, using the on-disk cache when possible.

    On a cache miss the scientific name is sent to the configured Google
    Custom Search engine; the first result's link is cached (and the cache
    persisted through write_state) before being returned.

    Args:
        sciName: Scientific name used both as search query and cache key.

    Returns:
        The link as a string, or None when the search yields no result or
        the request fails.
    """
    if sciName in bird_urls:
        return bird_urls[sciName]
    try:
        # Explicit timeout: requests would otherwise wait indefinitely.
        search = requests.get(google_url, params={
            "q": sciName,
            "key": google_key,
            "cx": google_cx
        }, timeout=10)
        data = search.json()
    except (requests.RequestException, ValueError) as e:
        # Network failure or non-JSON body; do not take the bot down for it.
        print(e)
        return None
    try:
        link = data["items"][0]["link"]
    except (IndexError, KeyError):
        # No search results for this name.
        return None
    bird_urls[sciName] = link
    write_state()
    return link
def get_birds(location):
    """Build a one-line summary of recent bird sightings near a location.

    Args:
        location: Mapping with "lat", "lng" and "name" keys (as returned by
            get_location).

    Returns:
        A string of the form "Location: NAME; Bird [ url ]; Bird [ url ]"
        covering at most four sightings, or None when the service reports no
        sightings or the request fails.
    """
    params = {
        "key": ebird_key,
        "dist": 50,
        "lat": location["lat"],
        "lng": location["lng"]
    }
    try:
        # Explicit timeout: requests would otherwise wait indefinitely.
        request = requests.get(ebird_url, params=params, timeout=10)
        data = request.json()
    except (requests.RequestException, ValueError) as e:
        print(e)
        return None
    # An error payload is a JSON object, not a list; slicing it would raise.
    # An empty list means no sightings, which the caller reports as "no data".
    if not isinstance(data, list) or not data:
        return None
    parts = [f"Location: {location['name']}"]
    for sighting in data[:4]:
        url = cache_bird_url(sighting["sciName"])
        if url is None:
            # No link could be found; still list the bird itself.
            parts.append(sighting["comName"])
            continue
        # Drop a trailing "/id" path segment only. str.rstrip("/id") strips
        # any run of the characters '/', 'i', 'd' and would also eat the end
        # of the species name (".../Mallard/id" -> ".../Mallar").
        if url.endswith("/id"):
            url = url[:-len("/id")]
        parts.append(f"{sighting['comName']} [ {url} ]")
    return "; ".join(parts)
def post_birds(channel, username, arguments):
    """Handle !birds: reply with recent sightings near the named place.

    Without arguments the usage text is returned. If either the place lookup
    or the sightings lookup comes back empty, a "No data found" message is
    returned instead.
    """
    if arguments:
        place = get_location(arguments)
        sightings = get_birds(place) if place else None
        return sightings or f"No data found for {arguments}"
    return "Posts recently sighted birds. Give a location name with this command, eg !birds Dodge City, KS"
def get_question(ai_enabled=False):
    """Draw a random question that has not been asked in the current cycle.

    When the pool of unselected questions is empty it is refilled from the
    full question list, so every question is asked once per cycle.

    Args:
        ai_enabled: Flag stored as the third element of the returned entry.

    Returns:
        A new list ``[text, answer, ai_enabled]``, or False when no questions
        are loaded.
    """
    global trivia_unselected
    if not trivia_questions:
        return False
    if not trivia_unselected:
        trivia_unselected = trivia_questions.copy()
    question = choice(trivia_unselected)
    trivia_unselected.remove(question)
    # Build a fresh list instead of appending to ``question``: the refill
    # above is a shallow copy, so the inner lists are shared with
    # trivia_questions and an in-place append made every entry grow by one
    # element per cycle, breaking the three-way unpack in the answer handlers.
    return [question[0], question[1], ai_enabled]
def post_question(channel, username, arguments):
    """Handle !trivia: open a new true/false question for the channel.

    The drawn question replaces any question already pending for *channel*
    and the state is persisted. Returns the prompt to post, or
    "internal error" when no question could be drawn.
    """
    picked = get_question()
    if not picked:
        return "internal error"
    trivia_state[channel] = picked
    write_state()
    return f"Answer 'true' or 'false': {picked[0]}"
def post_ai_question(channel, username, arguments):
    """Handle !aitrivia: open a "will the AI get it right" round for the channel.

    The drawn question (flagged as AI-enabled) replaces any AI question
    already pending for *channel* and the state is persisted. Returns the
    prompt to post, or "internal error" when no question could be drawn.
    """
    picked = get_question(ai_enabled=True)
    if not picked:
        return "internal error"
    ai_state[channel] = picked
    write_state()
    return f"Will AI answer this true/false statement 'right' or 'wrong': {picked[0]}"
def ai_answer(choice, channel, name):
    """Resolve the channel's pending AI question against a player's bet.

    The question is sent to the LLM; the player wins a point when their bet
    ("right" or "wrong") matches whether the LLM's leading true/false verdict
    agrees with the stored answer, and loses a point otherwise.

    Args:
        choice: The player's bet, "right" or "wrong".
        channel: Channel (or nick, for private messages) the round belongs to.
        name: Nick of the player, or None if it could not be determined.

    Returns:
        None when no AI question is pending for *channel*; the string
        "internal error" when the LLM call fails (the question stays
        pending); otherwise a list of lines to post.
    """
    if channel not in ai_state:
        return None
    # Take only the first two fields so an entry carrying extra trailing
    # elements cannot break the unpack; the stored flag is not needed here.
    question_text, answer = ai_state[channel][:2]
    try:
        # Collapse all whitespace to single spaces: an embedded newline would
        # terminate the PRIVMSG early and hand the remainder to the server as
        # a separate raw command, and leading whitespace would defeat the
        # start-anchored verdict regex.
        llm_response = " ".join(llama_response(question_text).split())
    except Exception as e:
        print(e)
        return "internal error"
    del ai_state[channel]
    write_state()
    verdict = llm_answer_re.search(llm_response.lower())
    if not verdict:
        return [
            "Cannot automatically determine if AI is right or wrong.",
            f"AI Response: {llm_response}",
            f"The right answer is {answer}!"
        ]
    if verdict.group(1) == answer:
        line = "The AI was (at least kind of) right! "
        user_correct = choice == "right"
    else:
        line = "The AI was wrong! "
        user_correct = choice == "wrong"
    if not name:
        # Nobody to credit: just reveal the outcome.
        return [
            f"AI response: {llm_response}",
            f"The right answer is {answer}!"
        ]
    ai_scores.setdefault(name, 0)
    if user_correct:
        ai_scores[name] += 1
        outcome = "scores"
    else:
        ai_scores[name] -= 1
        outcome = "loses"
    write_state()
    return [
        f"AI response: {llm_response}",
        line + f"{name} {outcome} 1 AI point! Total AI score for {name}: {ai_scores[name]}pts. See top AI scores with !aiscores"
    ]
def answer(choice, channel, name):
    """Resolve the channel's pending trivia question with a player's answer.

    Args:
        choice: The player's answer, "true" or "false".
        channel: Channel (or nick, for private messages) the question belongs to.
        name: Nick of the player, or None if it could not be determined.

    Returns:
        None when no question is pending for *channel*; otherwise the line to
        post, revealing the answer and (for a known player) the score change.
    """
    if channel not in trivia_state:
        return None
    entry = trivia_state[channel]
    # Index instead of a strict three-way unpack so an entry with a missing
    # flag or extra trailing elements does not raise.
    correct = entry[1]
    ai_enabled = entry[2] if len(entry) > 2 else False
    del trivia_state[channel]
    line = f"The answer is {correct}!"
    if not ai_enabled and name:
        trivia_scores.setdefault(name, 0)
        if choice == correct:
            trivia_scores[name] += 1
            line += f" {name} scores 1 point! Total score for {name}: {trivia_scores[name]}pts."
        else:
            trivia_scores[name] -= 1
            line += f" {name} loses 1 point! Total score for {name}: {trivia_scores[name]}pts."
    # Persist once, after both the question removal and any score change,
    # rather than rewriting every state file twice.
    write_state()
    # NOTE(review): the source's indentation was lost; the hint is assumed to
    # be appended unconditionally -- confirm against the original file.
    line += " See top scores with !trscores"
    return line
def post_top_scores(channel, name, arguments):
    """Handle !trscores: list up to ten players with the highest trivia scores.

    Returns "No current scores." when nobody has a score yet; otherwise a
    single line of ranked "[N. nick: Xpts]" entries, highest score first.
    """
    if not trivia_scores:
        return "No current scores."
    ranked = sorted(trivia_scores.items(), key=lambda pair: pair[1], reverse=True)
    entries = [
        f"[{rank}. {make_no_ping_username(player)}: {points}pts]"
        for rank, (player, points) in enumerate(ranked[:10], start=1)
    ]
    return "Top scores: " + ", ".join(entries)
def post_top_ai_scores(channel, name, arguments):
    """Handle !aiscores: list up to ten players with the highest AI scores.

    Returns "No current AI scores." when nobody has a score yet; otherwise a
    single line of ranked "[N. nick: Xpts]" entries, highest score first.
    """
    if not ai_scores:
        return "No current AI scores."
    ranked = sorted(ai_scores.items(), key=lambda pair: pair[1], reverse=True)
    entries = [
        f"[{rank}. {make_no_ping_username(player)}: {points}pts]"
        for rank, (player, points) in enumerate(ranked[:10], start=1)
    ]
    return "Top AI scores: " + ", ".join(entries)
def post_help(channel, name, arguments):
    """Command handler: return the module-level help text; all arguments are ignored."""
    return helptext
def answer_true(channel, name, arguments):
    """Command handler: submit "true" to answer() for *name* in *channel*; *arguments* is ignored."""
    return answer("true", channel, name)
def answer_false(channel, name, arguments):
    """Command handler: submit "false" to answer() for *name* in *channel*; *arguments* is ignored."""
    return answer("false", channel, name)
def answer_right(channel, name, arguments):
    """Command handler: submit the bet "right" to ai_answer() for *name* in *channel*; *arguments* is ignored."""
    return ai_answer("right", channel, name)
def answer_wrong(channel, name, arguments):
    """Command handler: submit the bet "wrong" to ai_answer() for *name* in *channel*; *arguments* is ignored."""
    return ai_answer("wrong", channel, name)
def make_no_ping_username(name):
    """Return *name* with a zero-width space inserted after its first character.

    The result looks the same but no longer equals the original nick, which
    (going by the function name) is meant to avoid pinging the listed user.
    An empty name is returned unchanged instead of raising IndexError.
    """
    if not name:
        return name
    return name[0] + "\u200b" + name[1:]
def llama_response(question):
    """Ask the configured LLM endpoint whether a statement is true or false.

    Args:
        question: The statement to evaluate; a "True, or false?" prompt is
            appended to it.

    Returns:
        The text content of the first completion choice.

    Raises:
        requests.RequestException: On network failure, timeout or an HTTP
            error status.
        ValueError, KeyError, IndexError: If the reply is not JSON of the
            expected shape.
    """
    content = {
        "n_predict": 64,
        "temperature": 0.6,
        "min_p": 0.05,
        "messages": [
            {
                "role": "system",
                "content": "You are an entertaining bot in an IRC server. Your responses are brief."
            },
            {
                "role": "user",
                "content": f"{question} True, or false? Briefly explain why."
            }
        ]
    }
    # requests has no default timeout; without one a stalled endpoint would
    # freeze the bot's only thread indefinitely.
    r = requests.post(llama_url, headers=llama_headers, data=dumps(content), timeout=60)
    # Surface HTTP errors as such instead of as a confusing KeyError below.
    r.raise_for_status()
    response = r.json()
    return response["choices"][0]["message"]["content"]
class IRCBot():
    """Minimal blocking IRC client that routes chat messages to callbacks.

    ``commands`` is a sequence of ``(prefix, callback)`` pairs; a callback is
    invoked as ``callback(channel, name, arguments)`` when a message starts
    with its prefix (case-insensitively). ``searchers`` is a sequence of
    callables invoked as ``callback(message_body)`` for every message. Any
    truthy result (a string, or a list of strings) is posted back to the
    channel the message came from.
    """

    def __init__(self, nick, realname, helptext, commands, searchers, channels):
        """Connect to the module-level host/port, register, and join *channels*."""
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect((host, port))
        self.nick = nick
        self.realname = realname
        self.helptext = helptext
        self.commands = commands
        self.searchers = searchers
        self.channels = channels
        self.sendline(f"NICK {self.nick}")
        self.sendline(f"USER {self.nick} 0 * :{self.realname}")
        for channel in self.channels:
            self.sendline(f"JOIN {channel}")

    def sendline(self, line):
        """Send one raw IRC line.

        Returns the number of bytes written, or None for an empty line.
        """
        if not line:
            return None
        # A CR or LF inside the text would end the command early and let the
        # remainder reach the server as a separate raw command; replace them.
        safe = line.replace("\r", " ").replace("\n", " ")
        payload = bytes(f"{safe}\r\n", "UTF-8")
        # sendall keeps writing until the whole line is out; a bare send()
        # may transmit only part of it.
        self.s.sendall(payload)
        return len(payload)

    def send(self, channel, content):
        """Post *content* (a string, or a list of strings) to *channel*."""
        if isinstance(content, list):
            for line in content:
                self.sendline(f"PRIVMSG {channel} :{line}")
                # Small pause between consecutive lines of one reply.
                sleep(0.5)
        elif isinstance(content, str):
            self.sendline(f"PRIVMSG {channel} :{content}")

    def command_loop(self):
        """Read server lines until the connection closes, dispatching each one.

        Raises:
            SystemExit: When the server closes the connection.
        """
        # A buffered file view of the socket reads whole lines and ends
        # cleanly at EOF; the previous per-byte recv loop spun forever if the
        # peer closed the connection in the middle of a line.
        with self.s.makefile("rb") as reader:
            for raw in reader:
                # Undecodable bytes are replaced rather than crashing the bot.
                line = raw.decode("UTF-8", errors="replace").replace("\r", "").strip()
                if line.startswith("PING"):
                    self.sendline("PONG " + line[5:])
                    continue
                self._handle_line(line)
        raise SystemExit(f"{self.nick}: no response from IRC server")

    def _handle_line(self, line):
        """Run searchers and matching commands for one PRIVMSG line."""
        channel_search = channel_re.search(line)
        if not channel_search:
            return
        channel = channel_search.group(1)
        name_search = name_re.search(line)
        name = name_search.group(1) if name_search else None
        # A private message targets the bot itself; answer the sender instead.
        if name and not channel.startswith("#"):
            channel = name
        # Everything after the first " :" is the message text.
        message_body = line.partition(" :")[2]
        if not message_body:
            return
        for callback in self.searchers:
            result = callback(message_body)
            if result:
                self.send(channel, result)
        lowered = message_body.lower()
        for command, callback in self.commands:
            if not lowered.startswith(command):
                continue
            # Take the arguments from the message text itself (skipping the
            # separator after the command). Searching the raw line for the
            # command was case-sensitive and could hit the nick or channel.
            arguments = message_body[len(command) + 1:]
            result = callback(channel, name, arguments)
            if result:
                self.send(channel, result)
def run():
    """Create the bot with its command table and process messages forever."""
    # (prefix, handler) pairs; a handler fires when a message starts with
    # its prefix.
    commands = [
        ("!help", post_help),
        ("!rollcall", post_help),
        ("!birds", post_birds),
        ("!trivia", post_question),
        ("!aitrivia", post_ai_question),
        ("!trscores", post_top_scores),
        ("!aiscores", post_top_ai_scores),
        ("true", answer_true),
        ("false", answer_false),
        ("right", answer_right),
        ("wrong", answer_wrong)
    ]
    # Message searchers: none registered.
    searchers = []
    bot = IRCBot(nick, realname, helptext, commands, searchers, channels)
    while True:
        sleep(0.5)
        bot.command_loop()
# if __name__ == "__main__":
# run()