@@ -35,6 +35,22 @@ use std::sync::atomic::{AtomicBool, Ordering};
3535use std:: time:: Duration ;
3636use tokio:: sync:: { Semaphore , mpsc, watch} ;
3737
38+ /// Server ID type alias for clarity.
39+ type ServerId = i32 ;
40+
41+ /// Batches grouped by table bucket, keyed by server.
42+ type BatchesByLeader = HashMap < ServerId , HashMap < TableBucket , LookupBatch > > ;
43+
44+ /// Result of grouping lookups by leader.
45+ struct GroupByLeaderResult {
46+ /// Lookup batches grouped by leader server.
47+ batches_by_leader : BatchesByLeader ,
48+ /// Tables with unknown leaders that need metadata refresh.
49+ unknown_leader_tables : HashSet < TablePath > ,
50+ /// Partition IDs with unknown leaders.
51+ unknown_leader_partition_ids : HashSet < PartitionId > ,
52+ }
53+
3854/// Lookup sender that batches and sends lookup requests.
3955pub struct LookupSender {
4056 /// Metadata for leader lookup
@@ -195,8 +211,11 @@ impl LookupSender {
195211 }
196212
197213 // Group by leader
198- let ( lookup_batches, unknown_leader_tables, unknown_leader_partition_ids) =
199- self . group_by_leader ( lookups) ;
214+ let GroupByLeaderResult {
215+ batches_by_leader : lookup_batches,
216+ unknown_leader_tables,
217+ unknown_leader_partition_ids,
218+ } = self . group_by_leader ( lookups) ;
200219
201220 // Update metadata for tables with unknown leaders
202221 if !unknown_leader_tables. is_empty ( ) {
@@ -234,16 +253,9 @@ impl LookupSender {
234253 }
235254
236255 /// Groups lookups by leader server.
237- fn group_by_leader (
238- & self ,
239- lookups : Vec < LookupQuery > ,
240- ) -> (
241- HashMap < i32 , HashMap < TableBucket , LookupBatch > > ,
242- HashSet < TablePath > ,
243- HashSet < PartitionId > ,
244- ) {
256+ fn group_by_leader ( & self , lookups : Vec < LookupQuery > ) -> GroupByLeaderResult {
245257 let cluster = self . metadata . get_cluster ( ) ;
246- let mut batches_by_leader: HashMap < i32 , HashMap < TableBucket , LookupBatch > > = HashMap :: new ( ) ;
258+ let mut batches_by_leader: BatchesByLeader = HashMap :: new ( ) ;
247259 let mut unknown_leader_tables: HashSet < TablePath > = HashSet :: new ( ) ;
248260 let mut unknown_leader_partition_ids: HashSet < PartitionId > = HashSet :: new ( ) ;
249261
@@ -275,11 +287,11 @@ impl LookupSender {
275287 . add_lookup ( lookup) ;
276288 }
277289
278- (
290+ GroupByLeaderResult {
279291 batches_by_leader,
280292 unknown_leader_tables,
281293 unknown_leader_partition_ids,
282- )
294+ }
283295 }
284296
285297 /// Sends lookup requests to a specific destination server.
0 commit comments