# Listing metadata (from the source viewer): 242 lines, 7.2 KiB, Python
from pinhook import plugin as p
|
|
import random
|
|
import json
|
|
import requests
|
|
import typing
|
|
import datetime as _datetime
|
|
import dateutil.relativedelta
|
|
import enum
|
|
|
|
# strftime patterns for ISO 8601 output. The date/time part and the UTC-offset
# part are separate so iso8601() can insert milliseconds between them.
ISO8601_FORMAT_DT = "%Y-%m-%dT%H:%M:%S"
ISO8601_FORMAT_TZ = "%z"

# strftime patterns for human-readable time and date output.
TIME_HUMAN = "%H:%M:%S"
DATE_HUMAN = "%Y-%m-%d"
|
|
|
|
@enum.unique
class TimeSpec(enum.Enum):
    """Precision selector for the time-formatting helpers in this module."""

    # Whole-second precision (the default used by the formatters).
    NORMAL = 1
    # Append a three-digit millisecond suffix to the time.
    MILLISECOND = 2
|
|
|
|
# Durations expressed in seconds, each built from the previous one.
TIME_SECOND = 1
TIME_MINUTE = 60 * TIME_SECOND
TIME_HOUR = 60 * TIME_MINUTE
TIME_DAY = 24 * TIME_HOUR
TIME_WEEK = 7 * TIME_DAY

# Number of seconds per unit; used as divisors by to_pretty_time().
SECONDS_MINUTES = 60
SECONDS_HOURS = 60 * SECONDS_MINUTES
SECONDS_DAYS = 24 * SECONDS_HOURS
SECONDS_WEEKS = 7 * SECONDS_DAYS

# UNIT_MINIMUM is the default `max_units` of to_pretty_time().
# NOTE(review): the other UNIT_* values are presumably alternative
# `max_units` arguments; WEEK/MONTH/YEAR all being 1 mirrors the original.
UNIT_MINIMUM = 6
UNIT_SECOND = 5
UNIT_MINUTE = 4
UNIT_HOUR = 3
UNIT_DAY = 2
UNIT_WEEK = 1
UNIT_MONTH = 1
UNIT_YEAR = 1
|
|
|
|
def utcnow() -> _datetime.datetime:
    """Return the current time as a timezone-aware datetime in UTC.

    Uses ``datetime.now(tz=...)`` rather than the deprecated
    ``datetime.utcnow()``, which returns a naive value that then has to be
    patched with ``replace(tzinfo=...)``.
    """
    return _datetime.datetime.now(tz=_datetime.timezone.utc)
|
|
|
|
def timestamp(seconds: float) -> _datetime.datetime:
    """Convert a POSIX timestamp to a timezone-aware UTC datetime.

    Args:
        seconds: Seconds since the Unix epoch.

    Returns:
        The corresponding moment as an aware datetime with ``tzinfo`` UTC.
    """
    # Passing tz= makes the conversion happen in UTC. The previous
    # fromtimestamp(seconds).replace(tzinfo=utc) produced *local* wall-clock
    # time mislabelled as UTC on any host whose timezone is not UTC.
    return _datetime.datetime.fromtimestamp(
        seconds, tz=_datetime.timezone.utc)
|
|
|
|
def seconds_since(dt: _datetime.datetime) -> float:
    """Return how many seconds have elapsed between *dt* and now (UTC).

    *dt* must be timezone-aware, because it is subtracted from the aware
    value returned by utcnow().
    """
    elapsed = utcnow() - dt
    return elapsed.total_seconds()
|
|
|
|
@enum.unique
class RelativeDirection(enum.Enum):
    """Direction in which to_pretty_time() offsets from the current time."""

    # The duration is added to "now".
    FORWARD = 1
    # The duration is subtracted from "now".
    BACKWARD = 2
|
|
|
|
|
|
|
|
# Public pages and API endpoints for tilderadio.
LISTEN_URL = "https://tilderadio.org/listen"
SCHEDULE_URL = "https://tilderadio.org/schedule/"
SOURCE_URL = "https://tildegit.org/ben/bitbot-modules"
AZURACAST_API_BASE = "https://azuracast.tilderadio.org/api"
ICECAST_API_BASE = "https://azuracast.tilderadio.org/radio/8000"

