From b69070421a3ef5b9fb9aa3f4835b03484b350eab Mon Sep 17 00:00:00 2001 From: jordanstephens Date: Mon, 6 Apr 2026 15:29:24 -0700 Subject: [PATCH 1/3] fix(gcp): handle empty TailLogEntries responses instead of returning an error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The GCP TailLogEntries streaming API can return a TailLogEntriesResponse with zero entries. This happens for heartbeat messages (sent periodically to keep the stream alive) and for suppression-info responses (sent when entries are being rate-limited or sampled on the server side). The response proto has a dedicated SuppressionInfo field for this purpose. Previously, gcpLoggingTailer.Next() treated any empty-entries response as a hard error ("no log entries found"). Because this is not a transient error, WaitServiceState would not retry — the subscribe stream would terminate and the deployment monitor would fail mid-deployment with a spurious error unrelated to the actual deployment outcome. The fix returns nil, nil from Next() on an empty response. The Follow() loop in stream.go now skips nil entries and continues waiting, matching the expected streaming behavior. 
Co-Authored-By: Claude Sonnet 4.6 --- src/pkg/cli/client/byoc/gcp/stream.go | 3 + src/pkg/cli/client/byoc/gcp/stream_test.go | 71 +++++++++++++++++ src/pkg/clouds/gcp/logging.go | 4 +- src/pkg/clouds/gcp/logging_test.go | 89 ++++++++++++++++++++++ 4 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 src/pkg/clouds/gcp/logging_test.go diff --git a/src/pkg/cli/client/byoc/gcp/stream.go b/src/pkg/cli/client/byoc/gcp/stream.go index e9b1e1abf..19fd0ea27 100644 --- a/src/pkg/cli/client/byoc/gcp/stream.go +++ b/src/pkg/cli/client/byoc/gcp/stream.go @@ -105,6 +105,9 @@ func (s *ServerStream[T]) Follow(start time.Time) (iter.Seq2[*T, error], error) yield(nil, err) return } + if entry == nil { + continue // empty Recv response (heartbeat/suppression), keep looping + } resps, err := s.parseAndFilter(entry) if err != nil { yield(nil, err) diff --git a/src/pkg/cli/client/byoc/gcp/stream_test.go b/src/pkg/cli/client/byoc/gcp/stream_test.go index 0ffc17009..325da78a7 100644 --- a/src/pkg/cli/client/byoc/gcp/stream_test.go +++ b/src/pkg/cli/client/byoc/gcp/stream_test.go @@ -1,9 +1,11 @@ package gcp import ( + "context" "iter" "strconv" "testing" + "time" "cloud.google.com/go/logging/apiv2/loggingpb" "github.com/DefangLabs/defang/src/pkg/clouds/gcp" @@ -179,3 +181,72 @@ func TestServerStream_Start(t *testing.T) { }) } } + +// TestServerStream_Follow_SkipsNilEntries verifies that Follow() skips nil entries +// returned by the tailer (heartbeat or suppression-info responses from GCP) and +// continues yielding real log entries without error. +func TestServerStream_Follow_SkipsNilEntries(t *testing.T) { + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(cancel) + + // instanceId avoids a nil-pointer dereference in the parser when Resource is absent. 
+ svcLabels := map[string]string{"defang-service": "svc", "instanceId": "inst1"} + + realEntry := &loggingpb.LogEntry{ + Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "real log"}, + Labels: svcLabels, + Timestamp: timestamppb.Now(), + } + + // cancelEntry is a sentinel: when the tailer returns it, we cancel the context + // so the Follow loop exits cleanly rather than blocking forever. + cancelEntry := &loggingpb.LogEntry{ + Payload: &loggingpb.LogEntry_TextPayload{TextPayload: "cancel"}, + Labels: svcLabels, + Timestamp: timestamppb.Now(), + } + + tailerEntries := []*loggingpb.LogEntry{ + nil, // heartbeat — must be skipped + realEntry, + nil, // suppression info — must be skipped + cancelEntry, + } + + mockClient := &MockGcpLogsClient{ + lister: &MockGcpLoggingLister{}, + tailer: &MockGcpLoggingTailer{ + MockGcpLoggingLister: MockGcpLoggingLister{logEntries: tailerEntries}, + }, + } + + services := []string{"svc"} + restoreServiceName := getServiceNameRestorer(services, gcp.SafeLabelValue, + func(entry *defangv1.TailResponse) string { return entry.Service }, + func(entry *defangv1.TailResponse, name string) *defangv1.TailResponse { + entry.Service = name + return entry + }) + + stream := NewServerStream(ctx, mockClient, getLogEntryParser(ctx, mockClient), restoreServiceName) + stream.query = NewLogQuery(mockClient.GetProjectID()) + + seq, err := stream.Follow(time.Time{}) // zero start → skip listing, go straight to tail + assert.NoError(t, err) + + var messages []string + for resp, err := range seq { + assert.NoError(t, err) + if err != nil { + break + } + msg := resp.Entries[0].Message + messages = append(messages, msg) + if msg == "cancel" { + cancel() + } + } + + assert.Equal(t, []string{"real log", "cancel"}, messages, + "Follow() should skip nil tailer entries and yield real entries") +} diff --git a/src/pkg/clouds/gcp/logging.go b/src/pkg/clouds/gcp/logging.go index c7b107e3d..992611e4f 100644 --- a/src/pkg/clouds/gcp/logging.go +++ 
b/src/pkg/clouds/gcp/logging.go @@ -62,7 +62,9 @@ func (t *gcpLoggingTailer) Next(ctx context.Context) (*loggingpb.LogEntry, error } t.cache = resp.GetEntries() if len(t.cache) == 0 { - return nil, errors.New("no log entries found") + // GCP may send empty responses (heartbeats, suppression info); return nil + // so the caller can continue looping without treating this as an error. + return nil, nil } } diff --git a/src/pkg/clouds/gcp/logging_test.go b/src/pkg/clouds/gcp/logging_test.go new file mode 100644 index 000000000..f88a49eba --- /dev/null +++ b/src/pkg/clouds/gcp/logging_test.go @@ -0,0 +1,89 @@ +package gcp + +import ( + "context" + "io" + "testing" + + "cloud.google.com/go/logging/apiv2/loggingpb" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// mockTailLogEntriesClient implements loggingpb.LoggingServiceV2_TailLogEntriesClient +// for unit testing gcpLoggingTailer.Next(). +type mockTailLogEntriesClient struct { + responses []*loggingpb.TailLogEntriesResponse + err error +} + +func (m *mockTailLogEntriesClient) Send(*loggingpb.TailLogEntriesRequest) error { return nil } +func (m *mockTailLogEntriesClient) Recv() (*loggingpb.TailLogEntriesResponse, error) { + if len(m.responses) == 0 { + if m.err != nil { + return nil, m.err + } + return nil, io.EOF + } + resp := m.responses[0] + m.responses = m.responses[1:] + return resp, nil +} +func (m *mockTailLogEntriesClient) Header() (metadata.MD, error) { return nil, nil } +func (m *mockTailLogEntriesClient) Trailer() metadata.MD { return nil } +func (m *mockTailLogEntriesClient) CloseSend() error { return nil } +func (m *mockTailLogEntriesClient) Context() context.Context { return context.Background() } +func (m *mockTailLogEntriesClient) SendMsg(any) error { return nil } +func (m *mockTailLogEntriesClient) RecvMsg(any) error { return nil } + +var _ grpc.ClientStream = (*mockTailLogEntriesClient)(nil) + +func TestGcpLoggingTailerNext_EmptyResponse(t *testing.T) { + // An empty-entries 
response (heartbeat or suppression info) must return nil, nil + // so the caller can continue looping without treating it as an error. + client := &mockTailLogEntriesClient{ + responses: []*loggingpb.TailLogEntriesResponse{ + {Entries: nil}, // empty — heartbeat + }, + } + tailer := &gcpLoggingTailer{tleClient: client} + + entry, err := tailer.Next(context.Background()) + if err != nil { + t.Fatalf("Next() error = %v, want nil", err) + } + if entry != nil { + t.Fatalf("Next() entry = %v, want nil", entry) + } +} + +func TestGcpLoggingTailerNext_WithEntries(t *testing.T) { + // A response with entries should return the first entry and cache the rest. + entries := []*loggingpb.LogEntry{ + {InsertId: "entry1"}, + {InsertId: "entry2"}, + } + client := &mockTailLogEntriesClient{ + responses: []*loggingpb.TailLogEntriesResponse{ + {Entries: entries}, + }, + } + tailer := &gcpLoggingTailer{tleClient: client} + + entry, err := tailer.Next(context.Background()) + if err != nil { + t.Fatalf("Next() error = %v, want nil", err) + } + if entry == nil || entry.InsertId != "entry1" { + t.Fatalf("Next() entry = %v, want entry1", entry) + } + + // Second call should return cached entry without calling Recv again. + entry, err = tailer.Next(context.Background()) + if err != nil { + t.Fatalf("Next() error = %v, want nil", err) + } + if entry == nil || entry.InsertId != "entry2" { + t.Fatalf("Next() entry = %v, want entry2", entry) + } +} From e792e7205a95e50b06f6f767d4fa97e814199b38 Mon Sep 17 00:00:00 2001 From: jordanstephens Date: Tue, 7 Apr 2026 13:13:22 -0700 Subject: [PATCH 2/3] fix(gcp): retry on logging quota exhaustion with exponential backoff GCP Cloud Logging enforces a ReadRequestsPerMinutePerProject quota (120/min) that covers both ListLogEntries and TailLogEntries.Recv calls. 
During an active deployment with multiple services generating logs, the two concurrent streams (subscribe + log tail) can exhaust this quota, causing the deployment monitor to fail with ResourceExhausted. Add codes.ResourceExhausted to isTransientError so both the log tail (receiveLogs) and the subscribe stream (WaitServiceState) automatically retry with exponential backoff instead of surfacing a fatal error. The RetryDelayer backs off up to 1 minute, which aligns with the quota window reset. Co-Authored-By: Claude Sonnet 4.6 --- src/pkg/cli/subscribe.go | 9 ++++++++- src/pkg/cli/subscribe_test.go | 7 +++++++ src/pkg/cli/tail.go | 2 +- src/pkg/cli/tail_test.go | 31 +++++++++++++++++++++++++++++++ src/pkg/cli/waitForCdTaskExit.go | 3 +-- 5 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/pkg/cli/subscribe.go b/src/pkg/cli/subscribe.go index be00a1e37..cda9b1b40 100644 --- a/src/pkg/cli/subscribe.go +++ b/src/pkg/cli/subscribe.go @@ -5,6 +5,7 @@ import ( "errors" "iter" + "connectrpc.com/connect" "github.com/DefangLabs/defang/src/pkg/cli/client" "github.com/DefangLabs/defang/src/pkg/term" "github.com/DefangLabs/defang/src/pkg/types" @@ -52,8 +53,14 @@ func WaitServiceState( return serviceStates, nil } if err != nil { - // Reconnect on transient errors + // Reconnect on transient errors (including ResourceExhausted — quota resets within + // a minute and DelayBeforeRetry backs off exponentially up to 1 minute). 
if isTransientError(err) { + if connect.CodeOf(err) == connect.CodeResourceExhausted { + term.Warn("GCP logging quota exceeded; will retry subscribe stream after backoff.") + } else { + term.Debugf("WaitServiceState: transient error, reconnecting subscribe stream: %v", err) + } if err := provider.DelayBeforeRetry(ctx); err != nil { return serviceStates, err } diff --git a/src/pkg/cli/subscribe_test.go b/src/pkg/cli/subscribe_test.go index bc307e0fa..4df5b61dc 100644 --- a/src/pkg/cli/subscribe_test.go +++ b/src/pkg/cli/subscribe_test.go @@ -12,6 +12,8 @@ import ( "github.com/DefangLabs/defang/src/pkg/cli/client" "github.com/DefangLabs/defang/src/pkg/types" defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // mockSubscribeProvider mocks the provider for Subscribe. @@ -283,6 +285,11 @@ func TestWaitServiceStateStreamReceive(t *testing.T) { err: connect.NewError(connect.CodeInternal, errors.New("internal error")), expectRetry: true, }, + { + name: "stream receive returns resource exhausted error and retry to connect", + err: status.Error(codes.ResourceExhausted, "quota exceeded"), + expectRetry: true, + }, } for _, tt := range tests { diff --git a/src/pkg/cli/tail.go b/src/pkg/cli/tail.go index 1f1dcd43a..e62511d42 100644 --- a/src/pkg/cli/tail.go +++ b/src/pkg/cli/tail.go @@ -192,7 +192,7 @@ func isTransientError(err error) bool { // GCP grpc transient errors if st, ok := status.FromError(err); ok { - transientCodes := []codes.Code{codes.Unavailable, codes.Internal} + transientCodes := []codes.Code{codes.Unavailable, codes.Internal, codes.ResourceExhausted} if slices.Contains(transientCodes, st.Code()) { return true } diff --git a/src/pkg/cli/tail_test.go b/src/pkg/cli/tail_test.go index 09e45660b..f97c3c54f 100644 --- a/src/pkg/cli/tail_test.go +++ b/src/pkg/cli/tail_test.go @@ -22,10 +22,41 @@ import ( defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1" cwTypes 
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) +func TestIsTransientError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"eof", io.EOF, false}, + {"connect unavailable", connect.NewError(connect.CodeUnavailable, errors.New("unavailable")), true}, + {"connect internal non-wire", connect.NewError(connect.CodeInternal, errors.New("internal")), true}, + {"connect permission denied", connect.NewError(connect.CodePermissionDenied, errors.New("denied")), false}, + {"grpc unavailable", mustGRPCStatus(codes.Unavailable, "unavailable"), true}, + {"grpc internal", mustGRPCStatus(codes.Internal, "internal"), true}, + {"grpc resource exhausted", mustGRPCStatus(codes.ResourceExhausted, "quota exceeded"), true}, + {"grpc permission denied", mustGRPCStatus(codes.PermissionDenied, "denied"), false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isTransientError(tt.err); got != tt.want { + t.Errorf("isTransientError(%v) = %v, want %v", tt.err, got, tt.want) + } + }) + } +} + +func mustGRPCStatus(code codes.Code, msg string) error { + return grpcstatus.Error(code, msg) +} + func TestIsProgressDot(t *testing.T) { tests := []struct { name string diff --git a/src/pkg/cli/waitForCdTaskExit.go b/src/pkg/cli/waitForCdTaskExit.go index 13280f918..18fc558c9 100644 --- a/src/pkg/cli/waitForCdTaskExit.go +++ b/src/pkg/cli/waitForCdTaskExit.go @@ -7,7 +7,6 @@ import ( "time" "github.com/DefangLabs/defang/src/pkg/cli/client" - "github.com/DefangLabs/defang/src/pkg/term" ) var pollDuration = 2 * time.Second @@ -20,7 +19,7 @@ func WaitForCdTaskExit(ctx context.Context, provider client.Provider) error { select { case <-ticker.C: done, err := provider.GetDeploymentStatus(ctx) - term.Debugf("Polled CD task status: done=%v, 
err=%v", done, err) + // term.Debugf("Polled CD task status: done=%v, err=%v", done, err) if err != nil { // End condition: EOF indicates that the task has completed successfully if errors.Is(err, io.EOF) { From 32c637901b63ad3cb12be4b5180ca7caf0f9155d Mon Sep 17 00:00:00 2001 From: jordanstephens Date: Tue, 7 Apr 2026 14:30:07 -0700 Subject: [PATCH 3/3] fix(gcp): include the underlying error in the subscribe-stream retry warning --- src/pkg/cli/subscribe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pkg/cli/subscribe.go b/src/pkg/cli/subscribe.go index cda9b1b40..7d0ddad96 100644 --- a/src/pkg/cli/subscribe.go +++ b/src/pkg/cli/subscribe.go @@ -57,7 +57,7 @@ func WaitServiceState( // a minute and DelayBeforeRetry backs off exponentially up to 1 minute). if isTransientError(err) { if connect.CodeOf(err) == connect.CodeResourceExhausted { - term.Warn("GCP logging quota exceeded; will retry subscribe stream after backoff.") + term.Warnf("quota exceeded; will retry subscribe stream after backoff: %v", err) } else { term.Debugf("WaitServiceState: transient error, reconnecting subscribe stream: %v", err) }