240 lines
7.3 KiB
Python
Executable File
240 lines
7.3 KiB
Python
Executable File
import json
|
|
from glob import glob
|
|
from time import time_ns
|
|
import datetime
|
|
import os
|
|
from pwd import getpwuid
|
|
import sys
|
|
from subprocess import run
|
|
import tempfile
|
|
from math import floor
|
|
from re import compile
|
|
|
|
home = os.path.expanduser("~/.bink")
|
|
our_path = "/town/our/data/.bink"
|
|
filters = []
|
|
filters_path = os.path.expanduser("~/.binkfilters")
|
|
max_body_length = 64_000
|
|
helptext = """see https://git.tilde.town/nebula/bink for details
|
|
--help or -h: show this message
|
|
--pipe or -p: use stdin as post content. ex `echo "hello!" | town bink --pipe`
|
|
--dump or -d: print all posts in a json object
|
|
---dump-before or -db: print all posts before timestamp supplied as an additional argument
|
|
---dump-after or -da: print all posts after timestamp supplied as an additional argument"""
|
|
|
|
try:
|
|
os.mkdir(home)
|
|
except FileExistsError:
|
|
pass
|
|
|
|
try:
|
|
os.mkdir(our_path)
|
|
except FileExistsError:
|
|
pass
|
|
except FileNotFoundError:
|
|
our_path = None
|
|
|
|
try:
|
|
with open(filters_path, "r") as f:
|
|
filters = json.load(f)
|
|
except FileNotFoundError:
|
|
with open(filters_path, "w") as f:
|
|
json.dump(filters, f)
|
|
|
|
try:
|
|
editor = os.environ["EDITOR"]
|
|
except KeyError:
|
|
editor = "nano"
|
|
|
|
def create_post(body):
|
|
with open(f"{home}/{time_ns()}", "w", encoding="UTF-8") as f:
|
|
f.write(body)
|
|
|
|
def glob_posts(path, our=False):
|
|
output = []
|
|
for post_path in glob(path):
|
|
if not os.path.isfile(post_path):
|
|
continue
|
|
elif os.path.islink(post_path):
|
|
continue
|
|
split = post_path.split("/")
|
|
basename = split[-1]
|
|
try:
|
|
output.append((int(basename), "our" if our else split[2], post_path))
|
|
except ValueError: # filename is not just numbers, cannot use int()
|
|
continue
|
|
return output
|
|
|
|
def generate_feed(before=None, after=None, count=200):
|
|
posts = glob_posts("/home/**/.bink/*")
|
|
if our_path:
|
|
posts += glob_posts(f"{our_path}/*", our=True)
|
|
for post in posts.copy():
|
|
if post[1] in filters:
|
|
posts.remove(post)
|
|
if before:
|
|
posts = [post for post in posts if post[0] < before]
|
|
elif after:
|
|
posts = [post for post in posts if post[0] > after]
|
|
posts.sort(key=lambda x: x[0], reverse=True)
|
|
blogs = []
|
|
for time, user, path in posts[:count]:
|
|
try:
|
|
with open(path, "r", encoding="UTF-8") as f:
|
|
body = f.read()
|
|
except PermissionError:
|
|
continue
|
|
obj = {
|
|
"user": user,
|
|
"time": time,
|
|
"body": body[:max_body_length],
|
|
}
|
|
blogs.append(obj)
|
|
return blogs
|
|
|
|
if len(sys.argv) > 1:
|
|
argv1 = sys.argv[1]
|
|
if argv1 in ("--help", "-h"):
|
|
print(helptext)
|
|
exit(0)
|
|
elif argv1 in ("--dump", "-d"):
|
|
with open("/dev/stdout", "w") as f:
|
|
json.dump(generate_feed(), f)
|
|
exit(0)
|
|
elif argv1 in ("--dump-before", "-db"):
|
|
try:
|
|
with open("/dev/stdout", "w") as f:
|
|
json.dump(generate_feed(before=int(sys.argv[2])), f)
|
|
exit(0)
|
|
except ValueError:
|
|
print(f"Argument for {argv1} must be a number of epoch-nanoseconds.")
|
|
exit(1)
|
|
except IndexError:
|
|
print(f"Supply a timestamp to use this method.")
|
|
exit(1)
|
|
elif argv1 in ("--dump-after", "-da"):
|
|
try:
|
|
with open("/dev/stdout", "w") as f:
|
|
json.dump(generate_feed(after=int(sys.argv[2])), f)
|
|
exit(0)
|
|
except ValueError:
|
|
print(f"Argument for {argv1} must be a number of epoch-nanoseconds.")
|
|
exit(1)
|
|
except IndexError:
|
|
print(f"Supply a timestamp to use this method.")
|
|
exit(1)
|
|
elif argv1 in ("--pipe", "-p"):
|
|
try:
|
|
with open("/dev/stdin", "r", encoding="UTF-8") as f:
|
|
body = f.read().strip()
|
|
if body:
|
|
create_post(body)
|
|
exit(0)
|
|
except KeyboardInterrupt:
|
|
exit(0)
|
|
else:
|
|
create_post(" ".join(sys.argv[1:]))
|
|
exit(0)
|
|
|
|
import urwid
|
|
|
|
current_user = getpwuid(os.getuid()).pw_name
|
|
name_re = compile(f"(?<!\\w)[@~]?{current_user}")
|
|
footer = "[c]reate [r]efresh [q]uit | scrolling: arrows, j/k, space, page up/down, ctrl-d/ctrl-u"
|
|
attrmap = [
|
|
("bold", "default,bold", "default"),
|
|
("reverse", "standout", "default"),
|
|
("highlight", "light magenta", "default")
|
|
]
|
|
|
|
class App():
|
|
def __init__(self):
|
|
self.walker = urwid.SimpleFocusListWalker([
|
|
self.post_to_widget(post) for post in generate_feed()
|
|
])
|
|
self.loop = urwid.MainLoop(
|
|
urwid.Frame(
|
|
ActionBox(self.walker),
|
|
footer=urwid.Text(("reverse", footer))
|
|
),
|
|
palette=attrmap,
|
|
handle_mouse=False
|
|
)
|
|
|
|
def update(self, before=None, clear=True):
|
|
if clear:
|
|
self.walker.clear()
|
|
for post in generate_feed(before=before):
|
|
self.walker.append(self.post_to_widget(post))
|
|
|
|
def post_to_widget(self, post):
|
|
body = post["body"]
|
|
post_user = post["user"]
|
|
time_seconds = post["time"] / 1_000_000_000
|
|
stamp = datetime.datetime.fromtimestamp(time_seconds)
|
|
if current_user != post_user:
|
|
search = name_re.finditer(body)
|
|
if search:
|
|
widget_body = []
|
|
index = 0
|
|
for match in search:
|
|
start, end = match.span()
|
|
before = body[index:start]
|
|
highlight = body[start:end]
|
|
widget_body.append(before)
|
|
widget_body.append(("highlight", highlight))
|
|
index = end
|
|
widget_body.append(body[index:])
|
|
body = widget_body
|
|
pile = urwid.Pile([
|
|
urwid.Text([("bold", f"~{post_user}"), " @ ", stamp.strftime("%H:%M (%A, %B %d, %Y)")]),
|
|
urwid.Text(body),
|
|
urwid.Divider()
|
|
])
|
|
return pile
|
|
|
|
def write_with_editor(self):
|
|
self.loop.stop()
|
|
tmp = tempfile.NamedTemporaryFile()
|
|
run([editor, tmp.name])
|
|
with open(tmp.name, "r") as f:
|
|
body = f.read().strip()
|
|
if body:
|
|
create_post(body)
|
|
self.update()
|
|
self.loop.start()
|
|
|
|
class ActionBox(urwid.ListBox):
|
|
def keypress(self, size, key):
|
|
keyl = key.lower()
|
|
if keyl == "c":
|
|
app.write_with_editor()
|
|
elif keyl == "r":
|
|
app.update()
|
|
elif keyl in ("q", "x"):
|
|
raise urwid.ExitMainLoop
|
|
elif keyl == " ":
|
|
super().keypress(size, "page down")
|
|
elif keyl in ("j", "n", "ctrl n"):
|
|
super().keypress(size, "down")
|
|
elif keyl in ("k", "p", "ctrl p"):
|
|
super().keypress(size, "up")
|
|
elif key == "g":
|
|
super().keypress(size, "home")
|
|
elif key == "G":
|
|
super().keypress(size, "end")
|
|
elif key == "ctrl d":
|
|
for i in range(1, floor(size[1] / 2)):
|
|
super().keypress(size, "down")
|
|
elif key == "ctrl u":
|
|
for i in range(1, floor(size[1] / 2)):
|
|
super().keypress(size, "up")
|
|
super().keypress(size, key)
|
|
|
|
app = App()
|
|
try:
|
|
app.loop.run()
|
|
except (KeyboardInterrupt, urwid.ExitMainLoop):
|
|
app.loop.stop()
|
|
exit(0)
|