juju/plugins/tilderadio.py

242 lines
7.2 KiB
Python
Raw Normal View History

from pinhook import plugin as p
import random
import json
import requests
import typing
import datetime as _datetime
import dateutil.relativedelta
import enum
ISO8601_FORMAT_DT = "%Y-%m-%dT%H:%M:%S"
ISO8601_FORMAT_TZ = "%z"
TIME_HUMAN = "%H:%M:%S"
DATE_HUMAN = "%Y-%m-%d"
class TimeSpec(enum.Enum):
NORMAL = 1
MILLISECOND = 2
TIME_SECOND = 1
TIME_MINUTE = TIME_SECOND*60
TIME_HOUR = TIME_MINUTE*60
TIME_DAY = TIME_HOUR*24
TIME_WEEK = TIME_DAY*7
SECONDS_MINUTES = 60
SECONDS_HOURS = SECONDS_MINUTES*60
SECONDS_DAYS = SECONDS_HOURS*24
SECONDS_WEEKS = SECONDS_DAYS*7
UNIT_MINIMUM = 6
UNIT_SECOND = 5
UNIT_MINUTE = 4
UNIT_HOUR = 3
UNIT_DAY = 2
UNIT_WEEK = 1
UNIT_MONTH = 1
UNIT_YEAR = 1
def utcnow() -> _datetime.datetime:
return _datetime.datetime.utcnow().replace(tzinfo=_datetime.timezone.utc)
def timestamp(seconds: float) -> _datetime.datetime:
return _datetime.datetime.fromtimestamp(seconds).replace(
tzinfo=_datetime.timezone.utc)
def seconds_since(dt: _datetime.datetime) -> float:
return (utcnow()-dt).total_seconds()
class RelativeDirection(enum.Enum):
FORWARD = 1
BACKWARD = 2
LISTEN_URL = "https://tilderadio.org/listen"
SCHEDULE_URL = "https://tilderadio.org/schedule/"
SOURCE_URL = "https://tildegit.org/ben/bitbot-modules"
AZURACAST_API_BASE = "https://azuracast.tilderadio.org/api"
ICECAST_API_BASE = "https://azuracast.tilderadio.org/radio/8000"
now_playing = ""
dj = ""
song = ""
listeners = 0
is_online = False
def save_nowplaying(jsontxt):
if jsontxt == "":
data = requests.get(AZURACAST_API_BASE + "/nowplaying/1").json()
else:
data = json.loads(jsontxt)
# get the song name directly from icecast
icecast_data = requests.get(ICECAST_API_BASE + "/status-json.xsl").json()
np = icecast_data["icestats"]["source"]
song = np[0]["yp_currently_playing"]
listeners = sum(i["listeners"] for i in np)
# azuracast's now playing info is broken
# https://github.com/AzuraCast/AzuraCast/issues/3142
# song = data["now_playing"]["song"]["text"]
is_online = data["live"]["is_live"]
dj = data["live"]["streamer_name"]
broadcast_start = data["live"]["broadcast_start"]
# listeners = data["listeners"]["current"]
def format_nowplaying():
ret = ""
if is_online:
ret = f"({dj}) "
ret += f"Now playing: {song} /|\ {listeners} listeners"
return ret
def on_load():
save_nowplaying("")
# &np &nowplaying [Show the current song on tilderadio]
@p.command("&np", help_text="Show the current song on tilderadio")
def nowplaying(event):
save_nowplaying("")
event["stdout"].write(format_nowplaying())
# &schedule [Show a link to the tilderadio schedule]
def schedule(event):
event["stdout"].write(f"You can find the schedule here: {SCHEDULE_URL}")
# &un [Show who's up next to stream (schedule)]
@p.command("&un", help_text="Show who's up next to stream (schedule)")
def upnext(event):
js = requests.get(
AZURACAST_API_BASE + "/station/1/schedule"
).json()
if len(js) < 1:
event["stdout"].write("nothing scheduled")
else:
data = js[0]
print(data["start"]);
start = iso8601(data["start"])
now = utcnow()
total_secs = (start - now).total_seconds()
event["stdout"].write(
"{} is up next at {} UTC in {}!".format(
data["name"],
datetime_human(start),
to_pretty_time(total_secs),
)
)
# &unn [Show who's up after the next to stream (schedule)]
def upnextnext(event):
js = utils.http.request(
AZURACAST_API_BASE + "/station/1/schedule"
).json()
if len(js) < 1:
event["stdout"].write("nothing scheduled")
else:
data = js[1]
start = iso8601(data["start"])
now = utils.datetime.utcnow()
total_secs = (start - now).total_seconds()
event["stdout"].write(
"{} is up next next at {} UTC in {}!".format(
data["name"],
datetime_human(start),
to_pretty_time(total_secs),
)
)
# &dj [View who is streaming if any]
def showdj(event):
if dj == "":
message = "No one is currently on the air"
else:
message = f"{dj} is now streaming!"
if broadcast_start:
now = utils.datetime.utcnow().timestamp()
total_seconds = now - broadcast_start
message += " (for {})".format(
to_pretty_time(total_seconds)
)
event["stdout"].write(message)
# Funtions for this to work
def datetime_human(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL):
date = _datetime.datetime.strftime(DATE_HUMAN) # removed dt,
time = _datetime.datetime.strftime(TIME_HUMAN) # removed dt,
if timespec == TimeSpec.MILLISECOND:
time += ".%s" % str(int(dt.microsecond/1000)).zfill(3)
def iso8601(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL
) -> str:
dt_format = dt.strftime(ISO8601_FORMAT_DT)
tz_format = dt.strftime(ISO8601_FORMAT_TZ)
ms_format = ""
if timespec == TimeSpec.MILLISECOND:
ms_format = ".%s" % str(int(dt.microsecond/1000)).zfill(3)
return "%s%s%s" % (dt_format, ms_format, tz_format)
def datetime_human(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL):
date = _datetime.datetime.strftime(DATE_HUMAN) # removed dt,
time = _datetime.datetime.strftime(TIME_HUMAN) # removed dt,
if timespec == TimeSpec.MILLISECOND:
time += ".%s" % str(int(dt.microsecond/1000)).zfill(3)
return "%s %s" % (date, time)
def date_human(dt: _datetime.datetime, timespec: TimeSpec=TimeSpec.NORMAL):
return _datetime.datetime.strftime(DATE_HUMAN) # removed dt,
def to_pretty_time(total_seconds: int, max_units: int=UNIT_MINIMUM,
direction: typing.Optional[RelativeDirection]=None) -> str:
if total_seconds < 1:
return "0s"
if not direction == None:
now = utcnow()
later = now
mod = _datetime.timedelta(seconds=total_seconds)
if direction == RelativeDirection.FORWARD:
later += mod
else:
later -= mod
dts = [later, now]
relative = dateutil.relativedelta.relativedelta(max(dts), min(dts))
years = relative.years
months = relative.months
weeks, days = divmod(relative.days, 7)
hours = relative.hours
minutes = relative.minutes
seconds = relative.seconds
else:
years, months = 0, 0
weeks, days = divmod(total_seconds, SECONDS_WEEKS)
days, hours = divmod(days, SECONDS_DAYS)
hours, minutes = divmod(hours, SECONDS_HOURS)
minutes, seconds = divmod(minutes, SECONDS_MINUTES)
out: typing.List[str] = []
if years and len(out) < max_units:
out.append("%dy" % years)
if months and len(out) < max_units:
out.append("%dmo" % months)
if weeks and len(out) < max_units:
out.append("%dw" % weeks)
if days and len(out) < max_units:
out.append("%dd" % days)
if hours and len(out) < max_units:
out.append("%dh" % hours)
if minutes and len(out) < max_units:
out.append("%dm" % minutes)
if seconds and len(out) < max_units:
out.append("%ds" % seconds)
return " ".join(out)
on_load();