# scripts/ftg/main.py
#
# 226 lines
# 9.1 KiB
# Python

import feedparser
import urllib3
import yaml
import os
from shutil import rmtree
from sys import exit
from time import sleep, strftime
from hashi import Hashi
from formatter import HtmlToFText
class FTG:
    """Fetch feeds and publish them as a tree of gophermaps.

    Call ``init()`` with the path to a YAML config before any other method;
    ``run()`` then performs one full update pass.
    """

    # Gopher item type for each recognised (lower-cased) file extension.
    # Anything else is cached as converted plain text (type "0").
    _FILETYPES = {"gif": "g", "png": "I", "jpg": "I", "jpeg": "I", "pdf": "d"}

    # Placeholder written in place of an item that could not be fetched/saved.
    _SAVE_ERROR = ("An error occurred while saving the file. "
                   "Please notify the administrator.")

    def init(self, config):
        """Load the config. Please call this first before other methods.

        config -- path to the YAML configuration file.
        """
        self.conf = self.parse_yaml(config)
        self.hh = Hashi()

    def run(self):
        """Download feeds and generate gophermaps.

        For every feed in the config: fetch the feed file, compare its hash
        with the cached one and, if it changed, download the linked items,
        store the new hash and rewrite that feed's gophermap. The home
        gophermap is rewritten at the end if any feed changed.
        """
        any_change = False
        all_feeds = []
        update = self.conf["update"]
        for f in self.conf["feeds"]:
            # Validate before the permalink is used to build paths, so a
            # broken config entry gives the friendly error, not a KeyError.
            self._require_feed_fields(f)
            # Check feed for changes
            dir_path = self.conf["home"]["dir"] + "/" + f["permalink"]
            feed_path = dir_path + "/" + update["feed_file"]
            hash_path = dir_path + "/" + update["hash_file"]
            self.hh.fetch_url(f["url"], feed_path)
            check = self.hh.check_hash(feed_path, hash_path)
            # Build a list of feed data to regenerate the home map
            f["path"] = feed_path
            feed_data = self.parse_rss(f)
            all_feeds.append(feed_data)
            if check["changed"]:
                print("Getting update ...")
                any_change = True
                # Put up placeholder home map while downloading feed items
                self.gen_home_map([], mode="temp")
                self.parse_file_list(feed_data["items"], dir_path)
                # Cache feed hash
                with open(hash_path, "w") as fh:
                    fh.write(check["new"])
                # Regenerate the map
                self.gen_feed_map(feed_data)
            else:
                print("Feed is up-to-date.")
            sleep(update["sleep"])
        # If any of the feeds have changed, regenerate the home map
        # to ensure the permalinks to feed maps are current
        if any_change:
            self.gen_home_map(all_feeds)

    def parse_yaml(self, yml):
        """Open a YAML file and return a dictionary of values.

        Prints an error and exits with status 1 if the file cannot be read
        or parsed.
        """
        try:
            # "with" closes the handle even when safe_load() raises
            with open(yml, "r") as fh:
                data = yaml.safe_load(fh)
        except Exception:
            print("Error: could not load config.")
            exit(1)
        return data

    def _require_feed_fields(self, feed):
        """Exit with an error unless the feed has a url and a permalink."""
        if feed.get("url") is None or feed.get("permalink") is None:
            print("Error: missing/empty field. Please check config.")
            exit(1)

    def parse_rss(self, feed):
        """Given a dictionary with a feed url, title, permalink and feed file
        path, parse the feed file and return a feed data dictionary.

        The returned data gains three custom keys: "url", "permalink" and
        "display_title" (the configured title, else the feed's own title).
        Exits with status 1 on a missing field or a parse failure.
        """
        self._require_feed_fields(feed)
        try:
            print("Parsing " + feed["permalink"] + " ...")
            resp = feedparser.parse(feed["path"])
        except Exception:
            print("Error: could not parse (" + feed["url"] + ")")
            exit(1)
        # Insert custom fields
        resp["url"] = feed["url"]
        resp["permalink"] = feed["permalink"]
        if feed.get("title") is not None:
            resp["display_title"] = feed["title"]
        else:
            resp["display_title"] = resp["channel"]["title"]
        return resp

    def check_filetype(self, url):
        """Given a resource url, return a dictionary containing the gopher
        filetype and file extension.

        The extension is whatever follows the last "." in the url and is
        matched case-insensitively; unrecognised or missing extensions map to
        {"type": "0", "ext": "txt"}. An empty dictionary is returned when the
        url matches one of the config's skip_cache keywords, meaning the item
        should not be cached at all.
        """
        # rpartition never raises, unlike indexing the result of rsplit()
        _, dot, ext = url.rpartition(".")
        gopher_type = self._FILETYPES.get(ext.lower()) if dot else None
        if gopher_type is None:
            meta = {"type": "0", "ext": "txt"}
        else:
            # Keep the extension as spelled in the url for the cached name
            meta = {"ext": ext, "type": gopher_type}
        # Return empty meta if site matches keywords list. The whole url is
        # searched so keywords in (or spanning) the last segment also match.
        for kw in self.conf["update"].get("skip_cache") or []:
            if (kw in url) or (kw in meta["ext"]):
                return {}
        return meta

    def _write_error_notice(self, path):
        """Write the placeholder notice to path in place of the real file."""
        with open(path, "w") as fh:
            fh.write(self._SAVE_ERROR)

    def get_file(self, url, ext, path):
        """Save a link to file given the url, extension and file path.

        "txt" items are decoded, run through HtmlToFText and written as text;
        everything else is written as raw bytes. If the request or a binary
        write fails, a short notice is written to path instead.
        """
        # Initialise urllib and include user-agent with request
        hdrs = {"user-agent": self.conf["update"]["user_agent"]}
        http = urllib3.PoolManager(headers=hdrs)
        # Disable ssl warnings
        urllib3.disable_warnings()
        try:
            resp = http.request("GET", url)
        except urllib3.exceptions.HTTPError:
            # One unreachable item should not abort the whole update run
            self._write_error_notice(path)
            return
        if ext == "txt":
            fmt = HtmlToFText()
            try:
                txt = fmt.convert(resp.data.decode("utf-8"))
                with open(path, "w", encoding="utf-8") as fh:
                    fh.write(txt)
            except UnicodeDecodeError:
                # Attempt to work around "codec can't decode byte" error
                # if certain this is a txt/html file
                txt = fmt.convert(resp.data.decode("ISO-8859-1"))
                # errors="replace": converted text may contain characters
                # outside ISO-8859-1, which would otherwise raise on write
                with open(path, "w", encoding="ISO-8859-1",
                          errors="replace") as fh:
                    fh.write(txt)
        else:
            try:
                with open(path, "wb") as fh:
                    fh.write(resp.data)
            except OSError:
                self._write_error_notice(path)

    def parse_file_list(self, file_list, path):
        """Given a list of feed items and target directory path, save each
        item's link as a file in the path.

        Files are named "<n>.<ext>" where n is the item's 1-based position in
        file_list; items whose filetype lookup comes back empty are skipped
        but still consume a number.
        """
        # Make sure path exists
        os.makedirs(path, exist_ok=True)
        total = str(len(file_list))
        for count, item in enumerate(file_list, start=1):
            file_meta = self.check_filetype(item["link"])
            if "ext" not in file_meta:
                continue
            print("Downloading item (" + str(count) + "/" + total + ") ...")
            file_path = path + "/" + str(count) + "." + file_meta["ext"]
            self.get_file(item["link"], file_meta["ext"], file_path)
            sleep(self.conf["update"]["sleep"])

    def clear_cache(self, path, *args, **kwargs):
        """Given a directory path and removal mode, remove the selections.

        Modes (keyword argument "mode"):
          "dirs"  -- remove the sub-directories directly inside path
          "files" -- remove the files directly inside path
          "all"   -- remove both
        With no mode given, only sub-directories are removed. Any other mode
        value, or a path that is not a directory, is a no-op.
        """
        mode = kwargs.get("mode", "")
        remove_dirs = mode in ("", "dirs", "all")
        remove_files = mode in ("files", "all")
        if not (remove_dirs or remove_files) or not os.path.isdir(path):
            return
        # Only direct children are touched; rmtree takes care of whatever is
        # nested below a removed directory.
        for name in os.listdir(path):
            child = path + "/" + name
            if os.path.isdir(child) and not os.path.islink(child):
                if remove_dirs:
                    rmtree(child)
            elif remove_files:
                os.remove(child)

    def gen_home_map(self, feed_data, *args, **kwargs):
        """Write the top-level gophermap.

        With mode="temp" a placeholder map (title, info and the configured
        temp text) is written and feed_data is ignored. Otherwise the map
        holds the title, info, an update timestamp and one menu entry per
        feed in feed_data.
        """
        home = self.conf["home"]
        is_temp = kwargs.get("mode", "") == "temp"
        if is_temp:
            print("Placing temporary gophermap at " + home["dir"] + " ...")
        else:
            print("Generating gophermap at " + home["dir"] + " ...")
        os.makedirs(home["dir"], exist_ok=True)
        # Explicit UTF-8 so non-ASCII feed titles do not depend on the locale
        with open(home["dir"] + "/gophermap", "w", encoding="utf-8") as fh:
            header = home["title"] + home["info"] + "\r\n"
            if is_temp:
                fh.write(header + home["temp"])
                return
            fh.write(header + home["updated"] +
                     strftime(home["timestamp"]) + "\n\n\n")
            for f in feed_data:
                fh.write("1" + f["display_title"] + "\t" +
                         f["permalink"] + "\n")

    def gen_feed_map(self, feed_data):
        """Given a data dictionary for a feed source, write a feed
        gophermap.

        The map starts with the feed title, a link back to the home map and
        links to the website and feed, followed by one entry per item: a web
        link, a link to the cached copy "<n>.<ext>" when the item is cached,
        and author/date lines when present.
        """
        home = self.conf["home"]
        dir_path = home["dir"] + "/" + feed_data["permalink"]
        os.makedirs(dir_path, exist_ok=True)
        # Default mode removes sub-directories only, so item files already
        # saved in this directory are kept.
        self.clear_cache(dir_path)
        print("Generating gophermap " + feed_data["permalink"] + " ...")
        # Explicit UTF-8 so non-ASCII item titles do not depend on the locale
        with open(dir_path + "/gophermap", "w", encoding="utf-8") as fh:
            # Info text
            fh.write(feed_data["display_title"] + "\r\n\n" +
                     "1" + home["nav_back"] + "\t" + home["url"] + "\r\n\n" +
                     "hWebsite" + "\tURL:" + feed_data["channel"]["link"] +
                     "\r\n" +
                     "hFeed" + "\tURL:" + feed_data["url"] + "\r\n\n")
            # Item links; numbering must match parse_file_list() file names
            for count, i in enumerate(feed_data["items"], start=1):
                fh.write("h" + i["title"] + "\tURL:" + i["link"] + "\r\n")
                file_meta = self.check_filetype(i["link"])
                if "ext" in file_meta:
                    fh.write(file_meta["type"] + "(" + file_meta["ext"] +
                             ")\t" + str(count) + "." + file_meta["ext"] +
                             "\r\n")
                if ("author" in i) and i["author"]:
                    fh.write("author: " + i["author"] + "\n")
                if ("date" in i) and i["date"]:
                    fh.write("posted: " + i["date"] + "\n")
                fh.write("\n")
if __name__ == "__main__":
    # Run one update pass only when executed as a script, so that importing
    # this module does not start downloading feeds as a side effect.
    ftg = FTG()
    ftg.init("config.yml")
    ftg.run()