# Module-level station state, read by format_nowplaying() and showdj().
now_playing = ""
dj = ""
song = ""
listeners = 0
is_online = False
# showdj() reads this name, so it must exist at module level even before any
# station data has been stored; None means "no broadcast start known".
broadcast_start = None
|
|
|
|
def save_nowplaying(jsontxt):
    """Refresh the module-level station state.

    Args:
        jsontxt: AzuraCast "now playing" payload as a JSON string. When it is
            the empty string, the payload is fetched from the AzuraCast API
            instead.

    Updates the globals ``song``, ``listeners``, ``is_online``, ``dj`` and
    ``broadcast_start``.

    Raises:
        requests.RequestException: if an HTTP request fails or times out.
        KeyError / IndexError: if a payload lacks the expected fields.
    """
    # Without this declaration every assignment below would create a local
    # variable and the module state would never change.
    global song, listeners, is_online, dj, broadcast_start

    if jsontxt == "":
        data = requests.get(
            AZURACAST_API_BASE + "/nowplaying/1", timeout=10
        ).json()
    else:
        data = json.loads(jsontxt)

    # Song title and listener count come straight from Icecast because
    # AzuraCast's own now-playing info is broken:
    # https://github.com/AzuraCast/AzuraCast/issues/3142
    icecast_data = requests.get(
        ICECAST_API_BASE + "/status-json.xsl", timeout=10
    ).json()
    sources = icecast_data["icestats"]["source"]
    # NOTE(review): Icecast is understood to return a single object rather
    # than a list when only one mount is active; normalise so the indexing
    # and the sum below work either way.
    if isinstance(sources, dict):
        sources = [sources]
    song = sources[0]["yp_currently_playing"]
    listeners = sum(source["listeners"] for source in sources)

    live = data["live"]
    is_online = live["is_live"]
    dj = live["streamer_name"]
    broadcast_start = live["broadcast_start"]
|
|
|
|
def format_nowplaying():
    """Build the "now playing" line from the module-level station state.

    Returns:
        ``"Now playing: <song> /|\\ <listeners> listeners"``, prefixed with
        ``"(<dj>) "`` when ``is_online`` is true.
    """
    prefix = f"({dj}) " if is_online else ""
    # The backslash is escaped explicitly: a bare "\ " is an invalid escape
    # sequence and triggers a SyntaxWarning on recent Python versions.
    return prefix + f"Now playing: {song} /|\\ {listeners} listeners"
|
|
|
|
def on_load():
    """Populate the station state once by fetching it from the APIs."""
    # An empty payload tells save_nowplaying() to fetch the data itself.
    save_nowplaying(jsontxt="")
|
|
|
|
# &np &nowplaying [Show the current song on tilderadio]
|
|
@p.command("&np", help_text="Show the current song on tilderadio")
def nowplaying(event):
    """Refresh the station state and write the "now playing" line."""
    # NOTE(review): assumes the handler argument supports
    # event["stdout"].write(...) — confirm against pinhook's handler API.
    save_nowplaying("")
    line = format_nowplaying()
    stdout = event["stdout"]
    stdout.write(line)
|
|
|
|
# &schedule [Show a link to the tilderadio schedule]
|
|
def schedule(event):
    """Write a message pointing at the tilderadio schedule page."""
    # NOTE(review): unlike nowplaying/upnext, this handler has no
    # @p.command decorator in this file — confirm how it is registered.
    reply = "You can find the schedule here: " + f"{SCHEDULE_URL}"
    event["stdout"].write(reply)
