From dcc038b26bdd95587788bdb3eff32d4d44145f2b Mon Sep 17 00:00:00 2001 From: hai pham Date: Tue, 16 Jun 2026 10:57:54 +0700 Subject: [PATCH 1/2] fix: ws concurrent write --- pkg/flashblock/block_listener.go | 16 ++++++++++++++-- pkg/flashblock/blox_route_client.go | 21 ++++++++++++++++++--- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/pkg/flashblock/block_listener.go b/pkg/flashblock/block_listener.go index 328cdbb..2065e92 100644 --- a/pkg/flashblock/block_listener.go +++ b/pkg/flashblock/block_listener.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "github.com/gorilla/websocket" @@ -98,17 +99,28 @@ func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func( c.conn = conn defer conn.Close() + // Gorilla websocket allows only one concurrent writer. Serialize all writes + // (outbound pings, pong responses) through this mutex. + var writeMu sync.Mutex + writeMessage := func(messageType int, data []byte) error { + writeMu.Lock() + defer writeMu.Unlock() + return conn.WriteMessage(messageType, data) + } + c.l.Info("Connected to flashblock stream, listening for events...") // Reset retry delay after successful connection resetRetryDelay() // PING PONG + // The pong handler is invoked from within conn.ReadMessage (read goroutine), + // so it must go through the same write mutex as the ping goroutine. conn.SetPingHandler(func(appData string) error { c.l.Debugw("Ping received", "data", appData) // nolint: errcheck conn.SetReadDeadline(time.Now().Add(5 * time.Second)) - return conn.WriteMessage(websocket.PongMessage, []byte{}) + return writeMessage(websocket.PongMessage, []byte{}) }) // Create a context for the ping goroutine that will be cancelled when this function exits @@ -123,7 +135,7 @@ func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func( case <-pingCtx.Done(): return case <-pingTicker.C: - if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + if err := writeMessage(websocket.PingMessage, []byte{}); err != nil { c.l.Errorw("Ping error", "error", err) _ = conn.Close() return diff --git a/pkg/flashblock/blox_route_client.go b/pkg/flashblock/blox_route_client.go index f9bda59..b955f5d 100644 --- a/pkg/flashblock/blox_route_client.go +++ b/pkg/flashblock/blox_route_client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strconv" + "sync" "syscall" "time" @@ -169,7 +170,19 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), s c.conn = conn defer conn.Close() - if err := conn.WriteJSON(subscribeMsg); err != nil { + // Gorilla websocket allows only one concurrent writer. Serialize all writes + // (subscribe, outbound pings, pong responses) through this mutex. + var writeMu sync.Mutex + writeMessage := func(messageType int, data []byte) error { + writeMu.Lock() + defer writeMu.Unlock() + return conn.WriteMessage(messageType, data) + } + + writeMu.Lock() + err = conn.WriteJSON(subscribeMsg) + writeMu.Unlock() + if err != nil { return fmt.Errorf("failed to send subscription message: %w", err) } @@ -179,9 +192,11 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), s resetRetryDelay() // PING PONG + // The pong handler is invoked from within conn.ReadMessage (read goroutine), + // so it must go through the same write mutex as the ping goroutine. conn.SetPingHandler(func(appData string) error { c.l.Debugw("Ping received", "data", appData) - return conn.WriteMessage(websocket.PongMessage, []byte{}) + return writeMessage(websocket.PongMessage, []byte{}) }) // Create a context for the ping goroutine that will be cancelled when this function exits @@ -196,7 +211,7 @@ func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), s case <-pingCtx.Done(): return case <-pingTicker.C: - if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + if err := writeMessage(websocket.PingMessage, []byte{}); err != nil { c.l.Errorw("Ping error", "error", err) _ = conn.Close() return From fa6fb96e3c08bcd7cd361bff64dc9876618ef5ed Mon Sep 17 00:00:00 2001 From: hai pham Date: Tue, 16 Jun 2026 11:03:12 +0700 Subject: [PATCH 2/2] chore: lint --- pkg/flashblock/block_listener.go | 2 +- pkg/flashblock/blox_route_client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/flashblock/block_listener.go b/pkg/flashblock/block_listener.go index 2065e92..6e5e71f 100644 --- a/pkg/flashblock/block_listener.go +++ b/pkg/flashblock/block_listener.go @@ -83,7 +83,7 @@ func (c *NodeClient) ListenFlashBlocks(ctx context.Context) error { } } -// nolint:cyclop +// nolint:cyclop,funlen func (c *NodeClient) connectAndListen(ctx context.Context, resetRetryDelay func()) error { c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL) diff --git a/pkg/flashblock/blox_route_client.go b/pkg/flashblock/blox_route_client.go index b955f5d..f94d959 100644 --- a/pkg/flashblock/blox_route_client.go +++ b/pkg/flashblock/blox_route_client.go @@ -155,7 +155,7 @@ func (c *Client) Listen(ctx context.Context, subscribeMsg interface{}, messageHa } // connectAndListen establishes connection and listens for bloxroute -// nolint: lll +// nolint: lll,funlen func (c *Client) connectAndListen(ctx context.Context, resetRetryDelay func(), subscribeMsg interface{}, messageHandler messageHandler) error { c.l.Infow("Connecting to flashblock stream", "url", c.config.WebSocketURL)