Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pkg/flashblock/block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -82,7 +83,7 @@ func (c *NodeClient) ListenFlashBlocks(ctx context.Context) error {
}
}

// nolint:cyclop
// nolint:cyclop,funlen
func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func()) error {
c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL)

Expand All @@ -98,17 +99,28 @@ func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func(
c.conn = conn
defer conn.Close()

// Gorilla websocket allows only one concurrent writer. Serialize all writes
// (outbound pings, pong responses) through this mutex.
var writeMu sync.Mutex
writeMessage := func(messageType int, data []byte) error {
writeMu.Lock()
defer writeMu.Unlock()
return conn.WriteMessage(messageType, data)
}

c.l.Info("Connected to flashblock stream, listening for events...")

// Reset retry delay after successful connection
resetRetryDelay()

// PING PONG
// The pong handler is invoked from within conn.ReadMessage (read goroutine),
// so it must go through the same write mutex as the ping goroutine.
conn.SetPingHandler(func(appData string) error {
c.l.Debugw("Ping received", "data", appData)
// nolint: errcheck
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
return conn.WriteMessage(websocket.PongMessage, []byte{})
return writeMessage(websocket.PongMessage, []byte{})
})

// Create a context for the ping goroutine that will be cancelled when this function exits
Expand All @@ -123,7 +135,7 @@ func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func(
case <-pingCtx.Done():
return
case <-pingTicker.C:
if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
if err := writeMessage(websocket.PingMessage, []byte{}); err != nil {
c.l.Errorw("Ping error", "error", err)
_ = conn.Close()
return
Expand Down
23 changes: 19 additions & 4 deletions pkg/flashblock/blox_route_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -154,7 +155,7 @@ func (c *Client) Listen(ctx context.Context, subscribeMsg interface{}, messageHa
}

// connectAndListen establishes connection and listens for bloxroute
// nolint: lll
// nolint: lll,funlen
func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), subscribeMsg interface{}, messageHandler messageHandler) error {
c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL)

Expand All @@ -169,7 +170,19 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), s
c.conn = conn
defer conn.Close()

if err := conn.WriteJSON(subscribeMsg); err != nil {
// Gorilla websocket allows only one concurrent writer. Serialize all writes
// (subscribe, outbound pings, pong responses) through this mutex.
var writeMu sync.Mutex
writeMessage := func(messageType int, data []byte) error {
writeMu.Lock()
defer writeMu.Unlock()
return conn.WriteMessage(messageType, data)
}

writeMu.Lock()
err = conn.WriteJSON(subscribeMsg)
writeMu.Unlock()
if err != nil {
return fmt.Errorf("failed to send subscription message: %w", err)
}

Expand All @@ -179,9 +192,11 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), s
resetRetryDelay()

// PING PONG
// The pong handler is invoked from within conn.ReadMessage (read goroutine),
// so it must go through the same write mutex as the ping goroutine.
conn.SetPingHandler(func(appData string) error {
c.l.Debugw("Ping received", "data", appData)
return conn.WriteMessage(websocket.PongMessage, []byte{})
return writeMessage(websocket.PongMessage, []byte{})
})

// Create a context for the ping goroutine that will be cancelled when this function exits
Expand All @@ -196,7 +211,7 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), s
case <-pingCtx.Done():
return
case <-pingTicker.C:
if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
if err := writeMessage(websocket.PingMessage, []byte{}); err != nil {
c.l.Errorw("Ping error", "error", err)
_ = conn.Close()
return
Expand Down
Loading