Add medium elaborations for syndication
This commit is contained in:
@@ -4,11 +4,15 @@ import os
|
||||
import json
|
||||
import hashlib
|
||||
import requests
|
||||
import re
|
||||
import logging
|
||||
|
||||
domain = "https://fundor333.com"
|
||||
rss_url_mastodon = "https://mastodon.social/@fundor333.rss"
|
||||
rss_url_bsky = "https://bsky.app/profile/did:plc:u7piwonv4s27ysugjaa6im2q/rss"
|
||||
|
||||
rss = [
|
||||
"https://mastodon.social/@fundor333.rss",
|
||||
"https://bsky.app/profile/did:plc:u7piwonv4s27ysugjaa6im2q/rss",
|
||||
"https://fundor333.medium.com/feed",
|
||||
]
|
||||
hacker_news_username = "fundor333"
|
||||
|
||||
|
||||
@@ -57,47 +61,31 @@ class HackerNewsFinder:
|
||||
json.dump(data, fp)
|
||||
|
||||
|
||||
class BskyFinder:
|
||||
class FeedFinder:
|
||||
def find_urls(self, string):
|
||||
x = string.split(" ")
|
||||
res = []
|
||||
for i in x:
|
||||
if i.startswith("https:") or i.startswith("http:"):
|
||||
res.append(i)
|
||||
return res
|
||||
urls = re.findall(r"(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-?=%.]+", string)
|
||||
|
||||
return urls
|
||||
|
||||
def get_label(self, feed_type: str, entry: dict):
|
||||
if feed_type == "rss20":
|
||||
return entry.get("summary")
|
||||
|
||||
def run(self, rss_url: str, domain: str, output: dict):
|
||||
feed = feedparser.parse(rss_url)
|
||||
if feed.status == 200:
|
||||
for entry in feed.entries:
|
||||
link = entry.get("link")
|
||||
for e in self.find_urls(entry.get("summary")):
|
||||
if domain in e:
|
||||
e = clean_slug(e)
|
||||
if output.get(e, False):
|
||||
output[e].append(link.strip())
|
||||
else:
|
||||
output[e] = [link.strip()]
|
||||
else:
|
||||
print("Failed to get RSS feed. Status code:", feed.status)
|
||||
logging.error(f"Feed type not supported: {feed_type}")
|
||||
|
||||
|
||||
class MastodonFinder:
|
||||
def find_urls(self, string):
|
||||
x = string.split('"')
|
||||
res = []
|
||||
for i in x:
|
||||
if i.startswith("https:") or i.startswith("http:"):
|
||||
res.append(i)
|
||||
return res
|
||||
return entry.get("content")
|
||||
|
||||
def run(self, rss_url: str, domain: str, output: dict):
|
||||
feed = feedparser.parse(rss_url)
|
||||
|
||||
if feed.status == 200:
|
||||
for entry in feed.entries:
|
||||
link = entry.get("link")
|
||||
for e in self.find_urls(entry.get("description")):
|
||||
|
||||
for e in self.find_urls(self.get_label(feed.version, entry)):
|
||||
if domain in e:
|
||||
print(f"{link} - {e}")
|
||||
e = clean_slug(e)
|
||||
if output.get(e, False):
|
||||
output[e].append(link.strip())
|
||||
@@ -110,23 +98,19 @@ class MastodonFinder:
|
||||
class WriterSyndication:
|
||||
def __init__(
|
||||
self,
|
||||
rss_url_mastodon: str,
|
||||
rss_url_bsky: str,
|
||||
domain: str,
|
||||
rss: list[str],
|
||||
hacker_news_username: str,
|
||||
domain: str,
|
||||
):
|
||||
self.output = {}
|
||||
self.rss_url_mastodon = rss_url_mastodon
|
||||
self.rss_url_bsky = rss_url_bsky
|
||||
self.rss = rss
|
||||
self.domain = domain
|
||||
self.hacker_news_username = hacker_news_username
|
||||
|
||||
def data_gathering(self):
|
||||
m = MastodonFinder()
|
||||
m.run(self.rss_url_mastodon, self.domain, self.output)
|
||||
|
||||
bs = BskyFinder()
|
||||
bs.run(self.rss_url_bsky, self.domain, self.output)
|
||||
ff = FeedFinder()
|
||||
for i in self.rss:
|
||||
ff.run(i, self.domain, self.output)
|
||||
|
||||
hn = HackerNewsFinder(self.hacker_news_username)
|
||||
hn.run(self.output)
|
||||
@@ -161,5 +145,9 @@ class WriterSyndication:
|
||||
self.write()
|
||||
|
||||
|
||||
w = WriterSyndication(rss_url_mastodon, rss_url_bsky, domain, hacker_news_username)
|
||||
w = WriterSyndication(
|
||||
rss,
|
||||
hacker_news_username,
|
||||
domain,
|
||||
)
|
||||
w.run()
|
||||
|
||||
Reference in New Issue
Block a user