bbj/src/db.py

129 lines
3.3 KiB
Python
Raw Normal View History

2017-03-03 01:10:16 +00:00
from src import formatting
2017-03-01 16:59:33 +00:00
from uuid import uuid1
from src import schema
from time import time
from os import path
import json
PATH = "/home/desvox/bbj/"
if not path.isdir(PATH):
path.os.mkdir(PATH, mode=0o744)
if not path.isdir(path.join(PATH, "threads")):
path.os.mkdir(path.join(PATH, "threads"), mode=0o744)
try:
with open(path.join(PATH, "userdb"), "r") as f:
USERDB = json.loads(f.read())
except FileNotFoundError:
2017-03-03 06:15:13 +00:00
USERDB = dict(namemap=dict())
2017-03-01 16:59:33 +00:00
with open(path.join(PATH, "userdb"), "w") as f:
f.write(json.dumps(USERDB))
path.os.chmod(path.join(PATH, "userdb"), 0o600)
### THREAD MANAGEMENT ###
2017-03-03 01:10:16 +00:00
def thread_index(key="lastmod", markup=True):
2017-03-01 16:59:33 +00:00
result = list()
for ID in path.os.listdir(path.join(PATH, "threads")):
2017-03-03 01:10:16 +00:00
thread = thread_load(ID, markup)
2017-03-01 16:59:33 +00:00
thread.pop("replies")
result.append(thread)
2017-03-01 21:54:56 +00:00
return sorted(result, key=lambda i: i[key], reverse=True)
2017-03-01 16:59:33 +00:00
def thread_create(author, body, title, tags):
ID = uuid1().hex
# make sure None, False, and empty arrays are always repped consistently
tags = tags if tags else []
scheme = schema.thread(ID, author, body, title, tags)
thread_dump(ID, scheme)
return scheme
2017-03-03 01:10:16 +00:00
def thread_load(ID, markup=True):
2017-03-01 16:59:33 +00:00
try:
with open(path.join(PATH, "threads", ID), "r") as f:
2017-03-03 01:10:16 +00:00
thread = json.loads(f.read())
if not markup:
thread["body"] = formatting.cleanse(thread["body"])
for x in range(len(thread["replies"])):
thread["replies"][x]["body"] = formatting.cleanse(
thread["replies"][x]["body"])
return thread
2017-03-01 16:59:33 +00:00
except FileNotFoundError:
return False
def thread_dump(ID, obj):
with open(path.join(PATH, "threads", ID), "w") as f:
f.write(json.dumps(obj))
def thread_reply(ID, author, body):
thread = thread_load(ID)
if not thread:
return schema.error(7, "Requested thread does not exist.")
thread["reply_count"] += 1
thread["lastmod"] = time()
2017-03-03 01:10:16 +00:00
if thread["replies"]:
lastpost = thread["replies"][-1]["post_id"]
else:
lastpost = 1
reply = schema.reply(lastpost + 1, author, body)
2017-03-01 16:59:33 +00:00
thread["replies"].append(reply)
thread_dump(ID, thread)
return reply
### USER MANAGEMENT ###
def user_dbdump(dictionary):
with open(path.join(PATH, "userdb"), "w") as f:
f.write(json.dumps(dictionary))
def user_resolve(name_or_id):
check = USERDB.get(name_or_id)
try:
if check:
return name_or_id
else:
2017-03-03 06:15:13 +00:00
return USERDB["namemap"][name_or_id]
2017-03-01 16:59:33 +00:00
except KeyError:
return False
2017-03-01 17:01:07 +00:00
def user_register(auth_hash, name, quip, bio):
2017-03-03 06:15:13 +00:00
if USERDB["namemap"].get(name):
2017-03-01 16:59:33 +00:00
return schema.error(4, "Username taken.")
ID = uuid1().hex
2017-03-01 17:01:07 +00:00
scheme = schema.user_internal(ID, auth_hash, name, quip, bio, False)
2017-03-01 16:59:33 +00:00
USERDB.update({ID: scheme})
2017-03-03 06:15:13 +00:00
USERDB["namemap"].update({name: ID})
2017-03-01 16:59:33 +00:00
user_dbdump(USERDB)
return scheme
def user_get(ID):
user = USERDB[ID]
return schema.user_external(
2017-03-01 17:01:07 +00:00
ID, user["name"], user["quip"],
2017-03-01 16:59:33 +00:00
user["bio"], user["admin"])
def user_auth(ID, auth_hash):
return auth_hash == USERDB[ID]["auth_hash"]
def user_update(ID, **params):
USERDB[ID].update(params)
return USERDB[ID]