import feedparser
import urllib3
import yaml
import os
from shutil import rmtree
from sys import exit
from time import sleep, strftime

from hashi import Hashi
from formatter import HtmlToFText


class FTG:

    def __init__(self, config):
        """Load the config. Please call this first before other methods."""
        self.conf = self.parse_yaml(config)
        # Hashi is expected to provide fetch_url(url, path), which saves a
        # resource to path, and check_hash(file, hash_file), which returns a
        # dict with "changed" (bool) and "new" (the fresh hash string)
        self.hh = Hashi()

    def run(self):
        """Download feeds and generate gophermaps."""
        any_change = False
        count = 0
        all_feeds = []
        for f in self.conf["feeds"]:
            # Check feed for changes
            dir_path = self.conf["home"]["dir"] + "/" + f["permalink"]
            feed_path = dir_path + "/" + self.conf["update"]["feed_file"]
            hash_path = dir_path + "/" + self.conf["update"]["hash_file"]
            self.hh.fetch_url(f["url"], feed_path)
            check = self.hh.check_hash(feed_path, hash_path)
            # Build a list of feed data to regenerate the home map
            f["path"] = feed_path
            all_feeds.append(self.parse_rss(f))
            if check["changed"]:
                print("Getting update ...")
                any_change = True
                # Put up placeholder home map while downloading feed items
                self.gen_home_map([], mode="temp")
                self.parse_file_list(all_feeds[count]["items"], dir_path)
                # Cache feed hash
                with open(hash_path, "w") as fh:
                    fh.write(check["new"])
                # Regenerate the map
                self.gen_feed_map(all_feeds[count])
            else:
                print("Feed is up-to-date.")
            count += 1
            sleep(self.conf["update"]["sleep"])
        # If any of the feeds have changed, regenerate the home map
        # to ensure the permalinks to feed maps are current
        if any_change:
            self.gen_home_map(all_feeds)

    def parse_yaml(self, yml):
        """Open a YAML file and return a dictionary of values."""
        try:
            with open(yml, "r") as fh:
                data = yaml.safe_load(fh)
        except (OSError, yaml.YAMLError):
            print("Error: could not load config.")
            exit(1)
        return data
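    # For reference, a sketch of the config.yml layout this class expects.
    # Every key below is inferred from a lookup somewhere in this file;
    # the values are purely illustrative.
    #
    #   home:
    #     dir: /var/gopher
    #     url: /
    #     title: "My Feeds"
    #     info: "Aggregated feeds"
    #     temp: "Updating, please check back shortly ..."
    #     updated: "Last updated: "
    #     timestamp: "%Y-%m-%d %H:%M"
    #     nav_back: "Back to home"
    #   update:
    #     feed_file: feed.xml
    #     hash_file: feed.hash
    #     sleep: 2
    #     user_agent: "ftg/1.0"
    #     skip_cache:
    #       - youtube
    #   feeds:
    #     - url: https://example.com/rss.xml
    #       title: Example Feed
    #       permalink: example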
Please check config.") exit(1) try: print("Parsing " + feed["permalink"] + " ...") resp = feedparser.parse(feed["path"]) except: print("Error: could not parse (" + feed["url"] + ")") exit(1) # Insert custom fields resp["url"] = feed["url"] resp["permalink"] = feed["permalink"] if ("title" in feed) and (feed["title"] != None): resp["display_title"] = feed["title"] else: resp["display_title"] = resp["channel"]["title"] return resp def check_filetype(self, url): """Given a resource url, return a dictionary containing the gopher filetype and file extension.""" meta = {} meta["ext"] = url.rsplit(".", 1)[1] domain = url.rsplit(".", 1)[0] if meta["ext"] == "gif": meta["type"] = "g" elif (meta["ext"] == "png") or (meta["ext"].lower() == "jpg") or \ (meta["ext"].lower() == "jpeg"): meta["type"] = "I" elif meta["ext"] == "pdf": meta["type"] = "d" else: meta = {"type": "0", "ext": "txt"} # Return empty meta if site matches keywords list for kw in self.conf["update"]["skip_cache"]: if (kw in domain) or (kw in meta["ext"]): meta = {} break return meta def get_file(self, url, ext, path): """Save a link to file given the url, extension and file path.""" # Initialise urllib and include user-agent with request hdrs = {"user-agent": self.conf["update"]["user_agent"]} http = urllib3.PoolManager(headers=hdrs) # Disable ssl warnings urllib3.disable_warnings() resp = http.request("GET", url) fmt = HtmlToFText() if ext == "txt": try: txt = fmt.convert(resp.data.decode("utf-8")) with open(path, "w", encoding="utf-8") as fh: fh.write(txt) except UnicodeDecodeError: # Attempt to work around "codec can't decode byte" error # if certain this is a txt/html file txt = fmt.convert(resp.data.decode("ISO-8859-1")) with open(path, "w", encoding="ISO-8859-1") as fh: fh.write(txt) else: try: with open(path, "wb") as fh: fh.write(resp.data) except: with open(path, "w") as fh: fh.write("An error occurred while saving the file." + \ "Please notify the administrator.") def parse_file_list(self, file_list, path): """Given a list of file urls and target directory path, save the links as files to the path.""" count = 0 # Make sure path exists os.makedirs(path, exist_ok=True) for i in file_list: count += 1 file_meta = self.check_filetype(i["link"]) if "ext" in file_meta: print("Downloading item (" + str(count) + "/" + \ str(len(file_list)) + ") ...") file_path = path + "/" + str(count) + "." + \ file_meta["ext"] self.get_file(i["link"], file_meta["ext"], file_path) sleep(self.conf["update"]["sleep"]) def clear_cache(self, path, *args, **kwargs): """Given a directory path and removal mode, remove the selections. 
Modes: dirs, files, all""" mode = kwargs.get("mode", "") if (mode == "dirs") or (mode == ""): for rt, dirs, files in os.walk(path): for d in dirs: rmtree(path + "/" + d) elif (mode == "files") or (mode == ""): for rt, dirs, files in os.walk(path): for f in files: os.remove(path + "/" + f) def gen_home_map(self, feed_data, *args, **kwargs): """Write the top-level gophermap.""" if kwargs.get("mode", "") == "temp": print("Placing temporary gophermap at " + \ self.conf["home"]["dir"] + " ...") os.makedirs(self.conf["home"]["dir"], exist_ok=True) with open(self.conf["home"]["dir"] + "/gophermap", "w") as fh: fh.write(self.conf["home"]["title"] + \ self.conf["home"]["info"] + "\r\n" + \ self.conf["home"]["temp"]) else: print("Generating gophermap at " + self.conf["home"]["dir"] + \ " ...") os.makedirs(self.conf["home"]["dir"], exist_ok=True) with open(self.conf["home"]["dir"] + "/gophermap", "w") as fh: fh.write(self.conf["home"]["title"] + \ self.conf["home"]["info"] + "\r\n" + \ self.conf["home"]["updated"] + \ strftime((self.conf["home"]["timestamp"])) + "\n\n\n") for f in feed_data: fh.write("1" + f["display_title"] + "\t" + \ f["permalink"] + "\n") def gen_feed_map(self, feed_data): """Given a data dictionary for a feed source, write a feed gophermap.""" dir_path = self.conf["home"]["dir"] + "/" + feed_data["permalink"] os.makedirs(dir_path, exist_ok=True) self.clear_cache(dir_path) count = 0 print("Generating gophermap " + feed_data["permalink"] + " ...") with open(dir_path + "/gophermap", "w") as fh: # Info text fh.write(feed_data["display_title"] + "\r\n\n" + \ "1" + self.conf["home"]["nav_back"] + "\t" + \ self.conf["home"]["url"] + "\r\n\n" + \ "hWebsite" + "\tURL:" + feed_data["channel"]["link"] + \ "\r\n" + "hFeed" + "\tURL:" + feed_data["url"] + "\r\n\n") # Item links for i in feed_data["items"]: count += 1 fh.write("h" + i["title"] + "\tURL:" + i["link"] + "\r\n") file_meta = self.check_filetype(i["link"]) if "ext" in file_meta: fh.write(file_meta["type"] + "(" + file_meta["ext"] + \ ")\t" + str(count) + "." + file_meta["ext"] + "\r\n") if ("author" in i) and (i["author"] != ""): fh.write("author: " + i["author"] + "\n") if ("date" in i) and (i["date"] != ""): fh.write("posted: " + i["date"] + "\n") fh.write("\n") ftg = FTG() ftg.init("config.yml") ftg.run()