forked from twissandra/twissandra
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathcass.py
More file actions
198 lines (162 loc) · 6.82 KB
/
cass.py
File metadata and controls
198 lines (162 loc) · 6.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import uuid
import time
import threading
import cql
_local = threading.local()


def _get_cursor():
    """
    Return the calling thread's CQL cursor, creating it on first use.

    The first call in each thread opens a connection to Cassandra on
    localhost (CQL 3.0.0), stores the connection and its cursor on the
    thread-local ``_local`` object, and switches to the ``twissandra``
    keyspace. Later calls in the same thread reuse that cursor.
    """
    try:
        return _local.cursor
    except AttributeError:
        _local.conn = cql.connect("localhost", cql_version="3.0.0")
        _local.cursor = _local.conn.cursor()
        _local.cursor.execute("USE twissandra")
        return _local.cursor


class _ThreadLocalCursor(object):
    """
    Cursor stand-in that delegates to the calling thread's own cursor.

    The query functions in this module call ``execute`` and then read
    ``rowcount`` / ``fetchone`` / iterate the cursor as separate steps.
    Giving each thread its own cursor keeps another thread's query from
    replacing the result set in between those steps.
    """

    def __getattr__(self, name):
        # Only reached for names this class does not define, i.e. every
        # real cursor attribute (execute, rowcount, fetchone, ...).
        return getattr(_get_cursor(), name)

    def __iter__(self):
        # Special methods are looked up on the type and bypass __getattr__,
        # so iteration must be forwarded explicitly.
        return iter(_get_cursor())


# Connect from the importing thread immediately so an unreachable cluster
# still fails at import time; every other thread connects lazily on its
# first query. ``conn`` stays available as a module attribute.
_get_cursor()
conn = _local.conn
cursor = _ThreadLocalCursor()
# Names exported by ``from <this module> import *``.
__all__ = [
    # querying helpers
    'get_user_by_username',
    'get_friend_usernames',
    'get_follower_usernames',
    'get_timeline',
    'get_userline',
    'get_tweet',
    # inserting / updating helpers
    'save_user',
    'save_tweet',
    'add_friends',
    'remove_friend',
    # exceptions
    'DatabaseError',
    'NotFound',
    'InvalidDictionary',
    # constants
    'PUBLIC_TIMELINE_KEY',
]
# NOTE: Keeping every public tweet under a single timeline partition key does
# not scale. save_tweet() writes each new tweet into the timeline table under
# this key, so that one row keeps growing. At the time this was written,
# Cassandra required an entire row (every column under a given key) to fit
# in memory, so the public timeline would eventually outgrow available memory.
#
# The fix is to partition the timeline by time, e.g. a key such as
# !PUBLIC!2010-04-01 for one partition per day, or finer-grained hourly keys.
# Since this is a demonstration and that would add quite a bit of extra code,
# this exercise is left to the reader.
PUBLIC_TIMELINE_KEY = "!PUBLIC!"
# EXCEPTIONS
class DatabaseError(Exception):
    """
    Base class for the errors this module's functions raise when something
    goes wrong; catch it to handle any of them.
    """


class NotFound(DatabaseError):
    """Raised when a lookup (for example of a user or a tweet) finds no row."""


class InvalidDictionary(DatabaseError):
    """
    Exported as part of the public API. No function in this module
    currently raises it.
    """
# QUERYING APIs
def get_user_by_username(username):
    """
    Look up a user record by username.

    Returns a dict with a single 'password' key holding the stored value.
    Raises NotFound when the users table has no row for ``username``.
    """
    params = {'user': username}
    cursor.execute("SELECT password FROM users WHERE username = :user", params)
    if cursor.rowcount <= 0:
        raise NotFound('User %s not found' % (username,))
    first_row = cursor.fetchone()
    return {'password': first_row[0]}
def get_friend_usernames(username, count=5000):
    """
    Given a username, gets the usernames of the people that the user is
    following.

    At most ``count`` usernames are returned; pass ``count=None`` for no
    limit. A ``count`` of zero or less returns an empty list without
    querying.
    """
    query = "SELECT followed FROM following WHERE username = :user"
    if count is not None:
        count = int(count)
        if count <= 0:
            # Nothing can be returned, so skip the round trip entirely.
            return []
        # count is an int at this point, so %d cannot splice arbitrary text
        # into the CQL statement.
        query += " LIMIT %d" % count
    cursor.execute(query, dict(user=username))
    # Iterate every returned row; no rowcount check is needed (an empty
    # result simply yields no rows).
    return [row[0] for row in cursor]
