Skip to content

Commit 90a7ffc

Browse files
authored
feat: add get_last_sync_time_ns to TimeSyncClient (#102)
* fix: fix issue where extra sync packets would be sent * chore: more robust estimate calculation, add dispersion calculation
1 parent e45e5f5 commit 90a7ffc

1 file changed

Lines changed: 130 additions & 46 deletions

File tree

synapse/utils/time_sync.py

Lines changed: 130 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import socket
66
import threading
77
import time
8-
from typing import Union
8+
from typing import Tuple, Union, List
99

1010
from synapse.api.time_pb2 import TimeSyncPacket
1111

@@ -23,6 +23,26 @@ class TimeSyncEstimate:
2323
offset_ns = 0
2424

2525

26+
def calculate_root_dispersion(samples: List["TimeSyncEstimate"], best_offset_ns: int) -> Union[int, None]:
    """Estimate an error bound, in nanoseconds, for the current clock offset.

    The bound is half of the smallest positive round-trip time plus twice the
    RMS deviation of the sample offsets from ``best_offset_ns``.

    Samples whose ``rtt_ns`` is not positive are treated as invalid and are
    excluded from both the minimum-RTT search and the deviation, so they
    cannot inflate the result.

    Args:
        samples: Estimates exposing ``rtt_ns`` and ``offset_ns``.
        best_offset_ns: Offset the deviations are measured against.

    Returns:
        ``0`` when ``samples`` is empty, ``None`` when no sample has a
        positive RTT, otherwise the dispersion in nanoseconds.
    """
    if len(samples) == 0:
        return 0

    valid_samples = [sample for sample in samples if sample.rtt_ns > 0]
    if not valid_samples:
        return None

    min_rtt_ns = min(sample.rtt_ns for sample in valid_samples)

    squared_deviations = [(sample.offset_ns - best_offset_ns) ** 2 for sample in valid_samples]
    std_dev_ns = int((sum(squared_deviations) / len(valid_samples)) ** 0.5)

    return (min_rtt_ns // 2) + (2 * std_dev_ns)
45+
2646
def get_time_sync_estimate(packet: TimeSyncPacket) -> TimeSyncEstimate:
2747
server_calculation_time_ns = packet.server_send_time_ns - packet.server_receive_time_ns
2848

@@ -39,25 +59,51 @@ def get_time_sync_estimate(packet: TimeSyncPacket) -> TimeSyncEstimate:
3959
estimate.offset_ns = calculated_offset_ns
4060
return estimate
4161

62+
class OffsetEstimator:
    """Track the best clock offset over a sliding window of sync estimates.

    The reported offset is that of the windowed sample with the smallest
    positive round-trip time. Samples with a non-positive ``rtt_ns`` are kept
    in the window but never selected.
    """

    def __init__(self, window_size: int = 120):
        """Create an estimator that retains at most ``window_size`` samples."""
        # Function-scope import so the class does not depend on the module's import block.
        from collections import deque

        self._window_size = window_size
        self._best_offset_ns = 0
        # A bounded deque drops the oldest sample in O(1) once the window is full.
        self._samples = deque(maxlen=max(window_size, 0))

    def add_sample(self, estimate: "TimeSyncEstimate"):
        """Append ``estimate`` to the window and refresh the best offset."""
        self._samples.append(estimate)
        self._update()

    def get_offset_ns(self) -> int:
        """Return the offset of the best sample seen in the window (0 until one exists)."""
        return self._best_offset_ns

    def root_dispersion_ns(self) -> Union[int, None]:
        """Return ``calculate_root_dispersion`` for the current window and best offset."""
        return calculate_root_dispersion(list(self._samples), self._best_offset_ns)

    def _update(self):
        """Select the offset of the sample with the smallest positive RTT."""
        valid_samples = [sample for sample in self._samples if sample.rtt_ns > 0]
        if not valid_samples:
            # No usable sample: keep the previous best offset.
            return

        # min() returns the first minimum, the same sample a stable sort would put first.
        self._best_offset_ns = min(valid_samples, key=lambda sample: sample.rtt_ns).offset_ns
87+
4288
class TimeSyncClient:
4389
def __init__(self, host: str, port: int, config: TimeSyncConfig = TimeSyncConfig(), logger: Union[logging.Logger, None] = None):
44-
self.client_id = self.generate_client_id()
90+
self.sequence_number = 0
91+
self.client_id = self._generate_client_id()
4592
self.host = host
4693
self.port = port
4794
self.running = False
48-
self.sequence_number = 0
4995
self.config = config or TimeSyncConfig()
96+
self.offset_estimator = OffsetEstimator()
5097
self.current_rtts = [TimeSyncEstimate() for _ in range(self.config.max_sync_packets)]
5198
self.latest_offset_ns = 0
99+
self.last_sync_time_ns = (0, 0)
52100
self.socket = None
53-
self.sync_thread = None
54-
self.receive_thread = None
55-
101+
self.worker_thread = None
56102
self.logger = logging.getLogger("time-sync") if logger is None else logger
57-
58-
self.logger.debug(f"TimeSyncClient initialized with client_id: {self.client_id}")
59103

60-
def generate_client_id(self) -> int:
104+
self.logger.debug(f"TimeSyncClient initialized with client_id: {self.client_id} and host: {self.host} and port: {self.port}")
105+
106+
def _generate_client_id(self) -> int:
61107
return random.randint(0, 2**32 - 1)
62108

63109
def start(self) -> bool:
@@ -68,17 +114,13 @@ def start(self) -> bool:
68114
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
69115
self.socket.bind(('0.0.0.0', 0))
70116
self.socket.connect((self.host, self.port))
117+
self.socket.settimeout(self.config.timeout_s)
71118

72-
self.receive_thread = threading.Thread(target=self._receive_loop)
73-
self.receive_thread.daemon = True
74-
self.receive_thread.start()
75-
76-
self.sync_thread = threading.Thread(target=self._sync_loop)
77-
self.sync_thread.daemon = True
78-
self.sync_thread.start()
119+
self.worker_thread = threading.Thread(target=self._worker_thread)
120+
self.worker_thread.daemon = True
121+
self.worker_thread.start()
79122

80-
self.logger.info(f"TimeSyncClient started with client_id: {self.client_id} and host: {self.host} and port: {self.port}")
81-
123+
self.logger.info("TimeSyncClient started")
82124
return True
83125

84126
def stop(self):
@@ -89,78 +131,120 @@ def stop(self):
89131
if self.socket:
90132
self.socket.close()
91133

92-
if self.sync_thread:
93-
self.sync_thread.join(timeout=1.0)
94-
if self.receive_thread:
95-
self.receive_thread.join(timeout=1.0)
134+
if self.worker_thread:
135+
self.worker_thread.join(timeout=1.0)
96136

97-
def _sync_loop(self):
98-
while self.running:
137+
def _worker_thread(self):
138+
try:
99139
self._send_next_sync_packet()
140+
while self.running:
141+
try:
142+
data, _ = self.socket.recvfrom(self.config.max_packet_size)
143+
self._handle_response(data)
144+
except socket.timeout:
145+
self.logger.debug("Timeout waiting for response")
146+
self._schedule_next_sync()
147+
except socket.error as e:
148+
if self.running:
149+
self.logger.error(f"Socket error: {e}")
150+
self._schedule_next_sync()
151+
except Exception as e:
152+
self.logger.error(f"Worker thread error: {e}")
153+
self.running = False
100154

101-
def _receive_loop(self):
102-
while self.running:
103-
try:
104-
data, _ = self.socket.recvfrom(self.config.max_packet_size)
105-
self.handle_response(data)
106-
except (socket.error, Exception) as e:
107-
if self.running:
108-
self.logger.error(f"Error receiving data: {e}")
155+
def _schedule_next_sync(self):
156+
if self.sequence_number >= self.config.max_sync_packets - 1:
157+
self._update_estimate()
158+
self.logger.debug(f"Synced with {self.config.max_sync_packets} packets, updating estimate - current offset: {self.latest_offset_ns} ns")
159+
time.sleep(self.config.sync_interval_s)
160+
if self.running:
161+
self._send_next_sync_packet()
162+
163+
else:
164+
time.sleep(self.config.send_delay_ms / 1000.0)
165+
if self.running:
166+
self.sequence_number += 1
167+
self._send_next_sync_packet()
109168

110169
def _send_next_sync_packet(self):
111-
if self.sequence_number >= self.config.max_sync_packets:
112-
self.update_estimate()
113-
self.logger.info(f"Synced with {self.config.max_sync_packets} packets, updating estimate - current offset: {self.latest_offset_ns} ns")
114-
time.sleep(self.config.sync_interval_s)
170+
if not self.running:
115171
return
116172

117173
if self.sequence_number == 0:
118-
self.logger.info(f"Sending sync packets...")
174+
self.logger.debug("Sending sync packets...")
119175

120176
request = TimeSyncPacket()
121177
request.client_id = self.client_id
122178
request.sequence = self.sequence_number
123179
request.client_send_time_ns = int(time.time_ns())
124180

125181
try:
182+
self.logger.debug(f"Sending sync packet {self.sequence_number} / {self.config.max_sync_packets}")
126183
self.socket.send(request.SerializeToString())
127184
except Exception as e:
128185
self.logger.error(f"Error sending packet: {e}")
129-
finally:
130-
time.sleep(self.config.send_delay_ms / 1000.0)
186+
self._schedule_next_sync()
131187

132-
def handle_response(self, data: bytes):
188+
def _handle_response(self, data: bytes):
133189
now_ns = time.time_ns()
134190

191+
response = None
135192
try:
136193
response = TimeSyncPacket()
137194
response.ParseFromString(data)
138195
response.client_receive_time_ns = now_ns
139196

140197
if response.client_id != self.client_id:
198+
self.logger.warning(f"Received sync packet from {response.client_id}, but expected {self.client_id}")
141199
return
142200

143201
estimate = get_time_sync_estimate(response)
202+
203+
if self.sequence_number >= self.config.max_sync_packets:
204+
self.logger.warning(f"Received sync packet {self.sequence_number} / {self.config.max_sync_packets}, but max is {self.config.max_sync_packets}")
205+
return
206+
144207
self.current_rtts[self.sequence_number] = estimate
145-
self.sequence_number += 1
208+
209+
self.last_sync_time_ns = (time.time_ns(), self.time_ns())
146210

147211
except Exception as e:
148212
self.logger.error(f"Error processing response: {e}")
213+
return
214+
215+
finally:
216+
self._schedule_next_sync()
149217

150-
def update_estimate(self):
218+
def _update_estimate(self):
151219
if not self.current_rtts:
152220
return
153221

154-
best_estimate = min(self.current_rtts[:self.sequence_number],
155-
key=lambda x: x.rtt_ns)
156-
157-
self.latest_offset_ns = best_estimate.offset_ns
222+
best_estimate = None
223+
for estimate in self.current_rtts[:min(self.sequence_number + 1, len(self.current_rtts))]:
224+
if estimate.rtt_ns <= 0:
225+
continue
226+
if best_estimate is None or estimate.rtt_ns < best_estimate.rtt_ns:
227+
best_estimate = estimate
228+
229+
if best_estimate is not None and best_estimate.rtt_ns > 0:
230+
self.offset_estimator.add_sample(best_estimate)
231+
self.latest_offset_ns = self.offset_estimator.get_offset_ns()
232+
root_dispersion_ns = self.offset_estimator.root_dispersion_ns()
233+
234+
self.logger.debug(f"Updated estimate - current offset: {self.latest_offset_ns / 1e6} ms, dispersion: {root_dispersion_ns / 1e6 if root_dispersion_ns is not None else 'N/A'} ms")
235+
158236
self.sequence_number = 0
159237
self.current_rtts = [TimeSyncEstimate() for _ in range(self.config.max_sync_packets)]
160238

161239
def get_offset_ns(self) -> int:
162240
return self.latest_offset_ns
163241

242+
def get_last_sync_time_ns(self) -> Tuple[int, int]:
243+
"""
244+
Returns a tuple of the last sync time in ns as [client's clock from time.time_ns(), synced clock from self.time_ns()]
245+
"""
246+
return self.last_sync_time_ns
247+
164248
def time_ns(self) -> int:
165249
return time.time_ns() + self.latest_offset_ns
166250

0 commit comments

Comments
 (0)