From b06173d5fb287a6087fc4485d61e842a0f001128 Mon Sep 17 00:00:00 2001 From: Souyama Date: Thu, 4 Dec 2025 18:45:59 +0530 Subject: [PATCH 01/24] feat: implement protocol recovery and connection handling in serial transporters --- asciiclient.go | 78 +++++++++++++++++++++++++++++++++++++++++------ rtuclient.go | 82 +++++++++++++++++++++++++++++++++++++++----------- serial.go | 16 ++++++++-- 3 files changed, 147 insertions(+), 29 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index a310372..b05bae1 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -9,6 +9,8 @@ import ( "context" "encoding/hex" "fmt" + "io" + "syscall" "time" ) @@ -181,17 +183,76 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( mb.lastActivity = time.Now() mb.startCloseTimer() - // Send the request - mb.logf("modbus: send % x\n", aduRequest) - if _, err = mb.port.Write(aduRequest); err != nil { + connDeadline := time.Now().Add(mb.Timeout) + linkRecoveryDeadline := time.Now().Add(mb.LinkRecoveryTimeout) + + for { + // Send the request + mb.logf("modbus: send % x\n", aduRequest) + if _, err = mb.port.Write(aduRequest); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + + return + } + // Get the response + aduResponse, err = readAscii(mb.port, connDeadline) + mb.logf("modbus: recv % x\n", aduResponse) + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + // Unknown error + mb.logf("modbus: read error: %v", err) + return + } + return } - // Get the response +} + +func readAscii(r io.Reader, deadline time.Time) ([]byte, error) { var n, length int var data [asciiMaxSize]byte + var err error + for { - if n, err = mb.port.Read(data[length:]); err != nil { - return + if time.Now().After(deadline) { + return nil, context.DeadlineExceeded + } + if n, err = r.Read(data[length:]); err != nil { + return nil, err } length += n if length >= asciiMaxSize || n == 0 { @@ -204,9 +265,8 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( } } } - aduResponse = data[:length] - mb.logf("modbus: recv % x\n", aduResponse) - return + + return data[:length], nil } // writeHex encodes byte to string in hexadecimal, e.g. 0xA5 => "A5" diff --git a/rtuclient.go b/rtuclient.go index e1bf12d..abb1907 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -7,9 +7,9 @@ package modbus import ( "context" "encoding/binary" - "errors" "fmt" "io" + "syscall" "time" ) @@ -168,7 +168,7 @@ func readIncrementally(slaveID, functionCode byte, r io.Reader, deadline time.Ti for { if time.Now().After(deadline) { // Possible that serialport may spew data - return nil, errors.New("failed to read from serial port within deadline") + return nil, context.DeadlineExceeded } if _, err := io.ReadAtLeast(r, buf, 1); err != nil { @@ -264,24 +264,72 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad mb.lastActivity = time.Now() mb.startCloseTimer() - // Send the request - mb.logf("modbus: send % x\n", aduRequest) - if _, err = mb.port.Write(aduRequest); err != nil { + connDeadline := time.Now().Add(mb.Timeout) + linkRecoveryDeadline := time.Now().Add(mb.LinkRecoveryTimeout) + + for { + // Send the request + mb.logf("modbus: send % x\n", aduRequest) + if _, err = mb.port.Write(aduRequest); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + + return + } + // function := aduRequest[1] + // functionFail := aduRequest[1] & 0x80 + bytesToRead := calculateResponseLength(aduRequest) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(mb.calculateDelay(len(aduRequest) + bytesToRead)): + } + + aduResponse, err = readIncrementally(aduRequest[0], aduRequest[1], mb.port, connDeadline) + mb.logf("modbus: recv % x\n", aduResponse[:]) + + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + // Unknown error + mb.logf("modbus: read error: %v", err) + return + } return } - // function := aduRequest[1] - // functionFail := aduRequest[1] & 0x80 - bytesToRead := calculateResponseLength(aduRequest) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(mb.calculateDelay(len(aduRequest) + bytesToRead)): - } - data, err := readIncrementally(aduRequest[0], aduRequest[1], mb.port, time.Now().Add(mb.Config.Timeout)) - mb.logf("modbus: recv % x\n", data[:]) - aduResponse = data - return } // calculateDelay roughly calculates time needed for the next frame. diff --git a/serial.go b/serial.go index 5ad9692..98f4f8d 100644 --- a/serial.go +++ b/serial.go @@ -25,8 +25,13 @@ type serialPort struct { // Serial port configuration. serial.Config - Logger Logger + Logger Logger + // IdleTimeout is the duration to close the connection when no activity. IdleTimeout time.Duration + // Silent period after successful connection + ConnectDelay time.Duration + // Recovery timeout if the connection is lost + LinkRecoveryTimeout time.Duration mu sync.Mutex // port is platform-dependent data structure for serial port. @@ -52,9 +57,14 @@ func (mb *serialPort) connect(ctx context.Context) error { if mb.port == nil { port, err := serial.Open(&mb.Config) if err != nil { - return fmt.Errorf("could not open %s: %w", mb.Config.Address, err) + return fmt.Errorf("could not open %s: %w", mb.Address, err) } mb.port = port + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(mb.ConnectDelay): //silent period + } } return nil } @@ -103,6 +113,6 @@ func (mb *serialPort) closeIdle() { if idle := time.Since(mb.lastActivity); idle >= mb.IdleTimeout { mb.logf("modbus: closing connection due to idle timeout: %v", idle) - mb.close() + _ = mb.close() } } From 81d1a8b867d70c4c68c1c32b82d916d305aada06 Mon Sep 17 00:00:00 2001 From: Souyama Date: Thu, 4 Dec 2025 19:17:21 +0530 Subject: [PATCH 02/24] Lint issue fix --- asciiclient.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index b05bae1..bc78051 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -212,7 +212,7 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( return } // Get the response - aduResponse, err = readAscii(mb.port, connDeadline) + aduResponse, err = readASCII(mb.port, connDeadline) mb.logf("modbus: recv % x\n", aduResponse) if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { @@ -242,7 +242,7 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( } } -func readAscii(r io.Reader, deadline time.Time) ([]byte, error) { +func readASCII(r io.Reader, deadline time.Time) ([]byte, error) { var n, length int var data [asciiMaxSize]byte var err error From 8fb2118fcc3b1f94298d87b2f47161757d5d9cae Mon Sep 17 00:00:00 2001 From: Souyama Date: Sun, 7 Dec 2025 10:59:38 +0530 Subject: [PATCH 03/24] feat: add PTY tests for RTUSerialTransporter functionality --- rtu_transport_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 rtu_transport_test.go diff --git a/rtu_transport_test.go b/rtu_transport_test.go new file mode 100644 index 0000000..acb1778 --- /dev/null +++ b/rtu_transport_test.go @@ -0,0 +1,117 @@ +//go:build darwin || linux || freebsd || openbsd || netbsd +// +build darwin linux freebsd openbsd netbsd + +// Copyright 2014 Quoc-Viet Nguyen. All rights reserved. +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. + +package modbus + +import ( + "bytes" + "context" + "fmt" + "os" + "syscall" + "testing" + "time" + "unsafe" +) + +// openPTY opens a PTY pair and returns the master file and the slave path. +func openPTY() (master *os.File, slavePath string, err error) { + master, err = os.OpenFile("/dev/ptmx", os.O_RDWR, 0) + if err != nil { + return nil, "", err + } + + // unlockpt + var unlock int32 + // TIOCSPTLCK + _, _, errno := syscall.Syscall(syscall.SYS_IOCTL, master.Fd(), syscall.TIOCSPTLCK, uintptr(unsafe.Pointer(&unlock))) + if errno != 0 { + master.Close() + return nil, "", errno + } + + // ptsname + var ptyno int32 + // TIOCGPTN + _, _, errno = syscall.Syscall(syscall.SYS_IOCTL, master.Fd(), syscall.TIOCGPTN, uintptr(unsafe.Pointer(&ptyno))) + if errno != 0 { + master.Close() + return nil, "", errno + } + + slavePath = fmt.Sprintf("/dev/pts/%d", ptyno) + return master, slavePath, nil +} + +func TestRTUSerialTransporter_Send_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + // Request: 01 03 00 00 00 01 84 0A (Read Holding Registers) + // Response: 01 03 02 00 00 B8 44 + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + resp := []byte{0x01, 0x03, 0x02, 0x00, 0x00, 0xB8, 0x44} + + transporter := &rtuSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 1 * time.Second + + // Start a goroutine to read request and write response to master + go func() { + buf := make([]byte, 1024) + n, err := master.Read(buf) + if err != nil { + return + } + if !bytes.Equal(buf[:n], req) { + // t.Errorf would be racy here, just log or ignore + return + } + // Write response + _, err = master.Write(resp) + if err != nil { + t.Errorf("Failed to write response: %v", err) + } + }() + + ctx := context.Background() + aduResponse, err := transporter.Send(ctx, req) + if err != nil { + t.Fatalf("Send failed: %v", err) + } + + if !bytes.Equal(aduResponse, resp) { + t.Errorf("Expected response %x, got %x", resp, aduResponse) + } +} + +func TestRTUSerialTransporter_Timeout_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + + transporter := &rtuSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + + // Don't write anything to master + + ctx := context.Background() + _, err = transporter.Send(ctx, req) + if err == nil { + t.Fatal("Expected timeout error, got nil") + } +} From ea9f4665a0b01338d4ad10b35b230c6c1f4ed00e Mon Sep 17 00:00:00 2001 From: Souyama Date: Sun, 7 Dec 2025 11:05:33 +0530 Subject: [PATCH 04/24] feat: add PTY tests for ASCIISerialTransporter functionality --- ascii_transport_test.go | 89 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 ascii_transport_test.go diff --git a/ascii_transport_test.go b/ascii_transport_test.go new file mode 100644 index 0000000..b07ccde --- /dev/null +++ b/ascii_transport_test.go @@ -0,0 +1,89 @@ +//go:build darwin || linux || freebsd || openbsd || netbsd +// +build darwin linux freebsd openbsd netbsd + +// Copyright 2014 Quoc-Viet Nguyen. All rights reserved. +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. + +package modbus + +import ( + "bytes" + "context" + "testing" + "time" +) + +func TestASCIISerialTransporter_Send_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + // Request: 01 03 00 00 00 01 (Read Holding Registers) + // ASCII: :010300000001FB\r\n + reqASCII := []byte(":010300000001FB\r\n") + + // Response: 01 03 02 00 00 + // ASCII: :0103020000FA\r\n + respASCII := []byte(":0103020000FA\r\n") + + transporter := &asciiSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 1 * time.Second + transporter.IdleTimeout = serialIdleTimeout + + // Start a goroutine to read request and write response to master + go func() { + buf := make([]byte, 1024) + n, err := master.Read(buf) + if err != nil { + return + } + if !bytes.Equal(buf[:n], reqASCII) { + // t.Errorf would be racy here, just log or ignore + return + } + // Write response + _, err = master.Write(respASCII) + if err != nil { + t.Errorf("Failed to write response: %v", err) + } + }() + + ctx := context.Background() + aduResponse, err := transporter.Send(ctx, reqASCII) + if err != nil { + t.Fatalf("Send failed: %v", err) + } + + if !bytes.Equal(aduResponse, respASCII) { + t.Errorf("Expected response %s, got %s", respASCII, aduResponse) + } +} + +func TestASCIISerialTransporter_Timeout_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + reqASCII := []byte(":010300000001FB\r\n") + + transporter := &asciiSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.IdleTimeout = serialIdleTimeout + + // Don't write anything to master + + ctx := context.Background() + _, err = transporter.Send(ctx, reqASCII) + if err == nil { + t.Fatal("Expected timeout error, got nil") + } +} From 673dfe2ff8599df0eae3887529b02665e276c16b Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 27 Mar 2026 17:45:14 +0530 Subject: [PATCH 05/24] consistent ascii and rtu serial recovery process with tcp --- asciiclient.go | 4 ++-- rtuclient.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index bc78051..8e9062c 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -191,7 +191,7 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( mb.logf("modbus: send % x\n", aduRequest) if _, err = mb.port.Write(aduRequest); err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if time.Now().After(linkRecoveryDeadline) { + if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) return } @@ -216,7 +216,7 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( mb.logf("modbus: recv % x\n", aduResponse) if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if time.Now().After(linkRecoveryDeadline) { + if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) return } diff --git a/rtuclient.go b/rtuclient.go index abb1907..116c5f6 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -272,7 +272,7 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad mb.logf("modbus: send % x\n", aduRequest) if _, err = mb.port.Write(aduRequest); err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if time.Now().After(linkRecoveryDeadline) { + if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) return } @@ -306,7 +306,7 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if time.Now().After(linkRecoveryDeadline) { + if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) return } From 21047b52a73315f12a59eef104d06f6f573fc26f Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 27 Mar 2026 17:56:39 +0530 Subject: [PATCH 06/24] refactor: recovery helper for connections --- asciiclient.go | 33 ++++----------------------------- rtuclient.go | 33 ++++----------------------------- serial.go | 24 ++++++++++++++++++++++++ tcpclient.go | 9 +++++++-- 4 files changed, 39 insertions(+), 60 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index 8e9062c..e95b5e3 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -10,7 +10,6 @@ import ( "encoding/hex" "fmt" "io" - "syscall" "time" ) @@ -190,22 +189,10 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( // Send the request mb.logf("modbus: send % x\n", aduRequest) if _, err = mb.port.Write(aduRequest); err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { - err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + if mb.shouldRecover(err) { + if err = mb.reconnect(ctx, err, linkRecoveryDeadline); err != nil { return } - // reconnect on connection reset - mb.logf("modbus: connection reset, reconnecting") - if cerr := mb.close(); cerr != nil { - mb.logf("modbus: error closing connection: %v", cerr) - return - } - if cerr := mb.connect(ctx); cerr != nil { - mb.logf("modbus: error reconnecting: %v", cerr) - return - } - // retry the communication continue } @@ -215,22 +202,10 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( aduResponse, err = readASCII(mb.port, connDeadline) mb.logf("modbus: recv % x\n", aduResponse) if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { - err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) - return - } - // reconnect on connection reset - mb.logf("modbus: connection reset, reconnecting") - if cerr := mb.close(); cerr != nil { - mb.logf("modbus: error closing connection: %v", cerr) - return - } - if cerr := mb.connect(ctx); cerr != nil { - mb.logf("modbus: error reconnecting: %v", cerr) + if mb.shouldRecover(err) { + if err = mb.reconnect(ctx, err, linkRecoveryDeadline); err != nil { return } - // retry the communication continue } // Unknown error diff --git a/rtuclient.go b/rtuclient.go index 116c5f6..08fee19 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -9,7 +9,6 @@ import ( "encoding/binary" "fmt" "io" - "syscall" "time" ) @@ -271,22 +270,10 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad // Send the request mb.logf("modbus: send % x\n", aduRequest) if _, err = mb.port.Write(aduRequest); err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { - err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + if mb.shouldRecover(err) { + if err = mb.reconnect(ctx, err, linkRecoveryDeadline); err != nil { return } - // reconnect on connection reset - mb.logf("modbus: connection reset, reconnecting") - if cerr := mb.close(); cerr != nil { - mb.logf("modbus: error closing connection: %v", cerr) - return - } - if cerr := mb.connect(ctx); cerr != nil { - mb.logf("modbus: error reconnecting: %v", cerr) - return - } - // retry the communication continue } @@ -305,22 +292,10 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad mb.logf("modbus: recv % x\n", aduResponse[:]) if err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { - if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { - err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) - return - } - // reconnect on connection reset - mb.logf("modbus: connection reset, reconnecting") - if cerr := mb.close(); cerr != nil { - mb.logf("modbus: error closing connection: %v", cerr) - return - } - if cerr := mb.connect(ctx); cerr != nil { - mb.logf("modbus: error reconnecting: %v", cerr) + if mb.shouldRecover(err) { + if err = mb.reconnect(ctx, err, linkRecoveryDeadline); err != nil { return } - // retry the communication continue } // Unknown error diff --git a/serial.go b/serial.go index 98f4f8d..ff49267 100644 --- a/serial.go +++ b/serial.go @@ -6,9 +6,11 @@ package modbus import ( "context" + "errors" "fmt" "io" "sync" + "syscall" "time" "github.com/grid-x/serial" @@ -91,6 +93,28 @@ func (mb *serialPort) logf(format string, v ...interface{}) { } } +func (mb *serialPort) shouldRecover(err error) bool { + return errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) +} + +func (mb *serialPort) reconnect(ctx context.Context, err error, linkRecoveryDeadline time.Time) error { + if mb.LinkRecoveryTimeout == 0 || time.Until(linkRecoveryDeadline) < 0 { + return fmt.Errorf("modbus: link recovery timeout reached: %w", err) + } + + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return cerr + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return cerr + } + + return nil +} + func (mb *serialPort) startCloseTimer() { if mb.IdleTimeout <= 0 { return diff --git a/tcpclient.go b/tcpclient.go index d6fcbe0..0ff0c15 100644 --- a/tcpclient.go +++ b/tcpclient.go @@ -302,6 +302,11 @@ func (mb *tcpTransporter) Send(ctx context.Context, aduRequest []byte) (aduRespo } } +func (mb *tcpTransporter) shouldRecover(err error) bool { + return errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) +} + + func (mb *tcpTransporter) readResponse(aduRequest []byte, data []byte, recoveryDeadline time.Time, protocolDeadline time.Time) (aduResponse []byte, res readResult, err error) { // res is readResultDone by default, which either means we succeeded or err contains the fatal error for { @@ -310,7 +315,7 @@ func (mb *tcpTransporter) readResponse(aduRequest []byte, data []byte, recoveryD if mb.LinkRecoveryTimeout == 0 || time.Until(recoveryDeadline) < 0 { return } - if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if mb.shouldRecover(err) { mb.logf("modbus: connection closed by remote side: %v", err) res = readResultCloseRetry } @@ -322,7 +327,7 @@ func (mb *tcpTransporter) readResponse(aduRequest []byte, data []byte, recoveryD if mb.LinkRecoveryTimeout == 0 || time.Until(recoveryDeadline) < 0 { return } - if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if mb.shouldRecover(err) { mb.logf("modbus: connection closed by remote side: %v", err) res = readResultCloseRetry return From 19c22602229a2dcaa177e141abb0270416d1ceac Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 27 Mar 2026 18:04:25 +0530 Subject: [PATCH 07/24] rtuclient: aduResponse nil guard --- rtuclient.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rtuclient.go b/rtuclient.go index 08fee19..825b170 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -289,7 +289,9 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad } aduResponse, err = readIncrementally(aduRequest[0], aduRequest[1], mb.port, connDeadline) - mb.logf("modbus: recv % x\n", aduResponse[:]) + if aduResponse != nil { + mb.logf("modbus: recv % x\n", aduResponse[:]) + } if err != nil { if mb.shouldRecover(err) { From 38973394cecf8a33c756c0cfa643421bfcd2cad4 Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 27 Mar 2026 18:15:35 +0530 Subject: [PATCH 08/24] expanded rtu test cases for non-happy paths --- rtu_transport_test.go | 188 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) diff --git a/rtu_transport_test.go b/rtu_transport_test.go index acb1778..b681e94 100644 --- a/rtu_transport_test.go +++ b/rtu_transport_test.go @@ -10,12 +10,20 @@ package modbus import ( "bytes" "context" + "errors" "fmt" + "io" + "log" "os" + "path/filepath" + "strings" + "sync" "syscall" "testing" "time" "unsafe" + + serialpkg "github.com/grid-x/serial" ) // openPTY opens a PTY pair and returns the master file and the slave path. @@ -47,6 +55,47 @@ func openPTY() (master *os.File, slavePath string, err error) { return master, slavePath, nil } +type scriptedPort struct { + mu sync.Mutex + readData []byte + readErr error + writeErr error + written bytes.Buffer + closed bool +} + +func (p *scriptedPort) Read(b []byte) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if len(p.readData) > 0 { + b[0] = p.readData[0] + p.readData = p.readData[1:] + return 1, nil + } + if p.readErr != nil { + return 0, p.readErr + } + return 0, io.EOF +} + +func (p *scriptedPort) Write(b []byte) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.writeErr != nil { + return 0, p.writeErr + } + return p.written.Write(b) +} + +func (p *scriptedPort) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + p.closed = true + return nil +} + func TestRTUSerialTransporter_Send_PTY(t *testing.T) { master, slavePath, err := openPTY() if err != nil { @@ -115,3 +164,142 @@ func TestRTUSerialTransporter_Timeout_PTY(t *testing.T) { t.Fatal("Expected timeout error, got nil") } } + +func TestRTUSerialTransporter_ReconnectOnMidCommunicationEOF_PTY(t *testing.T) { + master1, slavePath1, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master1.Close() + + // Request: 01 03 00 00 00 01 84 0A (Read Holding Registers) + // Response: 01 03 02 00 00 B8 44 + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + resp := []byte{0x01, 0x03, 0x02, 0x00, 0x00, 0xB8, 0x44} + partialResp := resp[:len(resp)-1] + var logs bytes.Buffer + + transporter := &rtuSerialTransporter{} + transporter.Address = slavePath1 + transporter.BaudRate = 19200 + transporter.Timeout = 200 * time.Millisecond + transporter.IdleTimeout = serialIdleTimeout + transporter.LinkRecoveryTimeout = 200 * time.Millisecond + transporter.Logger = log.New(&logs, "", 0) + + serverDone := make(chan error, 1) + + go func() { + buf := make([]byte, 1024) + n, err := master1.Read(buf) + if err != nil { + serverDone <- fmt.Errorf("failed to read initial request: %w", err) + return + } + if !bytes.Equal(buf[:n], req) { + serverDone <- fmt.Errorf("unexpected initial request: got %x want %x", buf[:n], req) + return + } + if _, err := master1.Write(partialResp); err != nil { + serverDone <- fmt.Errorf("failed to write partial response: %w", err) + return + } + if err := master1.Close(); err != nil { + serverDone <- fmt.Errorf("failed to close initial PTY master: %w", err) + return + } + serverDone <- nil + }() + + ctx := context.Background() + _, err = transporter.Send(ctx, req) + if err == nil { + t.Fatal("expected Send to fail after reconnect attempt, got nil") + } + + if !strings.Contains(logs.String(), "connection reset, reconnecting") { + t.Fatalf("expected reconnect log entry, got %q", logs.String()) + } + + select { + case err := <-serverDone: + if err != nil { + t.Fatal(err) + } + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for PTY server") + } + + if !strings.Contains(err.Error(), "could not open") && !strings.Contains(err.Error(), "link recovery timeout reached") { + t.Fatalf("expected reconnect-related error, got %v", err) + } +} + +func TestRTUSerialTransporter_PartialResponseThenTimeout(t *testing.T) { + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + resp := []byte{0x01, 0x03, 0x02, 0x00, 0x00, 0xB8, 0x44} + + port := &scriptedPort{ + readData: resp[:len(resp)-2], + readErr: serialpkg.ErrTimeout, + } + + transporter := &rtuSerialTransporter{} + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + + _, err := transporter.Send(context.Background(), req) + if !errors.Is(err, serialpkg.ErrTimeout) { + t.Fatalf("expected timeout after partial response, got %v", err) + } + if got := port.written.Bytes(); !bytes.Equal(got, req) { + t.Fatalf("expected request %x, got %x", req, got) + } +} + +func TestRTUSerialTransporter_ReconnectBudgetExhaustedOnReadEOF(t *testing.T) { + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + port := &scriptedPort{readErr: io.EOF} + + transporter := &rtuSerialTransporter{} + transporter.Address = filepath.Join(t.TempDir(), "missing-serial") + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.LinkRecoveryTimeout = 0 + + _, err := transporter.Send(context.Background(), req) + if err == nil { + t.Fatal("expected link recovery timeout error, got nil") + } + if !strings.Contains(err.Error(), "link recovery timeout reached") || !errors.Is(err, io.EOF) { + t.Fatalf("expected link recovery timeout wrapping EOF, got %v", err) + } + if got := port.written.Bytes(); !bytes.Equal(got, req) { + t.Fatalf("expected request %x, got %x", req, got) + } +} + +func TestRTUSerialTransporter_ReconnectOnWriteEOF(t *testing.T) { + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + port := &scriptedPort{writeErr: io.EOF} + + transporter := &rtuSerialTransporter{} + transporter.Address = filepath.Join(t.TempDir(), "missing-serial") + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.LinkRecoveryTimeout = 100 * time.Millisecond + + _, err := transporter.Send(context.Background(), req) + if err == nil { + t.Fatal("expected reconnect error after write EOF, got nil") + } + if !strings.Contains(err.Error(), "could not open") { + t.Fatalf("expected reconnect open failure, got %v", err) + } + if !port.closed { + t.Fatal("expected reconnect to close the original port after write EOF") + } +} From fbe2cb7658b3e5ad4913647eb8e4dba759e1ae6b Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 27 Mar 2026 18:19:31 +0530 Subject: [PATCH 09/24] drop darwin support for rtu tests --- rtu_transport_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rtu_transport_test.go b/rtu_transport_test.go index b681e94..cc1a4ae 100644 --- a/rtu_transport_test.go +++ b/rtu_transport_test.go @@ -1,5 +1,5 @@ -//go:build darwin || linux || freebsd || openbsd || netbsd -// +build darwin linux freebsd openbsd netbsd +//go:build linux || freebsd || openbsd || netbsd +// +build linux freebsd openbsd netbsd // Copyright 2014 Quoc-Viet Nguyen. All rights reserved. // This software may be modified and distributed under the terms From a2f29cd3253939e2fed80cea6456cf2c90334935 Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 27 Mar 2026 18:21:52 +0530 Subject: [PATCH 10/24] rewrap deadline error for rtu --- rtuclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rtuclient.go b/rtuclient.go index 825b170..71794db 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -167,7 +167,7 @@ func readIncrementally(slaveID, functionCode byte, r io.Reader, deadline time.Ti for { if time.Now().After(deadline) { // Possible that serialport may spew data - return nil, context.DeadlineExceeded + return nil, fmt.Errorf("failed to read from serial port within deadline: %w", context.DeadlineExceeded) } if _, err := io.ReadAtLeast(r, buf, 1); err != nil { From 81ce48cf802d8c55b974ee2153217a8e2e74ebc5 Mon Sep 17 00:00:00 2001 From: Souyama Date: Wed, 8 Apr 2026 11:02:17 +0530 Subject: [PATCH 11/24] set rtu conn deadline before each read request instead of shared deadline --- asciiclient.go | 2 +- rtuclient.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index e95b5e3..2bc6704 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -182,7 +182,6 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( mb.lastActivity = time.Now() mb.startCloseTimer() - connDeadline := time.Now().Add(mb.Timeout) linkRecoveryDeadline := time.Now().Add(mb.LinkRecoveryTimeout) for { @@ -199,6 +198,7 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( return } // Get the response + connDeadline := time.Now().Add(mb.Timeout) aduResponse, err = readASCII(mb.port, connDeadline) mb.logf("modbus: recv % x\n", aduResponse) if err != nil { diff --git a/rtuclient.go b/rtuclient.go index 71794db..cb1b2ad 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -263,7 +263,6 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad mb.lastActivity = time.Now() mb.startCloseTimer() - connDeadline := time.Now().Add(mb.Timeout) linkRecoveryDeadline := time.Now().Add(mb.LinkRecoveryTimeout) for { @@ -288,6 +287,7 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad case <-time.After(mb.calculateDelay(len(aduRequest) + bytesToRead)): } + connDeadline := time.Now().Add(mb.Timeout) aduResponse, err = readIncrementally(aduRequest[0], aduRequest[1], mb.port, connDeadline) if aduResponse != nil { mb.logf("modbus: recv % x\n", aduResponse[:]) From 15bfbc88de28072bd57f8cf758235927dacf5672 Mon Sep 17 00:00:00 2001 From: Souyama Date: Wed, 8 Apr 2026 11:03:42 +0530 Subject: [PATCH 12/24] remove ECONNRESET check from serial as it's tcp only --- serial.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/serial.go b/serial.go index ff49267..6535b84 100644 --- a/serial.go +++ b/serial.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "sync" - "syscall" "time" "github.com/grid-x/serial" @@ -94,7 +93,7 @@ func (mb *serialPort) logf(format string, v ...interface{}) { } func (mb *serialPort) shouldRecover(err error) bool { - return errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) + return errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) } func (mb *serialPort) reconnect(ctx context.Context, err error, linkRecoveryDeadline time.Time) error { From c931e8d62c57364e81dc90ce4cdf2962c6016878 Mon Sep 17 00:00:00 2001 From: Souyama Date: Wed, 8 Apr 2026 11:05:16 +0530 Subject: [PATCH 13/24] aduResponse nil check for ASCII before print --- asciiclient.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/asciiclient.go b/asciiclient.go index 2bc6704..3efded3 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -200,7 +200,9 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( // Get the response connDeadline := time.Now().Add(mb.Timeout) aduResponse, err = readASCII(mb.port, connDeadline) - mb.logf("modbus: recv % x\n", aduResponse) + if aduResponse != nil { + mb.logf("modbus: recv % x\n", aduResponse[:]) + } if err != nil { if mb.shouldRecover(err) { if err = mb.reconnect(ctx, err, linkRecoveryDeadline); err != nil { From 8ec965168a705306ae56118c3ae07f2c353eb84e Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 17:03:54 +0530 Subject: [PATCH 14/24] remove darwin support for ascii tests --- ascii_transport_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ascii_transport_test.go b/ascii_transport_test.go index b07ccde..c860f04 100644 --- a/ascii_transport_test.go +++ b/ascii_transport_test.go @@ -1,5 +1,5 @@ -//go:build darwin || linux || freebsd || openbsd || netbsd -// +build darwin linux freebsd openbsd netbsd +//go:build linux || freebsd || openbsd || netbsd +// +build linux freebsd openbsd netbsd // Copyright 2014 Quoc-Viet Nguyen. All rights reserved. // This software may be modified and distributed under the terms From 73c84c390f470835e6e252f138cac0f9c0e86264 Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 17:08:37 +0530 Subject: [PATCH 15/24] doc: serial connection session management by the open caller --- serial.go | 1 + 1 file changed, 1 insertion(+) diff --git a/serial.go b/serial.go index 6535b84..4e381e2 100644 --- a/serial.go +++ b/serial.go @@ -49,6 +49,7 @@ func (mb *serialPort) Connect(ctx context.Context) (err error) { } // connect connects to the serial port if it is not connected. Caller must hold the mutex. +// Note: caller must handle the connection close and recovery if the connection is lost. func (mb *serialPort) connect(ctx context.Context) error { select { case <-ctx.Done(): From b66461d6c6b84fef3da457ae878d5e1ae4761d3c Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 17:18:56 +0530 Subject: [PATCH 16/24] chore: remove commented code --- rtuclient.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/rtuclient.go b/rtuclient.go index cb1b2ad..0179ea4 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -278,8 +278,6 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad return } - // function := aduRequest[1] - // functionFail := aduRequest[1] & 0x80 bytesToRead := calculateResponseLength(aduRequest) select { case <-ctx.Done(): From 8e346d63c4bd682c0f2dc0c89348af2513bc763b Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 17:21:08 +0530 Subject: [PATCH 17/24] wrap ascii error --- asciiclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asciiclient.go b/asciiclient.go index 3efded3..e96be95 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -226,7 +226,7 @@ func readASCII(r io.Reader, deadline time.Time) ([]byte, error) { for { if time.Now().After(deadline) { - return nil, context.DeadlineExceeded + return nil, fmt.Errorf("failed to read from serial port before deadline: %w", context.DeadlineExceeded) } if n, err = r.Read(data[length:]); err != nil { return nil, err From a8fd2098a5c5645458e5d4b03f52ed0f9fdcca8e Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 17:46:30 +0530 Subject: [PATCH 18/24] Add retry interval for serial connections to avoid link flapping --- asciiclient.go | 1 + rtuclient.go | 1 + serial.go | 37 +++++++++++++++++++++++++++++++++---- 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index e96be95..3aa6a0d 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -36,6 +36,7 @@ func NewASCIIClientHandler(address string) *ASCIIClientHandler { handler.Address = address handler.Timeout = serialTimeout handler.IdleTimeout = serialIdleTimeout + handler.ReconnectRetryInterval = serialReconnectRetryInterval return handler } diff --git a/rtuclient.go b/rtuclient.go index 0179ea4..9874ba4 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -55,6 +55,7 @@ func NewRTUClientHandler(address string) *RTUClientHandler { handler.Address = address handler.Timeout = serialTimeout handler.IdleTimeout = serialIdleTimeout + handler.ReconnectRetryInterval = serialReconnectRetryInterval return handler } diff --git a/serial.go b/serial.go index 4e381e2..bc6debe 100644 --- a/serial.go +++ b/serial.go @@ -19,6 +19,8 @@ const ( // Default timeout serialTimeout = 5 * time.Second serialIdleTimeout = 60 * time.Second + // Retry interval while spending the link recovery budget on reconnects. + serialReconnectRetryInterval = 10 * time.Millisecond ) // serialPort has configuration and I/O controller. @@ -33,6 +35,9 @@ type serialPort struct { ConnectDelay time.Duration // Recovery timeout if the connection is lost LinkRecoveryTimeout time.Duration + // Interval between reconnect attempts while spending the link recovery budget. + // Zero or negative values fall back to the default retry interval. + ReconnectRetryInterval time.Duration mu sync.Mutex // port is platform-dependent data structure for serial port. @@ -107,12 +112,36 @@ func (mb *serialPort) reconnect(ctx context.Context, err error, linkRecoveryDead mb.logf("modbus: error closing connection: %v", cerr) return cerr } - if cerr := mb.connect(ctx); cerr != nil { - mb.logf("modbus: error reconnecting: %v", cerr) - return cerr + + lastErr := err + deadlineTimer := time.NewTimer(time.Until(linkRecoveryDeadline)) + defer deadlineTimer.Stop() + retryTicker := time.NewTicker(mb.reconnectRetryInterval()) + defer retryTicker.Stop() + + for { + if cerr := mb.connect(ctx); cerr == nil { + return nil + } else { + lastErr = cerr + mb.logf("modbus: error reconnecting: %v", cerr) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-deadlineTimer.C: + return fmt.Errorf("modbus: link recovery timeout reached: %w", lastErr) + case <-retryTicker.C: + } } +} - return nil +func (mb *serialPort) reconnectRetryInterval() time.Duration { + if mb.ReconnectRetryInterval > 0 { + return mb.ReconnectRetryInterval + } + return serialReconnectRetryInterval } func (mb *serialPort) startCloseTimer() { From 8a387f782bc03bbad4360d0d5d3e5ccd4425b6db Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 17:52:30 +0530 Subject: [PATCH 19/24] Add tests for serial port reconnect behavior with retry intervals --- serial_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/serial_test.go b/serial_test.go index 226ebb5..8828a07 100644 --- a/serial_test.go +++ b/serial_test.go @@ -2,7 +2,11 @@ package modbus import ( "bytes" + "context" "io" + "log" + "path/filepath" + "strings" "sync/atomic" "testing" "time" @@ -37,3 +41,51 @@ func TestSerialCloseIdle(t *testing.T) { t.Fatalf("serial port is not closed when inactivity: %+v", port) } } + +func TestSerialReconnect_UsesConfiguredRetryInterval(t *testing.T) { + var logs bytes.Buffer + port := &nopCloser{ReadWriter: &bytes.Buffer{}} + recoveryTimeout := 40 * time.Millisecond + + s := serialPort{ + Logger: log.New(&logs, "", 0), + port: port, + LinkRecoveryTimeout: recoveryTimeout, + ReconnectRetryInterval: 50 * time.Millisecond, + } + s.Address = filepath.Join(t.TempDir(), "missing-serial") + + err := s.reconnect(context.Background(), io.EOF, time.Now().Add(recoveryTimeout)) + if err == nil { + t.Fatal("expected reconnect to fail when the serial device is missing") + } + if !strings.Contains(err.Error(), "link recovery timeout reached") { + t.Fatalf("expected link recovery timeout error, got %v", err) + } + if count := strings.Count(logs.String(), "error reconnecting"); count != 1 { + t.Fatalf("expected exactly one reconnect attempt before timeout, got %d logs: %q", count, logs.String()) + } + if !port.closed.Load() || s.port != nil { + t.Fatalf("expected reconnect to close the original port: closed=%v port=%v", port.closed.Load(), s.port) + } +} + +func TestSerialReconnect_DefaultRetryIntervalRetriesMultipleTimes(t *testing.T) { + var logs bytes.Buffer + recoveryTimeout := 45 * time.Millisecond + + s := serialPort{ + Logger: log.New(&logs, "", 0), + port: &nopCloser{ReadWriter: &bytes.Buffer{}}, + LinkRecoveryTimeout: recoveryTimeout, + } + s.Address = filepath.Join(t.TempDir(), "missing-serial") + + err := s.reconnect(context.Background(), io.EOF, time.Now().Add(recoveryTimeout)) + if err == nil { + t.Fatal("expected reconnect to fail when the serial device is missing") + } + if count := strings.Count(logs.String(), "error reconnecting"); count < 2 { + t.Fatalf("expected default retry interval to attempt reconnect multiple times, got %d logs: %q", count, logs.String()) + } +} From 980c964c1b0ec8561f752d2eda1c4b4a247dd501 Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 17:58:25 +0530 Subject: [PATCH 20/24] Refactor reconnect test cases for link recovery timeout handling --- rtu_transport_test.go | 49 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/rtu_transport_test.go b/rtu_transport_test.go index cc1a4ae..02d480b 100644 --- a/rtu_transport_test.go +++ b/rtu_transport_test.go @@ -258,7 +258,7 @@ func TestRTUSerialTransporter_PartialResponseThenTimeout(t *testing.T) { } } -func TestRTUSerialTransporter_ReconnectBudgetExhaustedOnReadEOF(t *testing.T) { +func TestRTUSerialTransporter_RecoveryDisabledOnReadEOF(t *testing.T) { req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} port := &scriptedPort{readErr: io.EOF} @@ -281,23 +281,64 @@ func TestRTUSerialTransporter_ReconnectBudgetExhaustedOnReadEOF(t *testing.T) { } } +func TestRTUSerialTransporter_ReconnectBudgetExhaustedOnReadEOF(t *testing.T) { + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + port := &scriptedPort{readErr: io.EOF} + recoveryTimeout := 80 * time.Millisecond + + transporter := &rtuSerialTransporter{} + transporter.Address = filepath.Join(t.TempDir(), "missing-serial") + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.LinkRecoveryTimeout = recoveryTimeout + + start := time.Now() + _, err := transporter.Send(context.Background(), req) + elapsed := time.Since(start) + if err == nil { + t.Fatal("expected link recovery timeout error, got nil") + } + if !strings.Contains(err.Error(), "link recovery timeout reached") { + t.Fatalf("expected link recovery timeout error, got %v", err) + } + if !strings.Contains(err.Error(), "could not open") { + t.Fatalf("expected reconnect open failure to be wrapped, got %v", err) + } + if elapsed < recoveryTimeout-20*time.Millisecond { + t.Fatalf("expected recovery to keep retrying for about %v, returned after %v", recoveryTimeout, elapsed) + } + if !port.closed { + t.Fatal("expected reconnect to close the original port after read EOF") + } + if got := port.written.Bytes(); !bytes.Equal(got, req) { + t.Fatalf("expected request %x, got %x", req, got) + } +} + func TestRTUSerialTransporter_ReconnectOnWriteEOF(t *testing.T) { req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} port := &scriptedPort{writeErr: io.EOF} + recoveryTimeout := 80 * time.Millisecond transporter := &rtuSerialTransporter{} transporter.Address = filepath.Join(t.TempDir(), "missing-serial") transporter.port = port transporter.BaudRate = 19200 transporter.Timeout = 100 * time.Millisecond - transporter.LinkRecoveryTimeout = 100 * time.Millisecond + transporter.LinkRecoveryTimeout = recoveryTimeout + start := time.Now() _, err := transporter.Send(context.Background(), req) + elapsed := time.Since(start) if err == nil { t.Fatal("expected reconnect error after write EOF, got nil") } - if !strings.Contains(err.Error(), "could not open") { - t.Fatalf("expected reconnect open failure, got %v", err) + if !strings.Contains(err.Error(), "link recovery timeout reached") || !strings.Contains(err.Error(), "could not open") { + t.Fatalf("expected timed-out reconnect open failure, got %v", err) + } + if elapsed < recoveryTimeout-20*time.Millisecond { + t.Fatalf("expected recovery to keep retrying for about %v, returned after %v", recoveryTimeout, elapsed) } if !port.closed { t.Fatal("expected reconnect to close the original port after write EOF") From a2ab32d23746fec3687cc9b952c6f979286a186d Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 18:03:02 +0530 Subject: [PATCH 21/24] ASCII pty driven tests --- ascii_transport_test.go | 185 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) diff --git a/ascii_transport_test.go b/ascii_transport_test.go index c860f04..19c8cea 100644 --- a/ascii_transport_test.go +++ b/ascii_transport_test.go @@ -10,8 +10,16 @@ package modbus import ( "bytes" "context" + "errors" + "fmt" + "io" + "log" + "path/filepath" + "strings" "testing" "time" + + serialpkg "github.com/grid-x/serial" ) func TestASCIISerialTransporter_Send_PTY(t *testing.T) { @@ -87,3 +95,180 @@ func TestASCIISerialTransporter_Timeout_PTY(t *testing.T) { t.Fatal("Expected timeout error, got nil") } } + +func TestASCIISerialTransporter_ReconnectOnMidCommunicationEOF_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + reqASCII := []byte(":010300000001FB\r\n") + respASCII := []byte(":0103020000FA\r\n") + partialResp := respASCII[:len(respASCII)-2] + var logs bytes.Buffer + + transporter := &asciiSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 200 * time.Millisecond + transporter.IdleTimeout = serialIdleTimeout + transporter.LinkRecoveryTimeout = 200 * time.Millisecond + transporter.Logger = log.New(&logs, "", 0) + + serverDone := make(chan error, 1) + + go func() { + buf := make([]byte, 1024) + n, err := master.Read(buf) + if err != nil { + serverDone <- fmt.Errorf("failed to read initial request: %w", err) + return + } + if !bytes.Equal(buf[:n], reqASCII) { + serverDone <- fmt.Errorf("unexpected initial request: got %q want %q", buf[:n], reqASCII) + return + } + if _, err := master.Write(partialResp); err != nil { + serverDone <- fmt.Errorf("failed to write partial response: %w", err) + return + } + if err := master.Close(); err != nil { + serverDone <- fmt.Errorf("failed to close initial PTY master: %w", err) + return + } + serverDone <- nil + }() + + _, err = transporter.Send(context.Background(), reqASCII) + if err == nil { + t.Fatal("expected Send to fail after reconnect attempt, got nil") + } + + if !strings.Contains(logs.String(), "connection reset, reconnecting") { + t.Fatalf("expected reconnect log entry, got %q", logs.String()) + } + + select { + case err := <-serverDone: + if err != nil { + t.Fatal(err) + } + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for PTY server") + } + + if !strings.Contains(err.Error(), "could not open") && !strings.Contains(err.Error(), "link recovery timeout reached") { + t.Fatalf("expected reconnect-related error, got %v", err) + } +} + +func TestASCIISerialTransporter_PartialResponseThenTimeout(t *testing.T) { + reqASCII := []byte(":010300000001FB\r\n") + respASCII := []byte(":0103020000FA\r\n") + + port := &scriptedPort{ + readData: respASCII[:len(respASCII)-2], + readErr: serialpkg.ErrTimeout, + } + + transporter := &asciiSerialTransporter{} + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + + _, err := transporter.Send(context.Background(), reqASCII) + if !errors.Is(err, serialpkg.ErrTimeout) { + t.Fatalf("expected timeout after partial response, got %v", err) + } + if got := port.written.Bytes(); !bytes.Equal(got, reqASCII) { + t.Fatalf("expected request %q, got %q", reqASCII, got) + } +} + +func TestASCIISerialTransporter_RecoveryDisabledOnReadEOF(t *testing.T) { + reqASCII := []byte(":010300000001FB\r\n") + port := &scriptedPort{readErr: io.EOF} + + transporter := &asciiSerialTransporter{} + transporter.Address = filepath.Join(t.TempDir(), "missing-serial") + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.LinkRecoveryTimeout = 0 + + _, err := transporter.Send(context.Background(), reqASCII) + if err == nil { + t.Fatal("expected link recovery timeout error, got nil") + } + if !strings.Contains(err.Error(), "link recovery timeout reached") || !errors.Is(err, io.EOF) { + t.Fatalf("expected link recovery timeout wrapping EOF, got %v", err) + } + if got := port.written.Bytes(); !bytes.Equal(got, reqASCII) { + t.Fatalf("expected request %q, got %q", reqASCII, got) + } +} + +func TestASCIISerialTransporter_ReconnectBudgetExhaustedOnReadEOF(t *testing.T) { + reqASCII := []byte(":010300000001FB\r\n") + port := &scriptedPort{readErr: io.EOF} + recoveryTimeout := 80 * time.Millisecond + + transporter := &asciiSerialTransporter{} + transporter.Address = filepath.Join(t.TempDir(), "missing-serial") + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.LinkRecoveryTimeout = recoveryTimeout + + start := time.Now() + _, err := transporter.Send(context.Background(), reqASCII) + elapsed := time.Since(start) + if err == nil { + t.Fatal("expected link recovery timeout error, got nil") + } + if !strings.Contains(err.Error(), "link recovery timeout reached") { + t.Fatalf("expected link recovery timeout error, got %v", err) + } + if !strings.Contains(err.Error(), "could not open") { + t.Fatalf("expected reconnect open failure to be wrapped, got %v", err) + } + if elapsed < recoveryTimeout-20*time.Millisecond { + t.Fatalf("expected recovery to keep retrying for about %v, returned after %v", recoveryTimeout, elapsed) + } + if !port.closed { + t.Fatal("expected reconnect to close the original port after read EOF") + } + if got := port.written.Bytes(); !bytes.Equal(got, reqASCII) { + t.Fatalf("expected request %q, got %q", reqASCII, got) + } +} + +func TestASCIISerialTransporter_ReconnectOnWriteEOF(t *testing.T) { + reqASCII := []byte(":010300000001FB\r\n") + port := &scriptedPort{writeErr: io.EOF} + recoveryTimeout := 80 * time.Millisecond + + transporter := &asciiSerialTransporter{} + transporter.Address = filepath.Join(t.TempDir(), "missing-serial") + transporter.port = port + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.LinkRecoveryTimeout = recoveryTimeout + + start := time.Now() + _, err := transporter.Send(context.Background(), reqASCII) + elapsed := time.Since(start) + if err == nil { + t.Fatal("expected reconnect error after write EOF, got nil") + } + if !strings.Contains(err.Error(), "link recovery timeout reached") || !strings.Contains(err.Error(), "could not open") { + t.Fatalf("expected timed-out reconnect open failure, got %v", err) + } + if elapsed < recoveryTimeout-20*time.Millisecond { + t.Fatalf("expected recovery to keep retrying for about %v, returned after %v", recoveryTimeout, elapsed) + } + if !port.closed { + t.Fatal("expected reconnect to close the original port after write EOF") + } +} From b6ab4a6b8a4cedd06c302ccffc36346f09637d6f Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 18:05:40 +0530 Subject: [PATCH 22/24] ASCII decoding and reading tests --- asciiclient_test.go | 137 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/asciiclient_test.go b/asciiclient_test.go index 1502066..006cadd 100644 --- a/asciiclient_test.go +++ b/asciiclient_test.go @@ -6,9 +6,31 @@ package modbus import ( "bytes" + "context" + "errors" + "io" + "strings" "testing" + "time" ) +type asciiTestReader struct { + readData []byte + readErr error +} + +func (r *asciiTestReader) Read(b []byte) (int, error) { + if len(r.readData) > 0 { + b[0] = r.readData[0] + r.readData = r.readData[1:] + return 1, nil + } + if r.readErr != nil { + return 0, r.readErr + } + return 0, io.EOF +} + func TestASCIIEncoding(t *testing.T) { encoder := asciiPackager{} encoder.SlaveID = 17 @@ -68,6 +90,121 @@ func TestASCIIDecodeStartCharacter(t *testing.T) { } } +func TestASCIIDecodeInvalidLRC(t *testing.T) { + decoder := asciiPackager{} + adu := []byte(":F7031389000A61\r\n") + + _, err := decoder.Decode(adu) + if err == nil { + t.Fatal("expected invalid LRC error, got nil") + } + if !strings.Contains(err.Error(), "response lrc") { + t.Fatalf("expected LRC mismatch error, got %v", err) + } +} + +func TestASCIIVerifyErrors(t *testing.T) { + decoder := asciiPackager{} + aduReq := []byte(":010300010002F9\r\n") + + testcases := []struct { + name string + aduResp []byte + expectedErr string + }{ + { + name: "too short", + aduResp: []byte(":01\r\n"), + expectedErr: "does not meet minimum", + }, + { + name: "odd payload length", + aduResp: []byte(":010304010F1509CA\r"), + expectedErr: "is not an even number", + }, + { + name: "invalid start character", + aduResp: []byte("!010304010F1509CA\r\n"), + expectedErr: "is not started", + }, + { + name: "invalid frame terminator", + aduResp: []byte(":010304010F1509CA\n\n"), + expectedErr: "is not ended", + }, + { + name: "slave mismatch", + aduResp: []byte(":020304010F1509CA\r\n"), + expectedErr: "does not match request", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + err := decoder.Verify(aduReq, tc.aduResp) + if err == nil { + t.Fatal("expected Verify to fail, got nil") + } + if !strings.Contains(err.Error(), tc.expectedErr) { + t.Fatalf("expected error containing %q, got %v", tc.expectedErr, err) + } + }) + } +} + +func TestReadASCII(t *testing.T) { + testcases := []struct { + name string + reader *asciiTestReader + deadline time.Time + want []byte + wantErr error + }{ + { + name: "complete frame", + reader: &asciiTestReader{readData: []byte(":0103020000FA\r\n")}, + deadline: time.Now().Add(time.Second), + want: []byte(":0103020000FA\r\n"), + }, + { + name: "stops at terminator", + reader: &asciiTestReader{readData: []byte(":0103020000FA\r\nignored")}, + deadline: time.Now().Add(time.Second), + want: []byte(":0103020000FA\r\n"), + }, + { + name: "reader timeout after partial frame", + reader: &asciiTestReader{readData: []byte(":0103020000FA\r"), readErr: context.DeadlineExceeded}, + deadline: time.Now().Add(time.Second), + wantErr: context.DeadlineExceeded, + }, + { + name: "deadline exceeded before read", + reader: &asciiTestReader{readData: []byte(":0103020000FA\r\n")}, + deadline: time.Now().Add(-time.Millisecond), + wantErr: context.DeadlineExceeded, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + got, err := readASCII(tc.reader, tc.deadline) + if tc.wantErr != nil { + if !errors.Is(err, tc.wantErr) { + t.Fatalf("expected error %v, got %v", tc.wantErr, err) + } + return + } + if err != nil { + t.Fatalf("readASCII returned error: %v", err) + } + if !bytes.Equal(got, tc.want) { + t.Fatalf("expected %q, got %q", tc.want, got) + } + }) + } +} + func BenchmarkASCIIEncoder(b *testing.B) { encoder := asciiPackager{ SlaveID: 10, From b79526aa5c14fd8cc0431c6e8b9bff2ea3e37ece Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 18:20:01 +0530 Subject: [PATCH 23/24] support serial device hot plug --- serial.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/serial.go b/serial.go index bc6debe..f9199c4 100644 --- a/serial.go +++ b/serial.go @@ -108,12 +108,12 @@ func (mb *serialPort) reconnect(ctx context.Context, err error, linkRecoveryDead } mb.logf("modbus: connection reset, reconnecting") + recoveryErr := err if cerr := mb.close(); cerr != nil { + recoveryErr = errors.Join(recoveryErr, cerr) mb.logf("modbus: error closing connection: %v", cerr) - return cerr } - lastErr := err deadlineTimer := time.NewTimer(time.Until(linkRecoveryDeadline)) defer deadlineTimer.Stop() retryTicker := time.NewTicker(mb.reconnectRetryInterval()) @@ -123,7 +123,7 @@ func (mb *serialPort) reconnect(ctx context.Context, err error, linkRecoveryDead if cerr := mb.connect(ctx); cerr == nil { return nil } else { - lastErr = cerr + recoveryErr = errors.Join(recoveryErr, cerr) mb.logf("modbus: error reconnecting: %v", cerr) } @@ -131,7 +131,7 @@ func (mb *serialPort) reconnect(ctx context.Context, err error, linkRecoveryDead case <-ctx.Done(): return ctx.Err() case <-deadlineTimer.C: - return fmt.Errorf("modbus: link recovery timeout reached: %w", lastErr) + return fmt.Errorf("modbus: link recovery timeout reached: %w", recoveryErr) case <-retryTicker.C: } } From e0c387ed062cac1a4e1e942a3c9c4282f784ef32 Mon Sep 17 00:00:00 2001 From: Souyama Date: Fri, 17 Apr 2026 18:24:43 +0530 Subject: [PATCH 24/24] hot plug test recovery --- serial_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/serial_test.go b/serial_test.go index 8828a07..86cd010 100644 --- a/serial_test.go +++ b/serial_test.go @@ -3,8 +3,10 @@ package modbus import ( "bytes" "context" + "errors" "io" "log" + "os" "path/filepath" "strings" "sync/atomic" @@ -23,6 +25,18 @@ func (n *nopCloser) Close() error { return nil } +type errCloser struct { + io.ReadWriter + + closed atomic.Bool + err error +} + +func (e *errCloser) Close() error { + e.closed.Store(true) + return e.err +} + func TestSerialCloseIdle(t *testing.T) { port := &nopCloser{ ReadWriter: &bytes.Buffer{}, @@ -85,7 +99,84 @@ func TestSerialReconnect_DefaultRetryIntervalRetriesMultipleTimes(t *testing.T) if err == nil { t.Fatal("expected reconnect to fail when the serial device is missing") } + if !errors.Is(err, io.EOF) { + t.Fatalf("expected reconnect to preserve the original EOF, got %v", err) + } if count := strings.Count(logs.String(), "error reconnecting"); count < 2 { t.Fatalf("expected default retry interval to attempt reconnect multiple times, got %d logs: %q", count, logs.String()) } + if count := strings.Count(err.Error(), "could not open"); count < 2 { + t.Fatalf("expected reconnect error to include multiple open failures, got %d in %q", count, err.Error()) + } +} + +func TestSerialReconnectHotPlug_EventuallySucceedsWithinRecoveryWindow_PTY(t *testing.T) { + var logs bytes.Buffer + + initialPort := &errCloser{ + ReadWriter: &bytes.Buffer{}, + err: errors.New("device disappeared"), + } + recoveryTimeout := 200 * time.Millisecond + stablePath := filepath.Join(t.TempDir(), "recovering-serial") + + type reopenResult struct { + master *os.File + err error + } + + reopenReady := make(chan reopenResult, 1) + go func() { + time.Sleep(45 * time.Millisecond) + + master, slavePath, err := openPTY() + if err != nil { + reopenReady <- reopenResult{err: err} + return + } + if err := os.Symlink(slavePath, stablePath); err != nil { + _ = master.Close() + reopenReady <- reopenResult{err: err} + return + } + reopenReady <- reopenResult{master: master} + }() + + s := serialPort{ + Logger: log.New(&logs, "", 0), + port: initialPort, + LinkRecoveryTimeout: recoveryTimeout, + ReconnectRetryInterval: 10 * time.Millisecond, + } + s.Address = stablePath + s.BaudRate = 19200 + s.Timeout = 50 * time.Millisecond + + err := s.reconnect(context.Background(), io.EOF, time.Now().Add(recoveryTimeout)) + if err != nil { + t.Fatalf("expected reconnect to succeed before timeout, got %v", err) + } + if s.port == nil || s.port == initialPort { + t.Fatalf("expected reconnect to replace the original port, got %v", s.port) + } + + select { + case result := <-reopenReady: + if result.err != nil { + t.Fatal(result.err) + } + t.Cleanup(func() { + _ = s.Close() + _ = result.master.Close() + }) + case <-time.After(1 * time.Second): + t.Fatal("timed out waiting for PTY reopen setup") + } + + if !strings.Contains(logs.String(), "error closing connection") { + t.Fatalf("expected close error to be logged, got %q", logs.String()) + } + if count := strings.Count(logs.String(), "error reconnecting"); count < 1 { + t.Fatalf("expected reconnect to log failed reopen attempts before success, got %d logs: %q", count, logs.String()) + } }