def get_follower_usernames(username, count=5000):
    """
    Given a username, gets the usernames of the people following that user.

    At most ``count`` usernames are returned; pass ``count=None`` for no
    limit. A ``count`` of zero or less returns an empty list without
    querying.
    """
    query = "SELECT following FROM followers WHERE username = :user"
    if count is not None:
        count = int(count)
        if count <= 0:
            # Nothing can be returned, so skip the round trip entirely.
            return []
        # count is an int at this point, so %d cannot splice arbitrary text
        # into the CQL statement.
        query += " LIMIT %d" % count
    cursor.execute(query, dict(user=username))
    # Iterate every returned row; no rowcount check is needed (an empty
    # result simply yields no rows).
    return [row[0] for row in cursor]
def get_timeline(username, start=None, limit=40):
    """
    Given a username, get their tweet timeline (tweets from people they follow).

    Returns a ``(tweets, nextid)`` pair. ``tweets`` is a list of at most
    ``limit`` dicts with "id", "username" (the poster) and "body" keys,
    ordered by descending tweet id. ``nextid`` is the id to pass back as
    ``start`` to fetch the next page, or None when no further row was found.

    ``start`` (a uuid.UUID or its string form) restricts the page to tweet
    ids strictly below it; when falsy, the bound is CQL ``now()``.

    Raises ValueError if ``start`` is not a valid UUID.
    """
    if start:
        # start is interpolated into the CQL text (it cannot be a bind
        # parameter because the other branch is a function call). Round-trip
        # it through uuid.UUID so a caller-supplied pagination token can only
        # ever become a canonical UUID literal, never arbitrary CQL.
        posted_at_start = str(uuid.UUID(str(start)))
    else:
        posted_at_start = "now()"
    query = """
    SELECT tweetid, posted_by, body FROM timeline
    WHERE username = :username AND tweetid < %s ORDER BY tweetid DESC LIMIT %d
    """
    # Fetch one extra row: if it comes back, its id starts the next page.
    cursor.execute(query % (posted_at_start, limit + 1), dict(username=username))
    tweets = [{"id": row[0], "username": row[1], "body": row[2]} for row in cursor]
    nextid = None
    if len(tweets) > limit:
        nextid = tweets.pop()["id"]
    return (tweets, nextid)
def get_userline(username, start=None, limit=40):
    """
    Given a username, get their userline (their tweets).

    Returns a ``(tweets, nextid)`` pair. ``tweets`` is a list of at most
    ``limit`` dicts with "id", "username" (always ``username``) and "body"
    keys, ordered by descending tweet id. ``nextid`` is the id to pass back
    as ``start`` to fetch the next page, or None when no further row was
    found.

    ``start`` (a uuid.UUID or its string form) restricts the page to tweet
    ids strictly below it; when falsy, the bound is CQL ``now()``.

    Raises ValueError if ``start`` is not a valid UUID.
    """
    if start:
        # start is interpolated into the CQL text (it cannot be a bind
        # parameter because the other branch is a function call). Round-trip
        # it through uuid.UUID so a caller-supplied pagination token can only
        # ever become a canonical UUID literal, never arbitrary CQL.
        posted_at_start = str(uuid.UUID(str(start)))
    else:
        posted_at_start = "now()"
    query = """
    SELECT tweetid, body FROM userline
    WHERE username = :username AND tweetid < %s ORDER BY tweetid DESC LIMIT %d
    """
    # Fetch one extra row: if it comes back, its id starts the next page.
    cursor.execute(query % (posted_at_start, limit + 1), dict(username=username))
    tweets = [{"id": row[0], "username": username, "body": row[1]} for row in cursor]
    nextid = None
    if len(tweets) > limit:
        nextid = tweets.pop()["id"]
    return (tweets, nextid)
