-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_rss.py
More file actions
136 lines (117 loc) · 5.75 KB
/
generate_rss.py
File metadata and controls
136 lines (117 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import os
from datetime import datetime, timezone

import requests
from dateutil import parser as date_parser
from lxml import etree as ET
# Constants
# Read the API token from the environment so the secret never lands in the repo.
BEARER_TOKEN = os.getenv("BEARER_TOKEN")
# Handle of the account whose timeline is mirrored into the RSS feed.
USERNAME = 'CUSocialMedia'
USER_ID = 132922064 # Value was fetched from using X API v2 /2/users/by/username/CUSocialMedia
# Upper bound passed as max_results to the tweets endpoint.
MAX_TWEETS = 10
# Output path: ../feeds/cusocialmedia_rss.xml, resolved relative to this script.
RSS_FILE = os.path.join(os.path.dirname(__file__), '..', 'feeds', 'cusocialmedia_rss.xml')
# Fetches the latest tweets from a user using X API v2
def get_user_tweets(user_id):
    """Fetch the latest original tweets (no replies or retweets) for *user_id*.

    Args:
        user_id: Numeric X user id (see USER_ID above).

    Returns:
        dict: Parsed JSON body from the /2/users/{id}/tweets endpoint,
        including requested tweet fields and media expansions.

    Raises:
        requests.HTTPError: On a non-2xx response (via raise_for_status).
        requests.Timeout: If the API does not respond within 30 seconds.
    """
    url = f'https://api.x.com/2/users/{user_id}/tweets'
    headers = {'Authorization': f'Bearer {BEARER_TOKEN}'}
    params = {
        'max_results': MAX_TWEETS,
        'exclude': "replies,retweets",
        'tweet.fields': "id,text,author_id,created_at,attachments,entities",
        'expansions': "attachments.media_keys",
        'media.fields': "url,type,alt_text,preview_image_url",
    }
    # Fix: requests has no default timeout, so a stalled connection would hang
    # this script (and any scheduled job running it) indefinitely.
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    return response.json()
# Formats tweet text by removing links for media attachments
def format_text(tweet, media_map):
    """Return the tweet text with media-attachment t.co links stripped.

    Args:
        tweet: Tweet object (dict) from the X API v2; may lack "entities".
        media_map: Kept for signature parity with format_entities; unused here.

    Returns:
        str: The tweet text with every URL entity that carries a "media_key"
        removed. Non-media URLs are left untouched.
    """
    text = tweet["text"]
    if tweet.get("entities"):
        # URLs - Remove URLs that are media attachments
        if 'urls' in tweet["entities"]:
            for url in tweet["entities"]['urls']:
                if "media_key" in url:
                    text = text.replace(url['url'], '')
    # Fix: the original built `text` but never returned it (implicit None).
    return text
# Formats tweet text with HTML links for URLs, mentions, and hashtags
def format_entities(tweet, media_map):
    """Return the tweet text as an HTML fragment for the RSS description.

    Rewrites URL entities into <a> tags (dropping URLs that point at media
    attachments), links @mentions and #hashtags, then appends an inline-styled
    <div> of <img> tags for any attached media.

    Args:
        tweet: Tweet object (dict) from the X API v2 with "text" and,
            optionally, "entities" and "attachments".
        media_map: Mapping of media_key -> media object, as produced by
            get_media_map().

    Returns:
        str: HTML string (tweet text plus optional media gallery markup).
    """
    text = tweet["text"]
    if tweet.get("entities"):
        # URLs
        if 'urls' in tweet["entities"]:
            for url in tweet["entities"]['urls']:
                expanded_url = url.get('expanded_url', url['url'])
                display_url = url.get('display_url', expanded_url)
                if "media_key" not in url:
                    text = text.replace(url['url'], f'<a href="{expanded_url}" target="_blank">{display_url}</a>')
                else: # Replace media URLs with empty string
                    text = text.replace(url['url'], '')
        # Mentions
        # NOTE(review): plain str.replace may also match "@name" occurrences
        # inside the anchor text inserted above — confirm real payloads make
        # this impossible or acceptable.
        if 'mentions' in tweet["entities"]:
            for mention in tweet["entities"]['mentions']:
                username = mention['username']
                text = text.replace(f"@{username}", f'<a href="https://x.com/{username}" target="_blank">@{username}</a>')
        # Hashtags
        # NOTE(review): "#foo" is also a prefix of "#foobar", so overlapping
        # hashtags could be double-wrapped — verify against real payloads.
        if 'hashtags' in tweet["entities"]:
            for hashtag in tweet["entities"]['hashtags']:
                tag = hashtag['tag']
                text = text.replace(f"#{tag}", f'<a href="https://x.com/hashtag/{tag}?src=hashtag_click" target="_blank">#{tag}</a>')
    # Media attachments
    if "attachments" in tweet and "media_keys" in tweet["attachments"]:
        count = len(tweet["attachments"]['media_keys'])
        text += f'<div style="display:flex; flex-wrap:wrap; border-radius:16px; margin-top: 10px; overflow: hidden; width: fit-content;">'
        # Two-up 4:3 crops when there are multiple images; full width otherwise.
        if count > 1:
            width = 50
            aspect_ratio = 4/3
        else:
            width = 100
            aspect_ratio = 'auto'
        for media_key in tweet["attachments"]['media_keys']:
            media = media_map.get(media_key)
            if media:
                if media["type"] == 'photo':
                    text += f'<img src="{media["url"]}" alt="" style="max-width:{width}% !important; max-height: 300px; height:auto; aspect-ratio:{aspect_ratio}; object-fit: cover;" />'
                elif media["type"] in ['video', 'animated_gif']:
                    # Videos and GIFs are represented by their preview still.
                    text += f'<img src="{media["preview_image_url"]}" alt="" style="max-width:{width}% !important; max-height: 300px; height:auto; aspect-ratio:{aspect_ratio}; object-fit: cover;" />'
        text += '</div>'
    return text
# Returns dictionary mapping media keys to media objects
def get_media_map(includes):
    """Index the expanded media objects in *includes* by their media_key."""
    return {item.get("media_key"): item for item in includes.get('media', [])}
# Generates the RSS feed from the tweets data
def create_rss_feed(tweets, username):
    """Build an RSS 2.0 document from an X API tweets response.

    Args:
        tweets: Parsed JSON from get_user_tweets() — expects "data" (list of
            tweet objects) and optionally "includes" with expanded media.
        username: Handle used to build channel/item links and titles.

    Returns:
        lxml.etree._ElementTree: The complete <rss> tree, one <item> per tweet,
        with the HTML description wrapped in CDATA.
    """
    rss = ET.Element('rss', version='2.0')
    channel = ET.SubElement(rss, 'channel')
    ET.SubElement(channel, 'title').text = f"Tweets by @{username}"
    ET.SubElement(channel, 'link').text = f"https://x.com/{username}"
    ET.SubElement(channel, 'description').text = f"Recent tweets posted by @{username}"
    # Fix: datetime.utcnow() is deprecated (Python 3.12+) and naive; use an
    # aware UTC timestamp — the formatted RFC-822 string is identical.
    ET.SubElement(channel, 'lastBuildDate').text = datetime.now(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
    media_map = get_media_map(tweets.get("includes", {}))
    # Robustness: the API omits "data" entirely when there are no tweets.
    for tweet in tweets.get("data", []):
        tweet_url = f'https://x.com/{username}/status/{tweet["id"]}'
        pub_date = date_parser.parse(tweet["created_at"]).strftime('%a, %d %b %Y %H:%M:%S GMT')
        description_html = format_entities(tweet, media_map)
        item = ET.SubElement(channel, 'item')
        # Title intentionally carries the same HTML as the description.
        ET.SubElement(item, 'title').text = description_html
        desc = ET.SubElement(item, 'description')
        # CDATA keeps the embedded HTML from being entity-escaped by lxml.
        desc.text = ET.CDATA(description_html)
        ET.SubElement(item, 'pubDate').text = pub_date
        ET.SubElement(item, 'guid').text = tweet_url
        ET.SubElement(item, 'link').text = tweet_url
    return ET.ElementTree(rss)
def save_rss(tree, filename):
    """Serialize *tree* to *filename*, creating parent directories as needed."""
    target_dir = os.path.dirname(filename)
    os.makedirs(target_dir, exist_ok=True)
    with open(filename, 'wb') as out:
        tree.write(out, pretty_print=True, xml_declaration=True, encoding='utf-8')
if __name__ == '__main__':
    # Script entry point: pull the latest tweets, build the feed, write it out.
    tweets = get_user_tweets(USER_ID)
    print(tweets)  # raw payload dump, useful when debugging scheduled runs
    num_tweets = len(tweets["data"])
    print(f"Extracted {num_tweets} tweets")
    feed = create_rss_feed(tweets, USERNAME)
    save_rss(feed, RSS_FILE)
    print(f"✅ RSS feed saved to {RSS_FILE}")