|
|
|
|
|
|
# &un [Show who's up next to stream (schedule)]
|
|
@p.command("&un", help_text="Show who's up next to stream (schedule)")
def upnext(event):
    """Write who is scheduled to stream next and how long until then.

    Fetches the station schedule from the AzuraCast API and reports its first
    entry, or "nothing scheduled" when the schedule is empty.

    Raises:
        requests.RequestException: if the HTTP request fails or times out.
    """
    js = requests.get(
        AZURACAST_API_BASE + "/station/1/schedule", timeout=10
    ).json()
    if not js:
        event["stdout"].write("nothing scheduled")
        return

    data = js[0]
    # data["start"] is text, so it has to be *parsed*; the iso8601() helper in
    # this file formats a datetime and raises AttributeError on a str.
    # NOTE(review): assumes the API emits "YYYY-MM-DDTHH:MM:SS+HH:MM" — confirm.
    start = _datetime.datetime.strptime(
        data["start"], ISO8601_FORMAT_DT + ISO8601_FORMAT_TZ
    ).astimezone(_datetime.timezone.utc)  # the message labels the time as UTC
    total_secs = (start - utcnow()).total_seconds()
    event["stdout"].write(
        "{} is up next at {} UTC in {}!".format(
            data["name"],
            datetime_human(start),
            to_pretty_time(total_secs),
        )
    )
|
|
|
|
# &unn [Show who's up after the next to stream (schedule)]
|
|
def upnextnext(event):
    """Write who is scheduled to stream after the next show.

    Fetches the station schedule from the AzuraCast API and reports its second
    entry, or "nothing scheduled" when fewer than two entries exist.

    Raises:
        requests.RequestException: if the HTTP request fails or times out.
    """
    # NOTE(review): this handler has no @p.command decorator in this file —
    # confirm how it is registered.
    # `utils` is never imported in this file; use requests and the local
    # utcnow() helper exactly as upnext() does.
    js = requests.get(
        AZURACAST_API_BASE + "/station/1/schedule", timeout=10
    ).json()
    # The second entry is needed, so a one-item schedule must also count as
    # "nothing scheduled" instead of raising IndexError on js[1].
    if len(js) < 2:
        event["stdout"].write("nothing scheduled")
        return

    data = js[1]
    # data["start"] is text, so it has to be *parsed*; the iso8601() helper in
    # this file formats a datetime and raises AttributeError on a str.
    # NOTE(review): assumes the API emits "YYYY-MM-DDTHH:MM:SS+HH:MM" — confirm.
    start = _datetime.datetime.strptime(
        data["start"], ISO8601_FORMAT_DT + ISO8601_FORMAT_TZ
    ).astimezone(_datetime.timezone.utc)  # the message labels the time as UTC
    total_secs = (start - utcnow()).total_seconds()
    event["stdout"].write(
        "{} is up next next at {} UTC in {}!".format(
            data["name"],
            datetime_human(start),
            to_pretty_time(total_secs),
        )
    )
|
|
|
|
# &dj [View who is streaming if any]
|
|
def showdj(event):
    """Write who is currently streaming, if anyone, and for how long.

    Reads the module-level ``dj`` and ``broadcast_start`` values; it does not
    refresh them itself.
    """
    # NOTE(review): this handler has no @p.command decorator in this file —
    # confirm how it is registered.
    if not dj:
        message = "No one is currently on the air"
    else:
        message = f"{dj} is now streaming!"
        # broadcast_start may not exist as a module global yet, so look it up
        # defensively instead of risking a NameError.
        started = globals().get("broadcast_start")
        if started:
            # NOTE(review): assumes broadcast_start is a Unix timestamp in
            # seconds — confirm against the AzuraCast payload.
            # `utils` is never imported in this file; use the local utcnow().
            elapsed = utcnow().timestamp() - started
            message += " (for {})".format(to_pretty_time(elapsed))
    event["stdout"].write(message)
|
|
|
|
|
|
# Functions needed for this to work
|
|
|
|
def datetime_human(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL
        ) -> str:
    """Format *dt* as ``"YYYY-MM-DD HH:MM:SS"`` for display.

    With ``TimeSpec.MILLISECOND`` a ``.mmm`` millisecond suffix is appended.

    Note: this name is defined again further down this file; at import time
    the later definition is the one that stays bound.
    """
    # strftime must be called on the datetime instance; calling it on the
    # class with only a format string raises TypeError.
    date = dt.strftime(DATE_HUMAN)
    time = dt.strftime(TIME_HUMAN)
    if timespec == TimeSpec.MILLISECOND:
        time += ".%s" % str(int(dt.microsecond/1000)).zfill(3)
    # The formatted value has to be returned for callers to use it.
    return "%s %s" % (date, time)