def get_tweet(tweet_id):
    """
    Given a tweet id, this gets the entire tweet record.

    Returns a dict with 'username' and 'body' keys. ``body`` is returned as
    text: if the driver hands back bytes they are decoded as UTF-8.
    Raises NotFound when the tweets table has no row for ``tweet_id``.
    """
    cursor.execute("SELECT username, body FROM tweets WHERE tweetid = :uuid", dict(uuid=tweet_id))
    if not (cursor.rowcount > 0):
        raise NotFound('Tweet %s not found' % (tweet_id,))
    row = cursor.fetchone()
    body = row[1]
    # save_tweet stores the body UTF-8 encoded, but depending on the column
    # type the driver may already return text. Only bytes need decoding:
    # calling .decode on Python 2 unicode first encodes it as ASCII (failing
    # on any non-ASCII tweet), and Python 3 str has no .decode at all.
    if isinstance(body, bytes):
        body = body.decode('utf-8')
    return {'username': row[0], 'body': body}
# INSERTING APIs
def save_user(username, password):
    """
    Write ``password`` for ``username`` into the users table (a CQL UPDATE
    keyed on the username).
    """
    # NOTE(review): the value is stored exactly as given; if callers pass a
    # plaintext password it is persisted in plaintext -- consider hashing.
    statement = "UPDATE users SET password = :password WHERE username = :user_id"
    cursor.execute(statement, {'password': password, 'user_id': username})
def save_tweet(username, body):
    """
    Saves the tweet record.

    The tweet is written to:
    - the tweets table;
    - the author's userline;
    - the timeline rows for PUBLIC_TIMELINE_KEY, for the author, and for
      every follower returned by get_follower_usernames().

    ``body`` may be text or bytes that are already UTF-8 encoded.
    """
    # Create a type 1 UUID based on the current time.
    tweet_id = uuid.uuid1()
    # Make sure the tweet body is utf-8 encoded. Bytes are assumed to already
    # be UTF-8 and are stored unchanged: re-encoding them would fail (Py2
    # implicit ASCII decode, or no bytes.encode on Py3).
    if not isinstance(body, bytes):
        body = body.encode('utf-8')

    # Insert the tweet, then into the user's userline.
    cursor.execute(
        "INSERT INTO tweets (tweetid, username, body) VALUES (:tweet_id, :username, :body)",
        dict(tweet_id=tweet_id, username=username, body=body))
    cursor.execute(
        "INSERT INTO userline (username, tweetid, body) VALUES (:username, :posted_at, :body)",
        dict(username=username, posted_at=tweet_id, body=body))

    # Then into the public timeline, the author's own timeline, and each
    # follower's timeline. count=None asks for every follower rather than a
    # capped page, so the tweet reaches all of them.
    timeline_insert = """INSERT INTO timeline (username, tweetid, posted_by, body)
    VALUES (:username, :posted_at, :posted_by, :body)"""
    recipients = [PUBLIC_TIMELINE_KEY, username] + get_follower_usernames(username, count=None)
    seen = set()
    for timeline_owner in recipients:
        # Writing the same (username, tweetid) row twice is redundant, e.g.
        # when a user follows themselves.
        if timeline_owner in seen:
            continue
        seen.add(timeline_owner)
        cursor.execute(
            timeline_insert,
            dict(username=timeline_owner, posted_at=tweet_id, posted_by=username, body=body))
def add_friends(from_username, to_usernames):
    """
    Record that ``from_username`` follows every user in ``to_usernames``.

    For each followed user the forward edge is written to the following
    table, then the reverse edge to the followers table.
    """
    # FIXME: use a BATCH here
    follow_cql = "INSERT INTO following (username, followed) VALUES (:from_username, :to_username)"
    follower_cql = "INSERT INTO followers (username, following) VALUES (:to_username, :from_username)"
    for followed in to_usernames:
        params = dict(from_username=from_username, to_username=followed)
        cursor.execute(follow_cql, params)
        cursor.execute(follower_cql, params)
def remove_friend(from_username, to_username):
    """
    Remove a single follow relationship: ``from_username`` stops following
    ``to_username``.

    Deletes the forward edge from the following table, then the reverse
    edge from the followers table.
    """
    # FIXME: use a BATCH here
    params = {'from_username': from_username, 'to_username': to_username}
    cursor.execute(
        "DELETE FROM following WHERE username = :from_username AND followed = :to_username",
        params)
    cursor.execute(
        "DELETE FROM followers WHERE username = :to_username AND following = :from_username",
        params)
# vi:se ts=4 sw=4 ai et nu: