11import asyncio
2+ import time
3+ import uuid
24
35import structlog
46from django .conf import settings
1820)
1921@permission_classes ([AllowAny ])
2022async def sse_stream_view (request ):
21- raw_channels = request .GET .get ('channels' , 'beat' )
22- channels = [ c . strip () for c in raw_channels . split ( ',' ) if c . strip ()]
23+ channels = [ c . strip () for c in request .GET .get ('channels' , 'beat' ). split ( ',' ) if c . strip ()]
24+ redis_keys = { f'stream: { c } ' : request . headers . get ( 'Last-Event-ID' , '0' ) for c in channels }
2325
24- header_id = request .headers .get ('Last-Event-ID' )
25-
26- if header_id :
27- # catch up to browsers last id
28- last_id = header_id
29- else :
30- # get existing history
31- last_id = '0'
32-
33- redis_keys = {f'stream:{ c } ' : last_id for c in channels }
26+ # unique connection ID for stats
27+ conn_id = str (uuid .uuid4 ())
28+ stats_key = 'stream:active_connections'
3429
3530 redis_url = settings .CACHES ['default' ]['LOCATION' ]
3631
@@ -40,13 +35,14 @@ async def event_generator():
4035
4136 # initiate redis, pubsub and subscribe to channels
4237 redis = await aioredis .from_url (redis_url )
43- pubsub = redis .pubsub ()
44- await pubsub .subscribe (* channels )
4538
4639 total_sent_count = 0
4740
48- # loop
4941 try :
42+ # register connection with current timestamp
43+ await redis .zadd (stats_key , {conn_id : time .time ()})
44+
45+ # loop
5046 while True :
5147 # XREAD expects a dict with { key: last_id }
5248 events = await redis .xread (redis_keys , count = 1 , block = 5000 )
@@ -61,6 +57,8 @@ async def event_generator():
6157 payload = data [b'payload' ].decode ('utf-8' )
6258 yield f'id: { last_id } \n data: { payload } \n \n '
6359 else :
60+ # keep-alive + heartbeat in redis
61+ await redis .zadd (stats_key , {conn_id : time .time ()})
6462 yield ': \n \n '
6563
6664 # user left stream
@@ -69,7 +67,12 @@ async def event_generator():
6967
7068 # cleanup
7169 finally :
72- await pubsub .unsubscribe (* channels )
70+ # prune this connection
71+ await redis .zrem (stats_key , conn_id )
72+
73+ # prune stale connections (> 30s)
74+ await redis .zremrangebyscore (stats_key , 0 , time .time () - 30 )
75+
7376 await redis .close ()
7477
7578 log .info (action = 'stream_close' , messages_sent = total_sent_count )
0 commit comments