|
|
|
|
def iso8601(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL
        ) -> str:
    """Format *dt* as an ISO 8601 string.

    The result is the date/time part, an optional ``.mmm`` millisecond part
    (only for ``TimeSpec.MILLISECOND``), then the ``%z`` UTC offset.
    """
    pieces = [dt.strftime(ISO8601_FORMAT_DT)]
    if timespec == TimeSpec.MILLISECOND:
        millis = int(dt.microsecond / 1000)
        pieces.append("." + str(millis).zfill(3))
    pieces.append(dt.strftime(ISO8601_FORMAT_TZ))
    return "".join(pieces)
|
|
|
|
def datetime_human(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL
        ) -> str:
    """Format *dt* as ``"YYYY-MM-DD HH:MM:SS"`` for display.

    With ``TimeSpec.MILLISECOND`` a ``.mmm`` millisecond suffix is appended.
    """
    # strftime must be called on the datetime instance; calling it on the
    # class with only a format string raises TypeError.
    date = dt.strftime(DATE_HUMAN)
    time = dt.strftime(TIME_HUMAN)
    if timespec == TimeSpec.MILLISECOND:
        time += ".%s" % str(int(dt.microsecond/1000)).zfill(3)
    return "%s %s" % (date, time)
|
|
def date_human(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL
        ) -> str:
    """Format the date part of *dt* as ``"YYYY-MM-DD"``.

    *timespec* is accepted for signature parity with datetime_human() and is
    not used.
    """
    # strftime must be called on the datetime instance; calling it on the
    # class with only a format string raises TypeError.
    return dt.strftime(DATE_HUMAN)
|
|
|
|
def to_pretty_time(total_seconds: int, max_units: int=UNIT_MINIMUM,
        direction: typing.Optional[RelativeDirection]=None) -> str:
    """Render a duration in seconds as text such as ``"1d 2h 3m"``.

    Args:
        total_seconds: Length of the duration. Fractions of a second are
            discarded; anything below one second yields ``"0s"``.
        max_units: Maximum number of non-zero units to include, largest
            first.
        direction: When given, the duration is measured relative to the
            current time (forwards or backwards) with dateutil, so calendar
            months and years can appear. When ``None``, only weeks, days,
            hours, minutes and seconds are used.

    Returns:
        Space-separated ``<number><suffix>`` parts using the suffixes
        ``y``, ``mo``, ``w``, ``d``, ``h``, ``m`` and ``s``.
    """
    # Callers pass timedelta.total_seconds() floats. Truncating up front stops
    # a leftover fraction from showing up as a bogus "0s" unit
    # (60.5 used to render as "1m 0s").
    total_seconds = int(total_seconds)
    if total_seconds < 1:
        return "0s"

    if direction is not None:
        now = utcnow()
        offset = _datetime.timedelta(seconds=total_seconds)
        if direction == RelativeDirection.FORWARD:
            later = now + offset
        else:
            later = now - offset

        # relativedelta(later, earlier) gives a calendar-aware breakdown.
        relative = dateutil.relativedelta.relativedelta(
            max(later, now), min(later, now))
        years = relative.years
        months = relative.months
        weeks, days = divmod(relative.days, 7)
        hours = relative.hours
        minutes = relative.minutes
        seconds = relative.seconds
    else:
        years, months = 0, 0
        # Peel off each unit in turn; `rest` is the remainder in seconds.
        weeks, rest = divmod(total_seconds, SECONDS_WEEKS)
        days, rest = divmod(rest, SECONDS_DAYS)
        hours, rest = divmod(rest, SECONDS_HOURS)
        minutes, seconds = divmod(rest, SECONDS_MINUTES)

    units = (
        (years, "y"),
        (months, "mo"),
        (weeks, "w"),
        (days, "d"),
        (hours, "h"),
        (minutes, "m"),
        (seconds, "s"),
    )
    out: typing.List[str] = []
    for value, suffix in units:
        # Zero-valued units are skipped; stop adding once max_units is hit.
        if value and len(out) < max_units:
            out.append("%d%s" % (value, suffix))

    return " ".join(out)
|
|
|
|
# Fetch the initial station state as soon as the module is imported.
on_load()
|