Compare commits

...

14 Commits

Author SHA1 Message Date
magical 315a249ef6 check for empty post before asking for confirmation
before this change, BBJ would give the big "You are posting
anonymously!" warning even if the post body was empty (say, because you
accidentally started a post and closed the editor immediately). only
after confirming that you _really_ wanted to post anonymously would it
notice that, hey, the post body is empty and discard the post.

it makes more sense to do the checks the other way around.
2022-08-08 01:51:37 +00:00
magical 92bdd7b660 improve wipe_screen 2022-08-07 01:39:18 +00:00
magical 77d38011b5 quote file path in shell command 2022-08-07 01:25:14 +00:00
magical 4364c16625 set env in subprocess a little more cleanly 2022-08-07 01:25:14 +00:00
magical deb8d3ba4b remove unnecessary shell=True's 2022-08-07 01:25:14 +00:00
magical f63233dbb7 don't use network.request 2022-08-07 01:25:14 +00:00
magical ab42b4a27d use network.request 2022-08-07 01:25:14 +00:00
magical 8ae384b1cd make sane_value validation a little more general 2022-08-06 06:42:20 +00:00
magical 5e7168c848 rewrite some tail call loops as normal loops 2022-08-06 06:20:16 +00:00
magical 2e54939345 remove global network value 2022-08-06 06:01:25 +00:00
magical ceb4937c60 remove relog parameter 2022-08-06 05:40:25 +00:00
magical 3e4f3de2a8 pass the initial username and password to log_in explicitly 2022-08-06 05:20:39 +00:00
magical b1d1de659d import exit
The builtin exit function is added by the site package "for the
interactive interpreter" and "should not be used in programs", according
to the manual.

We want sys.exit instead. (I didn't know you could pass it a string
value, but it turns out you can.)
2022-08-06 05:10:14 +00:00
magical df9d419919 move some global code into main 2022-08-06 05:09:57 +00:00
1 changed files with 159 additions and 146 deletions

View File

@ -29,27 +29,18 @@ from getpass import getpass
from subprocess import call
from random import choice
from code import interact
from sys import exit
import rlcompleter
import readline
import tempfile
import shlex
import urwid
import json
import os
import re
# XxX_N0_4rgP4rs3_XxX ###yoloswag
def get_arg(key, default=None, get_value=True):
try:
spec = argv.index("--" + key)
value = argv[spec + 1] if get_value else True
except ValueError: # --key not specified
value = default
except IndexError: # flag given but no value
exit("invalid format for --" + key)
return value
if get_arg("help", False, False):
print("""BBJ Urwid Client
help_text = """\
BBJ Urwid Client
Available options:
--help: this message
--https: enable use of https, requires host support
@ -62,16 +53,7 @@ Available environment variables:
BBJ_PASSWORD: set your password to log in automatically.
if the password is wrong, will prompt you as normal.
Please note that these environment variables need to be exported, and are
visible to all other programs run from your shell.""")
exit()
try:
network = BBJ(get_arg("host", "127.0.0.1"),
get_arg("port", 7099),
get_arg("https", False, False))
except URLError as e:
# print the connection error in red
exit("\033[0;31m%s\033[0m" % repr(e))
visible to all other programs run from your shell."""
obnoxious_logo = """
% _ * ! *
@ -328,7 +310,8 @@ markpath = os.path.join(os.getenv("HOME"), ".bbjmarks")
pinpath = os.path.join(os.getenv("HOME"), ".bbjpins")
class App(object):
def __init__(self):
def __init__(self, network):
self.network = network
self.prefs = bbjrc("load")
self.client_pinned_threads = load_client_pins()
self.usermap = {}
@ -387,7 +370,7 @@ class App(object):
bar formatting to it.
"""
header = ("{}@bbj | " + text).format(
(network.user_name or "anonymous"),
(self.network.user_name or "anonymous"),
*format_specs
)
self.loop.widget.header = urwid.AttrMap(urwid.Text(header), "bar")
@ -593,7 +576,7 @@ class App(object):
# first we need to get the server's version of the message
# instead of our formatted one
try:
message = network.edit_query(thread_id, post_id)
message = self.network.edit_query(thread_id, post_id)
except UserWarning as e:
self.remove_overlays()
return self.temp_footer_message(e.description)
@ -616,7 +599,7 @@ class App(object):
urwid.Text(("bold", "Delete this %s?" % ("whole thread" if op else "post"))),
urwid.Divider(),
cute_button(("10" , ">> Yes"), lambda _: [
network.message_delete(message["thread_id"], message["post_id"]),
self.network.message_delete(message["thread_id"], message["post_id"]),
self.remove_overlays(),
self.index() if op else self.refresh()
]),
@ -638,7 +621,7 @@ class App(object):
def toggle_formatting(self, button, message):
self.remove_overlays()
raw = not message["send_raw"]
network.set_post_raw(message["thread_id"], message["post_id"], raw)
self.network.set_post_raw(message["thread_id"], message["post_id"], raw)
return self.refresh()
@ -655,7 +638,7 @@ class App(object):
"View %sQuote" % ("a " if len(quotes) != 1 else ""),
self.quote_view_menu, quotes))
if network.can_edit(message["thread_id"], message["post_id"]) \
if self.network.can_edit(message["thread_id"], message["post_id"]) \
and not self.window_split:
if message["post_id"] == 0:
@ -668,7 +651,7 @@ class App(object):
"Enable Formatting" if raw else "Disable Formatting",
self.toggle_formatting, message))
buttons.insert(0, urwid.Button("Edit Post", self.edit_post, message))
if network.user["is_admin"]:
if self.network.user["is_admin"]:
buttons.insert(0, urwid.Text(("20", "Reminder: You're an admin!")))
if not buttons:
@ -821,7 +804,7 @@ class App(object):
# narrowed selection of content, so we dont want to resume last_index_pos
self.last_index_pos = False
else:
threads, usermap = network.thread_index()
threads, usermap = self.network.thread_index()
self.usermap.update(usermap)
self.walker.clear()
@ -867,7 +850,7 @@ class App(object):
self.body.attr_map = {None: "default"}
self.mode = "thread"
thread, usermap = network.thread_load(thread_id, format="sequential")
thread, usermap = self.network.thread_load(thread_id, format="sequential")
self.usermap.update(usermap)
self.thread = thread
self.match_data["matches"].clear()
@ -888,16 +871,16 @@ class App(object):
def toggle_server_pin(self):
if self.mode != "index" or not network.user["is_admin"]:
if self.mode != "index" or not self.network.user["is_admin"]:
return
thread = self.walker.get_focus()[0].thread
network.thread_set_pin(thread["thread_id"], not thread["pinned"])
self.network.thread_set_pin(thread["thread_id"], not thread["pinned"])
self.index()
def search_index_callback(self, query):
simple_query = query.lower().strip()
threads, usermap = network.thread_index()
threads, usermap = self.network.thread_index()
self.usermap.update(usermap)
results = [
thread for thread in threads
@ -1172,10 +1155,10 @@ class App(object):
"""
self.loop.widget = self.loop.widget[0]
self.loop.stop()
call("clear", shell=True)
call("clear")
print(welcome)
try:
log_in(relog=True)
log_in(self.network)
except (KeyboardInterrupt, InterruptedError):
pass
self.loop.start()
@ -1188,8 +1171,9 @@ class App(object):
Options menu callback to anonymize the user and
then redisplay the options menu.
"""
network.user_name = network.user_auth = None
network.user = network("get_me")["data"]
self.network.user_name = None
self.network.user_auth = None
self.network.user = self.network.request("get_me")["data"]
self.loop.widget = self.loop.widget[0]
self.set_default_header()
self.options_menu()
@ -1231,7 +1215,7 @@ class App(object):
"""
# we can "recycle" the server's formatting abilities to
# use the same syntax for the help text itself
message = network.fake_message(
message = self.network.fake_message(
"\n\n".join(format_help), format="sequential")
widget = OptionsMenu(
@ -1253,7 +1237,7 @@ class App(object):
def set_color(self, button, value, color):
if value == False:
return
network.user_update(color=color)
self.network.user_update(color=color)
def toggle_exit(self, button, value):
@ -1280,10 +1264,10 @@ class App(object):
def change_username(self, *_):
self.loop.stop()
call("clear", shell=True)
call("clear")
try:
name = nameloop("Choose a new username", True)
network.user_update(user_name=name)
name = nameloop(self.network, "Choose a new username")
self.network.user_update(user_name=name)
motherfucking_rainbows("~~hello there %s~~" % name)
sleep(0.8)
self.loop.start()
@ -1296,10 +1280,10 @@ class App(object):
def change_password(self, *_):
self.loop.stop()
call("clear", shell=True)
call("clear")
try:
password = password_loop("Choose a new password. Can be empty", True)
network.user_update(auth_hash=network._hash(password))
password = password_loop("Choose a new password. Can be empty")
self.network.user_update(auth_hash=self.network._hash(password))
motherfucking_rainbows("SET NEW PASSWORD")
sleep(0.8)
self.loop.start()
@ -1395,8 +1379,8 @@ class App(object):
editor_buttons = []
edit_mode = []
if network.user_auth:
account_message = "Logged in as %s." % network.user_name
if self.network.user_auth:
account_message = "Logged in as %s." % self.network.user_name
account_stuff = [
urwid.Button("Relog", on_press=self.relog),
urwid.Button("Go anonymous", on_press=self.unlog),
@ -1413,7 +1397,7 @@ class App(object):
for index, color in enumerate(colornames):
urwid.RadioButton(
user_colors, color.title(),
network.user["color"] == index,
self.network.user["color"] == index,
self.set_color, index)
for item in user_colors:
@ -1618,7 +1602,9 @@ class App(object):
descriptor, path = tempfile.mkstemp()
with open(path, "w") as _:
_.write(init_body)
call("export LANG=en_US.UTF-8; %s %s" % (self.prefs["editor"], path), shell=True)
env = os.environ.copy()
env['LANG'] = 'en_US.UTF-8'
call("%s %s" % (self.prefs["editor"], shlex.quote(path)), env=env, shell=True)
with open(path) as _:
body = _.read()
os.remove(path)
@ -1635,7 +1621,7 @@ class App(object):
return self.footer_prompt("Title", self.compose)
elif title:
try: network.validate("title", title)
try: self.network.validate("title", title)
except AssertionError as e:
return self.footer_prompt(
"Title", self.compose, extra_text=e.description)
@ -1644,24 +1630,19 @@ class App(object):
body = self.overthrow_ext_edit(init_body)
if not body or re.search("^>>[0-9]+$", body):
return self.temp_footer_message("EMPTY POST DISCARDED")
params = {"body": body}
if self.mode == "thread" and not edit:
endpoint = "thread_reply"
params.update({"thread_id": self.thread["thread_id"]})
self.network.thread_reply(self.thread["thread_id"], body)
elif edit:
endpoint = "edit_post"
params.update({
"thread_id": self.thread["thread_id"],
"post_id": edit["post_id"]
})
self.network.edit_message(
thread_id=self.thread["thread_id"],
post_id=edit["post_id"],
body=body)
else:
endpoint = "thread_create"
params.update({"title": title})
self.network.thread_create(title, body)
network.request(endpoint, **params)
self.refresh()
if edit:
self.goto_post(edit["post_id"])
@ -1714,6 +1695,12 @@ class App(object):
self.window_split=True
self.switch_editor()
def repaint_screen(self):
"""
Force urwid to repaint the whole screen.
"""
self.loop.screen.clear()
class MessageBody(urwid.Text):
"""
@ -1770,7 +1757,7 @@ class MessageBody(urwid.Text):
if _c != 0:
color = str(_c)
if user != "anonymous" and user["user_name"] == network.user_name:
if user != "anonymous" and user["user_name"] == app.network.user_name:
display = "[You]"
# bold it
color += "0"
@ -1978,7 +1965,7 @@ class ExternalEditor(urwid.Terminal):
# should use the options menu to switch to Overthrow mode.
env.update({"LANG": "POSIX"})
command = ["bash", "-c", "{} {}; echo Press any key to kill this window...".format(
app.prefs["editor"], self.path)]
app.prefs["editor"], shlex.quote(self.path))]
super(ExternalEditor, self).__init__(command, env, app.loop, app.prefs["edit_escapes"]["abort"])
urwid.connect_signal(self, "closed", self.exterminate)
@ -1987,7 +1974,7 @@ class ExternalEditor(urwid.Terminal):
# app.loop.widget = app.loop.widget[0]
# if not value:
# app.loop.stop()
# call("clear", shell=True)
# call("clear")
# print(welcome)
# log_in(True)
# app.loop.start()
@ -1995,9 +1982,21 @@ class ExternalEditor(urwid.Terminal):
def exterminate(self, *_, anon_confirmed=False):
# close the editor and grab the post body
app.close_editor()
with open(self.path) as _:
body = _.read().strip()
os.remove(self.path)
# make sure its not empty
if not body or re.search("^>>[0-9]+$", body):
app.temp_footer_message("EMPTY POST DISCARDED")
return
# are we anonymous? check if the user wants to log in first
if app.prefs["confirm_anon"] \
and not anon_confirmed \
and network.user["user_name"] == "anonymous":
and app.network.user["user_name"] == "anonymous":
# TODO fixoverlay: urwid terminal widgets have been mucking
# up overlay dialogs since the wee days of bbj, i really
# need to find a real solution instead of dodging the issue
@ -2024,7 +2023,7 @@ class ExternalEditor(urwid.Terminal):
# return
# else:
app.loop.stop()
call("clear", shell=True)
call("clear")
print(anon_warn)
choice = paren_prompt(
"Post anonymously?", default="yes", choices=["Yes", "no"]
@ -2033,27 +2032,20 @@ class ExternalEditor(urwid.Terminal):
log_in(True)
app.loop.start()
app.close_editor()
with open(self.path) as _:
body = _.read().strip()
os.remove(self.path)
# ok; do the post
self.params.update({"body": body})
app.network.request(self.endpoint, **self.params)
if self.endpoint == "edit_post":
app.refresh()
app.goto_post(self.params["post_id"])
if body and not re.search("^>>[0-9]+$", body):
self.params.update({"body": body})
network.request(self.endpoint, **self.params)
if self.endpoint == "edit_post":
app.refresh()
app.goto_post(self.params["post_id"])
elif app.mode == "thread":
app.refresh()
app.goto_post(app.thread["reply_count"])
elif app.mode == "thread":
app.refresh()
app.goto_post(app.thread["reply_count"])
else:
app.last_pos = None
app.index()
else:
app.temp_footer_message("EMPTY POST DISCARDED")
app.last_pos = None
app.index()
def keypress(self, size, key):
@ -2071,7 +2063,7 @@ class ExternalEditor(urwid.Terminal):
if keyl == "ctrl l":
# always do this, and also pass it to the terminal
wipe_screen()
app.repaint_screen()
elif key == app.prefs["edit_escapes"]["abort"]:
self.terminate()
@ -2149,7 +2141,7 @@ class OptionsMenu(urwid.LineBox):
return self.keypress(size, "enter")
elif keyl == "ctrl l":
wipe_screen()
app.repaint_screen()
def mouse_event(self, size, event, button, x, y, focus):
@ -2205,7 +2197,7 @@ class ActionBox(urwid.ListBox):
self.change_focus(size, 0)
elif key == "ctrl l":
wipe_screen()
app.repaint_screen()
elif keyl == "o":
app.options_menu()
@ -2252,13 +2244,13 @@ class ActionBox(urwid.ListBox):
elif key == "~":
# sssssshhhhhhhh
app.loop.stop()
try: call("sl", shell=True)
try: call("sl")
except: pass
app.loop.start()
elif keyl == "$":
app.loop.stop()
call("clear", shell=True)
call("clear")
readline.set_completer(rlcompleter.Completer().complete)
readline.parse_and_bind("tab: complete")
interact(banner="Python " + version + "\nBBJ Interactive Console\nCtrl-D exits.", local=globals())
@ -2306,7 +2298,7 @@ def frilly_exit():
out = " ~~CoMeE BaCkK SooOn~~ 0000000"
motherfucking_rainbows(out.zfill(width))
else:
call("clear", shell=True)
call("clear")
motherfucking_rainbows("Come back soon! <3")
exit()
@ -2385,47 +2377,57 @@ def paren_prompt(text, positive=True, choices=[], function=input, default=None):
return ""
def sane_value(key, prompt, positive=True, return_empty=False):
response = paren_prompt(prompt, positive)
if return_empty and response == "":
return response
try: network.validate(key, response)
except AssertionError as e:
return sane_value(key, e.description, False)
return response
def sane_value(prompt, validate, positive=True, function=input):
"""Prompts for an input with paren_prompt until validate(response)
returns None (or something falsy). Otherwise the returned string is used
as the new prompt.
"""
while 1:
response = paren_prompt(prompt, positive, function=function)
error = validate(response)
if not error:
return response
prompt = str(error) if error != None else ""
positive = False
def password_loop(prompt, positive=True):
response1 = paren_prompt(prompt, positive, function=getpass)
if response1 == "":
confprompt = "Confirm empty password"
else:
confprompt = "Confirm it"
response2 = paren_prompt(confprompt, function=getpass)
if response1 != response2:
return password_loop("Those didnt match. Try again", False)
return response1
while 1:
response1 = paren_prompt(prompt, positive, function=getpass)
if response1 == "":
confprompt = "Confirm empty password"
else:
confprompt = "Confirm it"
response2 = paren_prompt(confprompt, function=getpass)
if response1 != response2:
prompt = "Those didnt match. Try again"
positive = False
else:
return response1
def nameloop(prompt, positive):
name = sane_value("user_name", prompt, positive)
if network.user_is_registered(name):
return nameloop("%s is already registered" % name, False)
return name
def nameloop(network, prompt, positive=True):
def validate_name(name):
try: network.validate("user_name", name)
except AssertionError as e:
return e.description
if network.user_is_registered(name):
return "%s is already registered" % name
return sane_value(prompt, validate_name, positive)
def log_in(relog=False):
def log_in(network, name="", password=""):
"""
Handles login or registration using an oldschool input()
chain. The user is run through this before starting the
Handles login or registration. If name and/or password are not
provided, the user is prompted for them using an oldschool
input() chain. The user is run through this before starting the
curses app.
"""
if relog:
name = sane_value("user_name", "Username", return_empty=True)
else:
name = get_arg("user") \
or os.getenv("BBJ_USER") \
or sane_value("user_name", "Username", return_empty=True)
if not name:
def validate_name(response):
if response != "": # allow empty username
try: network.validate("user_name", response)
except AssertionError as e:
return e.description
name = sane_value("Username", validate_name)
if name == "":
motherfucking_rainbows("~~W3 4R3 4n0nYm0u5~~")
else:
@ -2434,21 +2436,18 @@ def log_in(relog=False):
try:
network.set_credentials(
name,
os.getenv("BBJ_PASSWORD", default="")
if not relog else ""
password if name and password else ""
)
# make it easy for people who use an empty password =)
motherfucking_rainbows("~~welcome back {}~~".format(network.user_name))
except ConnectionRefusedError:
def login_loop(prompt, positive):
def validate_creds(password):
try:
password = paren_prompt(prompt, positive, function=getpass)
network.set_credentials(name, password)
except ConnectionRefusedError:
login_loop("// R E J E C T E D //.", False)
login_loop("Enter your password", True)
return "// R E J E C T E D //."
password = sane_value("Enter your password", validate_creds, function=getpass)
motherfucking_rainbows("~~welcome back {}~~".format(network.user_name))
except ValueError:
@ -2459,7 +2458,7 @@ def log_in(relog=False):
)
if response == "c":
name = nameloop("Pick a new name", True)
name = nameloop(network, "Pick a new name")
elif response == "n":
raise InterruptedError
@ -2562,25 +2561,39 @@ def ignore(*_, **__):
pass
def wipe_screen(*_):
"""
A crude hack to repaint the whole screen. I didnt immediately
see anything to acheive this in the MainLoop methods so this
will do, I suppose.
"""
app.loop.stop()
call("clear", shell=True)
app.loop.start()
# XxX_N0_4rgP4rs3_XxX ###yoloswag
def get_arg(key, default=None, get_value=True):
try:
spec = argv.index("--" + key)
value = argv[spec + 1] if get_value else True
except ValueError: # --key not specified
value = default
except IndexError: # flag given but no value
exit("invalid format for --" + key)
return value
def main():
if get_arg("help", False, False):
print(help_text)
exit()
try:
network = BBJ(get_arg("host", "127.0.0.1"),
get_arg("port", 7099),
get_arg("https", False, False))
except URLError as e:
# print the connection error in red
exit("\033[0;31m%s\033[0m" % repr(e))
global app
app = App()
call("clear", shell=True)
app = App(network)
call("clear")
motherfucking_rainbows(obnoxious_logo)
print(welcome)
try:
log_in()
name = get_arg("user") or os.getenv("BBJ_USER")
password = os.getenv("BBJ_PASSWORD", default="")
log_in(network, name, password)
app.index()
app.loop.run()
except (InterruptedError, KeyboardInterrupt):