import feedparser
import urllib3
import yaml

import os
from shutil import rmtree
from sys import exit
from time import sleep, strftime

from hashi import Hashi
from formatter import HtmlToFText

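# NOTE: hashi and formatter are local helper modules whose sources are not
# shown here. From the call sites below, their assumed (unverified) shapes
# are roughly:
#   Hashi.fetch_url(url, path)         - download url and save it to path
#   Hashi.check_hash(feed, hash_file)  - {"changed": bool, "new": "<hash>"}
#   HtmlToFText().convert(html_str)    - HTML converted to formatted text
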
class FTG:

    def init(self, config):
        """Load the config. Please call this first before other methods."""
        self.conf = self.parse_yaml(config)
        self.hh = Hashi()
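
    # For reference, an illustrative config.yml sketch reconstructed from
    # the keys read below; every value is a placeholder assumption, not the
    # author's actual configuration:
    #
    #   home:
    #     dir: /var/gopher
    #     url: /
    #     title: "My feeds\n"
    #     info: "RSS feeds, mirrored in gopherspace\n"
    #     temp: "Updating, please check back shortly."
    #     updated: "updated: "
    #     timestamp: "%Y-%m-%d %H:%M"
    #     nav_back: "Back to home"
    #   update:
    #     feed_file: feed.xml
    #     hash_file: feed.hash
    #     user_agent: "ftg/0.1"
    #     sleep: 2
    #     skip_cache: ["youtube", "mp4"]
    #   feeds:
    #     - url: https://example.org/rss.xml
    #       title: Example
    #       permalink: example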

    def run(self):
        """Download feeds and generate gophermaps."""
        any_change = False
        count = 0
        all_feeds = []
        for f in self.conf["feeds"]:
            # Check feed for changes
            dir_path = self.conf["home"]["dir"] + "/" + f["permalink"]
            feed_path = dir_path + "/" + self.conf["update"]["feed_file"]
            hash_path = dir_path + "/" + self.conf["update"]["hash_file"]
            self.hh.fetch_url(f["url"], feed_path)
            check = self.hh.check_hash(feed_path, hash_path)
            # Build a list of feed data to regenerate the home map
            f["path"] = feed_path
            all_feeds.append(self.parse_rss(f))
            if check["changed"]:
                print("Getting update ...")
                any_change = True
                # Put up a placeholder home map while downloading feed items
                self.gen_home_map([], mode="temp")
                self.parse_file_list(all_feeds[count]["items"], dir_path)
                # Cache the feed hash
                with open(hash_path, "w") as fh:
                    fh.write(check["new"])
                # Regenerate the map
                self.gen_feed_map(all_feeds[count])
            else:
                print("Feed is up-to-date.")
            count += 1
            sleep(self.conf["update"]["sleep"])
        # If any of the feeds have changed, regenerate the home map
        # to ensure the permalinks to feed maps are current
        if any_change:
            self.gen_home_map(all_feeds)
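
    # Resulting layout under home.dir (inferred from the methods below):
    #   gophermap            - home menu linking to each feed
    #   <permalink>/
    #     gophermap          - per-feed menu
    #     <feed_file>        - cached copy of the RSS/Atom feed
    #     <hash_file>        - hash of the last seen feed
    #     1.txt, 2.pdf, ...  - downloaded feed items, numbered in feed order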

    def parse_yaml(self, yml):
        """Open a YAML file and return a dictionary of values."""
        try:
            with open(yml, "r") as fh:
                data = yaml.safe_load(fh)
        except (OSError, yaml.YAMLError):
            print("Error: could not load config.")
            exit(1)
        return data

    def parse_rss(self, feed):
        """Given a dictionary with a feed url, title, permalink and feed file
        path, parse the url and return a feed data dictionary."""
        if ("url" not in feed) or (feed["url"] is None) or \
           ("permalink" not in feed) or (feed["permalink"] is None):
            print("Error: missing/empty field. Please check config.")
            exit(1)
        try:
            print("Parsing " + feed["permalink"] + " ...")
            resp = feedparser.parse(feed["path"])
        except Exception:
            print("Error: could not parse (" + feed["url"] + ")")
            exit(1)
        # Insert custom fields
        resp["url"] = feed["url"]
        resp["permalink"] = feed["permalink"]
        if ("title" in feed) and (feed["title"] is not None):
            resp["display_title"] = feed["title"]
        else:
            resp["display_title"] = resp["channel"]["title"]
        return resp

    def check_filetype(self, url):
        """Given a resource url, return a dictionary containing the gopher
        filetype and file extension."""
        meta = {}
        # Normalise case so e.g. ".JPG" and ".jpg" are treated alike
        meta["ext"] = url.rsplit(".", 1)[-1].lower()
        # Everything before the final dot, used for keyword matching below
        domain = url.rsplit(".", 1)[0]
        if meta["ext"] == "gif":
            meta["type"] = "g"
        elif meta["ext"] in ("png", "jpg", "jpeg"):
            meta["type"] = "I"
        elif meta["ext"] == "pdf":
            meta["type"] = "d"
        else:
            meta = {"type": "0", "ext": "txt"}
        # Return empty meta if the site matches the keywords list
        for kw in self.conf["update"]["skip_cache"]:
            if (kw in domain) or (kw in meta["ext"]):
                meta = {}
                break
        return meta
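
    # Gopher item-type codes used above and in the map generators below
    # (0, 1, g and I per RFC 1436; d and h are common server extensions):
    #   0 = text file, 1 = menu/directory, g = GIF, I = image,
    #   d = document (e.g. PDF), h = HTML/URL link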

    def get_file(self, url, ext, path):
        """Save a link to file given the url, extension and file path."""
        # Initialise urllib3 and include a user-agent with the request
        hdrs = {"user-agent": self.conf["update"]["user_agent"]}
        http = urllib3.PoolManager(headers=hdrs)
        # Disable ssl warnings
        urllib3.disable_warnings()
        resp = http.request("GET", url)
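        # NOTE: no timeout or retry policy is set on the request above. If
        # slow or flaky feeds become a problem, urllib3 supports, for example:
        #   http.request("GET", url, timeout=urllib3.Timeout(total=30),
        #                retries=urllib3.Retry(3))
        # (an optional hardening suggestion, not part of the original script)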
        fmt = HtmlToFText()
        if ext == "txt":
            try:
                txt = fmt.convert(resp.data.decode("utf-8"))
                with open(path, "w", encoding="utf-8") as fh:
                    fh.write(txt)
            except UnicodeDecodeError:
                # Attempt to work around "codec can't decode byte" error
                # if certain this is a txt/html file
                txt = fmt.convert(resp.data.decode("ISO-8859-1"))
                with open(path, "w", encoding="ISO-8859-1") as fh:
                    fh.write(txt)
        else:
            try:
                with open(path, "wb") as fh:
                    fh.write(resp.data)
            except OSError:
                with open(path, "w") as fh:
                    fh.write("An error occurred while saving the file. " + \
                             "Please notify the administrator.")

    def parse_file_list(self, file_list, path):
        """Given a list of file urls and target directory path, save the
        links as files to the path."""
        count = 0
        # Make sure the path exists
        os.makedirs(path, exist_ok=True)
        for i in file_list:
            count += 1
            file_meta = self.check_filetype(i["link"])
            if "ext" in file_meta:
                print("Downloading item (" + str(count) + "/" + \
                      str(len(file_list)) + ") ...")
                file_path = path + "/" + str(count) + "." + \
                            file_meta["ext"]
                self.get_file(i["link"], file_meta["ext"], file_path)
                sleep(self.conf["update"]["sleep"])

    def clear_cache(self, path, *args, **kwargs):
        """Given a directory path and removal mode, remove the selections.
        Modes: dirs (default), files, all"""
        mode = kwargs.get("mode", "dirs")
        # Work on direct children only; rmtree already removes nested content
        if mode in ("dirs", "all"):
            for entry in os.listdir(path):
                if os.path.isdir(path + "/" + entry):
                    rmtree(path + "/" + entry)
        if mode in ("files", "all"):
            for entry in os.listdir(path):
                if os.path.isfile(path + "/" + entry):
                    os.remove(path + "/" + entry)

    def gen_home_map(self, feed_data, *args, **kwargs):
        """Write the top-level gophermap."""
        os.makedirs(self.conf["home"]["dir"], exist_ok=True)
        if kwargs.get("mode", "") == "temp":
            print("Placing temporary gophermap at " + \
                  self.conf["home"]["dir"] + " ...")
            with open(self.conf["home"]["dir"] + "/gophermap", "w") as fh:
                fh.write(self.conf["home"]["title"] + \
                         self.conf["home"]["info"] + "\r\n" + \
                         self.conf["home"]["temp"])
        else:
            print("Generating gophermap at " + self.conf["home"]["dir"] + \
                  " ...")
            with open(self.conf["home"]["dir"] + "/gophermap", "w") as fh:
                fh.write(self.conf["home"]["title"] + \
                         self.conf["home"]["info"] + "\r\n" + \
                         self.conf["home"]["updated"] + \
                         strftime(self.conf["home"]["timestamp"]) + "\n\n\n")
                # One menu entry per feed: "1<title>\t<selector>"
                for f in feed_data:
                    fh.write("1" + f["display_title"] + "\t" + \
                             f["permalink"] + "\n")
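
    # Gophermap lines are "<type><display>\t<selector>[\t<host>\t<port>]";
    # host and port are omitted above and below, which relies on the gopher
    # server filling in defaults for relative selectors (server-dependent
    # behaviour).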

    def gen_feed_map(self, feed_data):
        """Given a data dictionary for a feed source, write a feed
        gophermap."""
        dir_path = self.conf["home"]["dir"] + "/" + feed_data["permalink"]
        os.makedirs(dir_path, exist_ok=True)
        self.clear_cache(dir_path)
        count = 0
        print("Generating gophermap " + feed_data["permalink"] + " ...")
        with open(dir_path + "/gophermap", "w") as fh:
            # Info text
            fh.write(feed_data["display_title"] + "\r\n\n" + \
                     "1" + self.conf["home"]["nav_back"] + "\t" + \
                     self.conf["home"]["url"] + "\r\n\n" + \
                     "hWebsite" + "\tURL:" + feed_data["channel"]["link"] + \
                     "\r\n" + "hFeed" + "\tURL:" + feed_data["url"] + "\r\n\n")
            # Item links
            for i in feed_data["items"]:
                count += 1
                fh.write("h" + i["title"] + "\tURL:" + i["link"] + "\r\n")
                file_meta = self.check_filetype(i["link"])
                if "ext" in file_meta:
                    fh.write(file_meta["type"] + "(" + file_meta["ext"] + \
                             ")\t" + str(count) + "." + file_meta["ext"] + \
                             "\r\n")
                if ("author" in i) and (i["author"] != ""):
                    fh.write("author: " + i["author"] + "\n")
                if ("date" in i) and (i["date"] != ""):
                    fh.write("posted: " + i["date"] + "\n")
                fh.write("\n")


if __name__ == "__main__":
    ftg = FTG()
    ftg.init("config.yml")
    ftg.run()
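
# The generated tree is meant to be served by a gopher daemon pointed at
# home.dir; running this script periodically (e.g. via cron) keeps the maps
# current. (Deployment notes are an assumption, not part of the script.)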