Skip to content

Commit 88adefb

Browse files
committed
Avoid dropping packets while queue is full.
Closes: #78
1 parent 0f7c7e5 commit 88adefb

1 file changed

Lines changed: 52 additions & 14 deletions

File tree

internal/client/dispatcher.go

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,32 @@ func (c *Client) asyncStreamDispatcher(ctx context.Context) {
5757
return true
5858
}
5959

60+
waitForTxCapacity := func(required int) bool {
61+
if required <= 0 {
62+
return true
63+
}
64+
for {
65+
if c.txChannelHasCapacity(required) {
66+
return true
67+
}
68+
69+
select {
70+
case <-ctx.Done():
71+
return false
72+
case <-c.txSpaceSignal:
73+
case <-idleTimer.C:
74+
}
75+
76+
if !idleTimer.Stop() {
77+
select {
78+
case <-idleTimer.C:
79+
default:
80+
}
81+
}
82+
idleTimer.Reset(idlePoll)
83+
}
84+
}
85+
6086
dispatchLoop:
6187
for {
6288
currentVersion := c.streamSetVersion.Load()
@@ -196,8 +222,8 @@ dispatchLoop:
196222
continue dispatchLoop
197223
}
198224

199-
if !c.txChannelHasCapacity(len(conns)) {
200-
if !waitForWork() {
225+
if !waitForTxCapacity(len(conns)) {
226+
if ctx.Err() != nil {
201227
return
202228
}
203229
continue dispatchLoop
@@ -329,6 +355,7 @@ dispatchLoop:
329355
finalPacket.packetType = Enums.PACKET_PACKED_CONTROL_BLOCKS
330356
finalPacket.payload = payload
331357
wasPacked = true
358+
332359
if selected != nil {
333360
selected.ReleaseTXPacket(item)
334361
}
@@ -394,20 +421,31 @@ dispatchLoop:
394421
pkt := finalPacket
395422
pkt.conn = conn
396423
pkt.payload = dnsPacket
424+
pkt.sequenceNum = item.SequenceNum
425+
426+
for {
427+
select {
428+
case c.txChannel <- pkt:
429+
goto enqueued
430+
case <-ctx.Done():
431+
if !wasPacked && selected != nil {
432+
selected.ReleaseTXPacket(item)
433+
}
434+
return
435+
case <-c.txSpaceSignal:
436+
case <-idleTimer.C:
437+
}
397438

398-
select {
399-
case c.txChannel <- pkt:
400-
// if !isLogged && pkt.packetType != Enums.PACKET_PING {
401-
// packedSummary := ""
402-
// if opts.PacketType == Enums.PACKET_PACKED_CONTROL_BLOCKS {
403-
// packedSummary = " | " + VpnProto.DescribePackedControlBlocks(opts.Payload, 4)
404-
// }
405-
// c.logOutboundPacket(opts.PacketType, opts.SessionID, len(opts.Payload), opts.StreamID, opts.SequenceNum, opts.FragmentID, opts.TotalFragments, packedSummary)
406-
// }
407-
// isLogged = true
408-
default:
409-
c.log.Warnf("TX channel filled before enqueue completed | Packet: %s | Stream: %d", Enums.PacketTypeName(finalPacket.packetType), selectedStreamID)
439+
if !idleTimer.Stop() {
440+
select {
441+
case <-idleTimer.C:
442+
default:
443+
}
444+
}
445+
idleTimer.Reset(idlePoll)
410446
}
447+
448+
enqueued:
411449
}
412450

413451
if !wasPacked && selected != nil {

0 commit comments

Comments
 (0)