From d73eac3816750c59432795d5548986524a421bae Mon Sep 17 00:00:00 2001 From: Alan Barker Date: Mon, 30 Jun 2025 15:08:39 -0400 Subject: [PATCH 1/5] feat: Add Environment ID support --- .github/variables/go-versions.env | 6 ++-- contract-tests/go.mod | 2 +- contract-tests/go.sum | 11 ++----- decoder.go | 30 +++++++++++++++---- decoder_test.go | 43 ++++++++++++--------------- go.mod | 4 +-- go.sum | 7 ++--- interface.go | 14 +++++++++ stream.go | 20 +++++++------ stream_error_after_subscribe_test.go | 2 +- stream_error_during_subscribe_test.go | 2 +- stream_reading_test.go | 21 ++++++++++++- stream_reconnect_test.go | 2 +- stream_requests_test.go | 2 +- stream_restart_close_test.go | 2 +- 15 files changed, 103 insertions(+), 65 deletions(-) diff --git a/.github/variables/go-versions.env b/.github/variables/go-versions.env index 3c277c3..cb4024a 100644 --- a/.github/variables/go-versions.env +++ b/.github/variables/go-versions.env @@ -1,3 +1,3 @@ -latest=1.22 -penultimate=1.21 -min=1.17 +latest=1.24 +penultimate=1.23 +min=1.23 diff --git a/contract-tests/go.mod b/contract-tests/go.mod index abd7a70..0900a0b 100644 --- a/contract-tests/go.mod +++ b/contract-tests/go.mod @@ -1,6 +1,6 @@ module github.com/launchdarkly/eventsource/contract-tests -go 1.17 +go 1.23 replace github.com/launchdarkly/eventsource => ../ diff --git a/contract-tests/go.sum b/contract-tests/go.sum index d26315c..acf1872 100644 --- a/contract-tests/go.sum +++ b/contract-tests/go.sum @@ -1,17 +1,10 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/launchdarkly/go-test-helpers/v2 v2.2.0 h1:L3kGILP/6ewikhzhdNkHy1b5y4zs50LueWenVF0sBbs= -github.com/launchdarkly/go-test-helpers/v2 v2.2.0/go.mod h1:L7+th5govYp5oKU9iN7To5PgznBuIjBPn+ejqKR0avw= +github.com/launchdarkly/go-test-helpers/v3 v3.1.0 h1:E3bxJMzMoA+cJSF3xxtk2/chr1zshl1ZWa0/oR+8bvg= +github.com/launchdarkly/go-test-helpers/v3 v3.1.0/go.mod h1:Ake5+hZFS/DmIGKx/cizhn5W9pGA7pplcR7xCxWiLIo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho= github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/decoder.go b/decoder.go index 4e5ec4a..a50e518 100644 --- a/decoder.go +++ b/decoder.go @@ -9,8 +9,8 @@ import ( ) type publication struct { - id, event, data, lastEventID string - retry int64 + id, event, data, lastEventID, environmentID string + retry int64 } //nolint:revive,stylecheck // should be ID; retained for backward compatibility @@ -22,12 +22,16 @@ func (s *publication) Retry() int64 { return s.retry } // LastEventID is from a separate interface, EventWithLastID func (s *publication) LastEventID() string { return s.lastEventID } +// EnvironmentID is from a separate interface, EventWithEnvironmentID +func (s *publication) EnvironmentID() string { return s.environmentID } + // A Decoder is capable of reading Events from a stream. type Decoder struct { - linesCh <-chan string - errorCh <-chan error - readTimeout time.Duration - lastEventID string + linesCh <-chan string + errorCh <-chan error + readTimeout time.Duration + lastEventID string + environmentID string } // DecoderOption is a common interface for optional configuration parameters that can be @@ -48,6 +52,12 @@ func (o lastEventIDDecoderOption) apply(d *Decoder) { d.lastEventID = string(o) } +type envrionmentIDDecoderOption string + +func (o envrionmentIDDecoderOption) apply(d *Decoder) { + d.environmentID = string(o) +} + // DecoderOptionReadTimeout returns an option that sets the read timeout interval for a // Decoder when the Decoder is created. If the Decoder does not receive new data within this // length of time, it will return an error. By default, there is no read timeout. @@ -62,6 +72,13 @@ func DecoderOptionLastEventID(lastEventID string) DecoderOption { return lastEventIDDecoderOption(lastEventID) } +// DecoderOptionEnvironmentID returns an option that sets the environment ID property for a +// Decoder when the Decoder is created. This allows the environment ID to be read from the +// X-Ld-Envid response header and included in new events. +func DecoderOptionEnvironmentID(environmentID string) DecoderOption { + return envrionmentIDDecoderOption(environmentID) +} + // NewDecoder returns a new Decoder instance that reads events with the given io.Reader. func NewDecoder(r io.Reader) *Decoder { bufReader := bufio.NewReader(newNormaliser(r)) @@ -152,6 +169,7 @@ ReadLoop: } pub.data = strings.TrimSuffix(pub.data, "\n") pub.lastEventID = dec.lastEventID + pub.environmentID = dec.environmentID return pub, nil } diff --git a/decoder_test.go b/decoder_test.go index f306e13..4c5b1f4 100644 --- a/decoder_test.go +++ b/decoder_test.go @@ -57,6 +57,13 @@ func requireLastEventID(t *testing.T, event Event) string { return eventWithID.LastEventID() } +func requireEnvironmentId(t *testing.T, event Event) string { + // necessary because we can't yet add EnvironmentID to the basic Event interface; see EventWithEnvironmentID + eventWithEnvID, ok := event.(EventWithEnvironmentID) + require.True(t, ok, "event should have implemented EventWithEnvironmentID") + return eventWithEnvID.EnvironmentID() +} + func TestDecoderTracksLastEventID(t *testing.T) { t.Run("uses last ID that is passed in options", func(t *testing.T) { inputData := "data: abc\n\n" @@ -96,8 +103,8 @@ func TestDecoderTracksLastEventID(t *testing.T) { assert.Equal(t, "def", requireLastEventID(t, event3)) }) - t.Run("last ID persists if not overridden", func(t *testing.T) { - inputData := "id: abc\ndata: first\n\ndata: second\n\nid: def\ndata:third\n\n" + t.Run("last ID can be overridden with empty string", func(t *testing.T) { + inputData := "id: abc\ndata: first\n\nid: \ndata: second\n\n" decoder := NewDecoderWithOptions(strings.NewReader(inputData), DecoderOptionLastEventID("my-id")) event1, err := decoder.Decode() @@ -112,32 +119,20 @@ func TestDecoderTracksLastEventID(t *testing.T) { assert.Equal(t, "second", event2.Data()) assert.Equal(t, "", event2.Id()) - assert.Equal(t, "abc", requireLastEventID(t, event2)) - - event3, err := decoder.Decode() - require.NoError(t, err) - - assert.Equal(t, "third", event3.Data()) - assert.Equal(t, "def", event3.Id()) - assert.Equal(t, "def", requireLastEventID(t, event3)) + assert.Equal(t, "", requireLastEventID(t, event2)) }) +} - t.Run("last ID can be overridden with empty string", func(t *testing.T) { - inputData := "id: abc\ndata: first\n\nid: \ndata: second\n\n" - decoder := NewDecoderWithOptions(strings.NewReader(inputData), DecoderOptionLastEventID("my-id")) - - event1, err := decoder.Decode() - require.NoError(t, err) - - assert.Equal(t, "first", event1.Data()) - assert.Equal(t, "abc", event1.Id()) - assert.Equal(t, "abc", requireLastEventID(t, event1)) +func TestDecoderTracksEnvironmentID(t *testing.T) { + t.Run("uses environment ID that is passed in options", func(t *testing.T) { + inputData := "data: abc\n\n" + decoder := NewDecoderWithOptions(strings.NewReader(inputData), DecoderOptionEnvironmentID("env-id")) - event2, err := decoder.Decode() + event, err := decoder.Decode() require.NoError(t, err) - assert.Equal(t, "second", event2.Data()) - assert.Equal(t, "", event2.Id()) - assert.Equal(t, "", requireLastEventID(t, event2)) + assert.Equal(t, "abc", event.Data()) + assert.Equal(t, "", event.Id()) + assert.Equal(t, "env-id", requireEnvironmentId(t, event)) }) } diff --git a/go.mod b/go.mod index 0188e78..2a71659 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ module github.com/launchdarkly/eventsource -go 1.17 +go 1.23 require ( - github.com/launchdarkly/go-test-helpers/v2 v2.2.0 + github.com/launchdarkly/go-test-helpers/v3 v3.1.0 github.com/stretchr/testify v1.6.0 ) diff --git a/go.sum b/go.sum index 5c4b8ff..f6afd00 100644 --- a/go.sum +++ b/go.sum @@ -1,18 +1,15 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/launchdarkly/go-test-helpers/v2 v2.2.0 h1:L3kGILP/6ewikhzhdNkHy1b5y4zs50LueWenVF0sBbs= -github.com/launchdarkly/go-test-helpers/v2 v2.2.0/go.mod h1:L7+th5govYp5oKU9iN7To5PgznBuIjBPn+ejqKR0avw= +github.com/launchdarkly/go-test-helpers/v3 v3.1.0 h1:E3bxJMzMoA+cJSF3xxtk2/chr1zshl1ZWa0/oR+8bvg= +github.com/launchdarkly/go-test-helpers/v3 v3.1.0/go.mod h1:Ake5+hZFS/DmIGKx/cizhn5W9pGA7pplcR7xCxWiLIo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgho= github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/interface.go b/interface.go index ba4a8a0..6807f05 100644 --- a/interface.go +++ b/interface.go @@ -32,6 +32,20 @@ type EventWithLastID interface { LastEventID() string } +// EventWithEnvironmentID is an additional interface for an event received by the client, +// allowing access to the EnvironmentID method. +// +// This is defined as a separate interface for backward compatibility, since this +// feature was added after the Event interface had been defined and adding a method +// to Event would break existing implementations. All events returned by Stream do +// implement this interface, and in a future major version the Event type will be +// changed to always include this field. +type EventWithEnvironmentID interface { + // EnvironmentID is the value of the `X-Ld-Envid` response header that was most recently + // seen in an event from this stream, if any. + EnvironmentID() string +} + // Repository is an interface to be used with Server.Register() allowing clients to replay previous events // through the server, if history is required. type Repository interface { diff --git a/stream.go b/stream.go index 2293960..2c07162 100644 --- a/stream.go +++ b/stream.go @@ -127,9 +127,9 @@ func SubscribeWithRequestAndOptions(request *http.Request, options ...StreamOpti initialRetryTimeoutCh = time.After(configuredOptions.initialRetryTimeout) } for { - r, err := stream.connect() + r, h, err := stream.connect() if err == nil { - go stream.stream(r) + go stream.stream(r, h) return stream, nil } lastError = err @@ -229,7 +229,7 @@ func (stream *Stream) Close() { }) } -func (stream *Stream) connect() (io.ReadCloser, error) { +func (stream *Stream) connect() (io.ReadCloser, http.Header, error) { var err error var resp *http.Response stream.req.Header.Set("Cache-Control", "no-cache") @@ -245,12 +245,12 @@ func (stream *Stream) connect() (io.ReadCloser, error) { // All but the initial connection will need to regenerate the body if stream.connections > 0 && req.GetBody != nil { if req.Body, err = req.GetBody(); err != nil { - return nil, err + return nil, nil, err } } if resp, err = stream.c.Do(&req); err != nil { - return nil, err + return nil, nil, err } stream.connections++ if resp.StatusCode != 200 { @@ -261,12 +261,12 @@ func (stream *Stream) connect() (io.ReadCloser, error) { Message: string(message), Header: resp.Header, } - return nil, err + return nil, nil, err } - return resp.Body, nil + return resp.Body, resp.Header, nil } -func (stream *Stream) stream(r io.ReadCloser) { +func (stream *Stream) stream(r io.ReadCloser, h http.Header) { retryChan := make(chan struct{}, 1) scheduleRetry := func() { @@ -302,6 +302,7 @@ NewStream: dec := NewDecoderWithOptions(r, DecoderOptionReadTimeout(stream.readTimeout), DecoderOptionLastEventID(stream.lastEventID), + DecoderOptionEnvironmentID(h.Get("X-Ld-Envid")), ) go func() { for { @@ -358,9 +359,10 @@ NewStream: break NewStream case <-retryChan: var err error - r, err = stream.connect() + r, h, err = stream.connect() if err != nil { r = nil + h = nil if !reportErrorAndMaybeContinue(err) { break NewStream } diff --git a/stream_error_after_subscribe_test.go b/stream_error_after_subscribe_test.go index 33767aa..5e3774b 100644 --- a/stream_error_after_subscribe_test.go +++ b/stream_error_after_subscribe_test.go @@ -8,7 +8,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/launchdarkly/go-test-helpers/v2/httphelpers" + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" ) func TestStreamErrorsAreSentToErrorsChannel(t *testing.T) { diff --git a/stream_error_during_subscribe_test.go b/stream_error_during_subscribe_test.go index 1c7cc8b..5ce3f1a 100644 --- a/stream_error_during_subscribe_test.go +++ b/stream_error_during_subscribe_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/launchdarkly/go-test-helpers/v2/httphelpers" + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" ) func handlerCausingNetworkError() http.Handler { diff --git a/stream_reading_test.go b/stream_reading_test.go index cbe37db..c0c3bf5 100644 --- a/stream_reading_test.go +++ b/stream_reading_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/launchdarkly/go-test-helpers/v2/httphelpers" + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" ) func TestStreamSubscribeEventsChan(t *testing.T) { @@ -29,6 +29,25 @@ func TestStreamSubscribeEventsChan(t *testing.T) { } } +func TestStreamCanReadEnvironmentID(t *testing.T) { + streamHandler, streamControl := httphelpers.SSEHandlerWithEnvironmentID(nil, "env-id") + defer streamControl.Close() + httpServer := httptest.NewServer(streamHandler) + defer httpServer.Close() + + stream := mustSubscribe(t, httpServer.URL) + defer stream.Close() + + streamControl.Send(httphelpers.SSEEvent{ID: "123"}) + + select { + case receivedEvent := <-stream.Events: + assert.Equal(t, &publication{id: "123", lastEventID: "123", environmentID: "env-id"}, receivedEvent) + case <-time.After(timeToWaitForEvent): + t.Error("Timed out waiting for event") + } +} + func TestStreamCanChangeRetryDelayBasedOnEvent(t *testing.T) { streamHandler, streamControl := httphelpers.SSEHandler(nil) defer streamControl.Close() diff --git a/stream_reconnect_test.go b/stream_reconnect_test.go index db51e4c..65d6aac 100644 --- a/stream_reconnect_test.go +++ b/stream_reconnect_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/launchdarkly/go-test-helpers/v2/httphelpers" + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" ) func TestStreamReconnectsIfConnectionIsBroken(t *testing.T) { diff --git a/stream_requests_test.go b/stream_requests_test.go index d98fb52..ea800ce 100644 --- a/stream_requests_test.go +++ b/stream_requests_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/launchdarkly/go-test-helpers/v2/httphelpers" + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" ) func TestStreamCanUseCustomClient(t *testing.T) { diff --git a/stream_restart_close_test.go b/stream_restart_close_test.go index 7fab2d7..18135b0 100644 --- a/stream_restart_close_test.go +++ b/stream_restart_close_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/launchdarkly/go-test-helpers/v2/httphelpers" + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" ) func toPublication(e httphelpers.SSEEvent) *publication { From 98879ab13187952410feeb15c648e0ca51397b56 Mon Sep 17 00:00:00 2001 From: Alan Barker Date: Mon, 30 Jun 2025 15:33:15 -0400 Subject: [PATCH 2/5] chore: fix ci build --- .golangci.yml | 46 ++++++++++++++++++++++++---------------------- Makefile | 2 +- stream.go | 3 +-- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index a592413..1caf2a0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,26 +1,17 @@ +version: "2" run: - deadline: 120s tests: false - linters: enable: - bodyclose - dupl - - errcheck - - exportloopref - - goconst - gochecknoglobals - gochecknoinits - goconst - gocritic - gocyclo - godox - - gofmt - - goimports - gosec - - gosimple - - govet - - ineffassign - lll - misspell - nakedret @@ -28,20 +19,31 @@ linters: - prealloc - revive - staticcheck - - stylecheck - - typecheck - unconvert - unparam - - unused - whitespace - fast: false - -linters-settings: - gofmt: - simplify: false - goimports: - local-prefixes: github.com/launchdarkly - + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ issues: - exclude-use-default: false max-same-issues: 1000 +formatters: + enable: + - gofmt + - goimports + settings: + gofmt: + simplify: false + goimports: + local-prefixes: + - gopkg.in/launchdarkly + - github.com/launchdarkly + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/Makefile b/Makefile index c52ae2b..9c7b988 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -GOLANGCI_LINT_VERSION=v1.60.1 +GOLANGCI_LINT_VERSION=v1.64.5 LINTER=./bin/golangci-lint LINTER_VERSION_FILE=./bin/.golangci-lint-version-$(GOLANGCI_LINT_VERSION) diff --git a/stream.go b/stream.go index 2c07162..00beb28 100644 --- a/stream.go +++ b/stream.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "net/url" "sync" @@ -254,7 +253,7 @@ func (stream *Stream) connect() (io.ReadCloser, http.Header, error) { } stream.connections++ if resp.StatusCode != 200 { - message, _ := ioutil.ReadAll(resp.Body) + message, _ := io.ReadAll(resp.Body) _ = resp.Body.Close() err = SubscriptionError{ Code: resp.StatusCode, From 0c24a6dd2af2e6617cbd80549fe8b82473fc5764 Mon Sep 17 00:00:00 2001 From: Alan Barker Date: Mon, 30 Jun 2025 15:43:42 -0400 Subject: [PATCH 3/5] chore: fix duplicate go versions issue in ci build --- .github/workflows/go-versions.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/go-versions.yml b/.github/workflows/go-versions.yml index 86090db..5de11c7 100644 --- a/.github/workflows/go-versions.yml +++ b/.github/workflows/go-versions.yml @@ -49,4 +49,8 @@ jobs: - name: Set Go Version Matrices id: set-matrix run: | - echo "all=[\"${{ steps.set-env.outputs.latest }}\",\"${{ steps.set-env.outputs.penultimate }}\",\"${{ steps.set-env.outputs.min }}\"]" >> $GITHUB_OUTPUT + if [ "${{ steps.set-env.outputs.penultimate }}" == "${{ steps.set-env.outputs.min }}" ]; then + echo "all=[\"${{ steps.set-env.outputs.latest }}\",\"${{ steps.set-env.outputs.penultimate }}\"]" >> $GITHUB_OUTPUT + else + echo "all=[\"${{ steps.set-env.outputs.latest }}\",\"${{ steps.set-env.outputs.penultimate }}\",\"${{ steps.set-env.outputs.min }}\"]" >> $GITHUB_OUTPUT + fi From b3a5dcba44b66090316652d939a50527ed30b792 Mon Sep 17 00:00:00 2001 From: Alan Barker Date: Tue, 1 Jul 2025 09:11:51 -0400 Subject: [PATCH 4/5] chore: change implementation from environmentId to more generic header support --- decoder.go | 38 ++++++++++++++++++++------------------ decoder_test.go | 23 ++++++++++++++--------- interface.go | 13 +++++++------ stream.go | 2 +- stream_reading_test.go | 11 ++++++++--- stream_reconnect_test.go | 6 +++++- 6 files changed, 55 insertions(+), 38 deletions(-) diff --git a/decoder.go b/decoder.go index a50e518..1fcea70 100644 --- a/decoder.go +++ b/decoder.go @@ -3,14 +3,16 @@ package eventsource import ( "bufio" "io" + "net/http" "strconv" "strings" "time" ) type publication struct { - id, event, data, lastEventID, environmentID string - retry int64 + id, event, data, lastEventID string + headers http.Header + retry int64 } //nolint:revive,stylecheck // should be ID; retained for backward compatibility @@ -22,16 +24,16 @@ func (s *publication) Retry() int64 { return s.retry } // LastEventID is from a separate interface, EventWithLastID func (s *publication) LastEventID() string { return s.lastEventID } -// EnvironmentID is from a separate interface, EventWithEnvironmentID -func (s *publication) EnvironmentID() string { return s.environmentID } +// Headers is from a separate interface, EventWithHeaders +func (s *publication) Headers() http.Header { return s.headers } // A Decoder is capable of reading Events from a stream. type Decoder struct { - linesCh <-chan string - errorCh <-chan error - readTimeout time.Duration - lastEventID string - environmentID string + linesCh <-chan string + errorCh <-chan error + readTimeout time.Duration + lastEventID string + headers http.Header } // DecoderOption is a common interface for optional configuration parameters that can be @@ -52,10 +54,10 @@ func (o lastEventIDDecoderOption) apply(d *Decoder) { d.lastEventID = string(o) } -type envrionmentIDDecoderOption string +type headersDecoderOption http.Header -func (o envrionmentIDDecoderOption) apply(d *Decoder) { - d.environmentID = string(o) +func (o headersDecoderOption) apply(d *Decoder) { + d.headers = http.Header(o) } // DecoderOptionReadTimeout returns an option that sets the read timeout interval for a @@ -72,11 +74,11 @@ func DecoderOptionLastEventID(lastEventID string) DecoderOption { return lastEventIDDecoderOption(lastEventID) } -// DecoderOptionEnvironmentID returns an option that sets the environment ID property for a -// Decoder when the Decoder is created. This allows the environment ID to be read from the -// X-Ld-Envid response header and included in new events. -func DecoderOptionEnvironmentID(environmentID string) DecoderOption { - return envrionmentIDDecoderOption(environmentID) +// DecoderOptionHeaders returns an option that sets the Headers property for a +// Decoder when the Decoder is created. This allows access to the HTTP response +// headers for the event. +func DecoderOptionHeaders(headers http.Header) DecoderOption { + return headersDecoderOption(headers) } // NewDecoder returns a new Decoder instance that reads events with the given io.Reader. @@ -169,7 +171,7 @@ ReadLoop: } pub.data = strings.TrimSuffix(pub.data, "\n") pub.lastEventID = dec.lastEventID - pub.environmentID = dec.environmentID + pub.headers = dec.headers return pub, nil } diff --git a/decoder_test.go b/decoder_test.go index 4c5b1f4..c2b9e6f 100644 --- a/decoder_test.go +++ b/decoder_test.go @@ -2,6 +2,7 @@ package eventsource import ( "io" + "net/http" "strings" "testing" @@ -57,11 +58,11 @@ func requireLastEventID(t *testing.T, event Event) string { return eventWithID.LastEventID() } -func requireEnvironmentId(t *testing.T, event Event) string { - // necessary because we can't yet add EnvironmentID to the basic Event interface; see EventWithEnvironmentID - eventWithEnvID, ok := event.(EventWithEnvironmentID) - require.True(t, ok, "event should have implemented EventWithEnvironmentID") - return eventWithEnvID.EnvironmentID() +func requireHeaders(t *testing.T, event Event) http.Header { + // necessary because we can't yet add Headers to the basic Event interface; see EventWithHeaders + eventWithHeaders, ok := event.(EventWithHeaders) + require.True(t, ok, "event should have implemented EventWithHeaders") + return eventWithHeaders.Headers() } func TestDecoderTracksLastEventID(t *testing.T) { @@ -123,16 +124,20 @@ func TestDecoderTracksLastEventID(t *testing.T) { }) } -func TestDecoderTracksEnvironmentID(t *testing.T) { - t.Run("uses environment ID that is passed in options", func(t *testing.T) { +func TestDecoderTracksHeaders(t *testing.T) { + t.Run("uses headers that are passed in options", func(t *testing.T) { inputData := "data: abc\n\n" - decoder := NewDecoderWithOptions(strings.NewReader(inputData), DecoderOptionEnvironmentID("env-id")) + headers := http.Header{ + "X-Ld-Envid": {"env-id"}, + } + + decoder := NewDecoderWithOptions(strings.NewReader(inputData), DecoderOptionHeaders(headers)) event, err := decoder.Decode() require.NoError(t, err) assert.Equal(t, "abc", event.Data()) assert.Equal(t, "", event.Id()) - assert.Equal(t, "env-id", requireEnvironmentId(t, event)) + assert.Equal(t, headers, requireHeaders(t, event)) }) } diff --git a/interface.go b/interface.go index 6807f05..112dd5a 100644 --- a/interface.go +++ b/interface.go @@ -5,6 +5,8 @@ // If the Repository interface is implemented on the server, events can be replayed in case of a network disconnection. package eventsource +import "net/http" + // Event is the interface for any event received by the client or sent by the server. type Event interface { // Id is an identifier that can be used to allow a client to replay @@ -32,18 +34,17 @@ type EventWithLastID interface { LastEventID() string } -// EventWithEnvironmentID is an additional interface for an event received by the client, -// allowing access to the EnvironmentID method. +// EventWithHeaders is an additional interface for an event received by the client, +// allowing access to the HTTP response headers. // // This is defined as a separate interface for backward compatibility, since this // feature was added after the Event interface had been defined and adding a method // to Event would break existing implementations. All events returned by Stream do // implement this interface, and in a future major version the Event type will be // changed to always include this field. -type EventWithEnvironmentID interface { - // EnvironmentID is the value of the `X-Ld-Envid` response header that was most recently - // seen in an event from this stream, if any. - EnvironmentID() string +type EventWithHeaders interface { + // Headers provides access to the HTTP response headers for the event. + Headers() http.Header } // Repository is an interface to be used with Server.Register() allowing clients to replay previous events diff --git a/stream.go b/stream.go index 00beb28..5d886e0 100644 --- a/stream.go +++ b/stream.go @@ -301,7 +301,7 @@ NewStream: dec := NewDecoderWithOptions(r, DecoderOptionReadTimeout(stream.readTimeout), DecoderOptionLastEventID(stream.lastEventID), - DecoderOptionEnvironmentID(h.Get("X-Ld-Envid")), + DecoderOptionHeaders(h), ) go func() { for { diff --git a/stream_reading_test.go b/stream_reading_test.go index c0c3bf5..b438853 100644 --- a/stream_reading_test.go +++ b/stream_reading_test.go @@ -23,13 +23,17 @@ func TestStreamSubscribeEventsChan(t *testing.T) { select { case receivedEvent := <-stream.Events: - assert.Equal(t, &publication{id: "123", lastEventID: "123"}, receivedEvent) + receivedWithLastId := receivedEvent.(EventWithLastID) + receivedWithHeaders := receivedEvent.(EventWithHeaders) + assert.Equal(t, "123", receivedEvent.Id()) + assert.Equal(t, "123", receivedWithLastId.LastEventID()) + assert.NotEmpty(t, receivedWithHeaders.Headers()) case <-time.After(timeToWaitForEvent): t.Error("Timed out waiting for event") } } -func TestStreamCanReadEnvironmentID(t *testing.T) { +func TestStreamCanReadSpecificHeader(t *testing.T) { streamHandler, streamControl := httphelpers.SSEHandlerWithEnvironmentID(nil, "env-id") defer streamControl.Close() httpServer := httptest.NewServer(streamHandler) @@ -42,7 +46,8 @@ func TestStreamCanReadEnvironmentID(t *testing.T) { select { case receivedEvent := <-stream.Events: - assert.Equal(t, &publication{id: "123", lastEventID: "123", environmentID: "env-id"}, receivedEvent) + receivedWithHeaders := receivedEvent.(EventWithHeaders) + assert.Equal(t, "env-id", receivedWithHeaders.Headers().Get("X-Ld-Envid")) case <-time.After(timeToWaitForEvent): t.Error("Timed out waiting for event") } diff --git a/stream_reconnect_test.go b/stream_reconnect_test.go index 65d6aac..a284d93 100644 --- a/stream_reconnect_test.go +++ b/stream_reconnect_test.go @@ -49,7 +49,11 @@ func TestStreamReconnectsIfConnectionIsBroken(t *testing.T) { t.Error("Timed out waiting for event") return case receivedEvent := <-stream.Events: - assert.Equal(t, &publication{id: "123", lastEventID: "123"}, receivedEvent) + receivedWithLastId := receivedEvent.(EventWithLastID) + receivedWithHeaders := receivedEvent.(EventWithHeaders) + assert.Equal(t, "123", receivedEvent.Id()) + assert.Equal(t, "123", receivedWithLastId.LastEventID()) + assert.NotEmpty(t, receivedWithHeaders.Headers()) return } } From 4065d35b00ba53c11ea397a3f201b8bcf453cb0a Mon Sep 17 00:00:00 2001 From: Alan Barker Date: Tue, 1 Jul 2025 14:54:53 -0400 Subject: [PATCH 5/5] chore: Making event headers immutable --- decoder.go | 7 ++++++- decoder_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/decoder.go b/decoder.go index 1fcea70..2e84e0a 100644 --- a/decoder.go +++ b/decoder.go @@ -25,7 +25,12 @@ func (s *publication) Retry() int64 { return s.retry } func (s *publication) LastEventID() string { return s.lastEventID } // Headers is from a separate interface, EventWithHeaders -func (s *publication) Headers() http.Header { return s.headers } +func (s *publication) Headers() http.Header { + if s.headers == nil { + return nil + } + return s.headers.Clone() +} // A Decoder is capable of reading Events from a stream. type Decoder struct { diff --git a/decoder_test.go b/decoder_test.go index c2b9e6f..a56ac3a 100644 --- a/decoder_test.go +++ b/decoder_test.go @@ -125,6 +125,19 @@ func TestDecoderTracksLastEventID(t *testing.T) { } func TestDecoderTracksHeaders(t *testing.T) { + t.Run("event headers is nil if not provided in options", func(t *testing.T) { + inputData := "data: abc\n\n" + + decoder := NewDecoderWithOptions(strings.NewReader(inputData)) + + event, err := decoder.Decode() + require.NoError(t, err) + + assert.Equal(t, "abc", event.Data()) + assert.Equal(t, "", event.Id()) + assert.Nil(t, requireHeaders(t, event)) + }) + t.Run("uses headers that are passed in options", func(t *testing.T) { inputData := "data: abc\n\n" headers := http.Header{ @@ -140,4 +153,24 @@ func TestDecoderTracksHeaders(t *testing.T) { assert.Equal(t, "", event.Id()) assert.Equal(t, headers, requireHeaders(t, event)) }) + + t.Run("event headers are immutable", func(t *testing.T) { + inputData := "data: abc\n\n" + headers := http.Header{ + "X-Ld-Envid": {"env-id"}, + } + + decoder := NewDecoderWithOptions(strings.NewReader(inputData), DecoderOptionHeaders(headers)) + + event, err := decoder.Decode() + require.NoError(t, err) + + eventHeaders := requireHeaders(t, event) + assert.Equal(t, "abc", event.Data()) + assert.Equal(t, "", event.Id()) + assert.Equal(t, headers, eventHeaders) + + eventHeaders.Add("New-Header", "new-value") + assert.NotContains(t, headers, "New-Header") + }) }