Patch: tolerate empty GCP tail responses and retry subscribe/tail on ResourceExhausted.

Review notes (text before the first diff header is ignored by git apply):
  * Re-formatted from a whitespace-collapsed copy of the patch. Tabs and blank
    context lines were reconstructed from gofmt conventions and the hunk line
    counts -- TODO confirm against the working tree before applying.
  * subscribe.go: the quota warning used connect.CodeOf, which only recognizes
    *connect.Error; the gRPC status errors this patch marks as transient were
    therefore logged at debug level only. The check now goes through the new
    isResourceExhausted helper, which handles both error forms.
  * waitForCdTaskExit.go: the polling debug line is deleted instead of being
    left behind as commented-out code.
  * tail_test.go: mustGRPCStatus (a non-panicking one-line wrapper) is inlined;
    a table test for isResourceExhausted is added.
  * "index" lines are omitted for files whose post-image changed in review.

diff --git a/src/pkg/cli/client/byoc/gcp/stream.go b/src/pkg/cli/client/byoc/gcp/stream.go
index e9b1e1abf..19fd0ea27 100644
--- a/src/pkg/cli/client/byoc/gcp/stream.go
+++ b/src/pkg/cli/client/byoc/gcp/stream.go
@@ -105,6 +105,9 @@ func (s *ServerStream[T]) Follow(start time.Time) (iter.Seq2[*T, error], error)
 				yield(nil, err)
 				return
 			}
+			if entry == nil {
+				continue // empty Recv response (heartbeat/suppression), keep looping
+			}
 			resps, err := s.parseAndFilter(entry)
 			if err != nil {
 				yield(nil, err)
diff --git a/src/pkg/cli/client/byoc/gcp/stream_test.go b/src/pkg/cli/client/byoc/gcp/stream_test.go
index 0ffc17009..325da78a7 100644
--- a/src/pkg/cli/client/byoc/gcp/stream_test.go
+++ b/src/pkg/cli/client/byoc/gcp/stream_test.go
@@ -1,9 +1,11 @@
 package gcp
 
 import (
+	"context"
 	"iter"
 	"strconv"
 	"testing"
+	"time"
 
 	"cloud.google.com/go/logging/apiv2/loggingpb"
 	"github.com/DefangLabs/defang/src/pkg/clouds/gcp"
@@ -179,3 +181,72 @@ func TestServerStream_Start(t *testing.T) {
 		})
 	}
 }
+
+// TestServerStream_Follow_SkipsNilEntries verifies that Follow() skips nil entries
+// returned by the tailer (heartbeat or suppression-info responses from GCP) and
+// continues yielding real log entries without error.
+func TestServerStream_Follow_SkipsNilEntries(t *testing.T) {
+	ctx, cancel := context.WithCancel(t.Context())
+	t.Cleanup(cancel)
+
+	// instanceId avoids a nil-pointer dereference in the parser when Resource is absent.
+	svcLabels := map[string]string{"defang-service": "svc", "instanceId": "inst1"}
+
+	realEntry := &loggingpb.LogEntry{
+		Payload:   &loggingpb.LogEntry_TextPayload{TextPayload: "real log"},
+		Labels:    svcLabels,
+		Timestamp: timestamppb.Now(),
+	}
+
+	// cancelEntry is a sentinel: when the tailer returns it, we cancel the context
+	// so the Follow loop exits cleanly rather than blocking forever.
+	cancelEntry := &loggingpb.LogEntry{
+		Payload:   &loggingpb.LogEntry_TextPayload{TextPayload: "cancel"},
+		Labels:    svcLabels,
+		Timestamp: timestamppb.Now(),
+	}
+
+	tailerEntries := []*loggingpb.LogEntry{
+		nil, // heartbeat — must be skipped
+		realEntry,
+		nil, // suppression info — must be skipped
+		cancelEntry,
+	}
+
+	mockClient := &MockGcpLogsClient{
+		lister: &MockGcpLoggingLister{},
+		tailer: &MockGcpLoggingTailer{
+			MockGcpLoggingLister: MockGcpLoggingLister{logEntries: tailerEntries},
+		},
+	}
+
+	services := []string{"svc"}
+	restoreServiceName := getServiceNameRestorer(services, gcp.SafeLabelValue,
+		func(entry *defangv1.TailResponse) string { return entry.Service },
+		func(entry *defangv1.TailResponse, name string) *defangv1.TailResponse {
+			entry.Service = name
+			return entry
+		})
+
+	stream := NewServerStream(ctx, mockClient, getLogEntryParser(ctx, mockClient), restoreServiceName)
+	stream.query = NewLogQuery(mockClient.GetProjectID())
+
+	seq, err := stream.Follow(time.Time{}) // zero start → skip listing, go straight to tail
+	assert.NoError(t, err)
+
+	var messages []string
+	for resp, err := range seq {
+		assert.NoError(t, err)
+		if err != nil {
+			break
+		}
+		msg := resp.Entries[0].Message
+		messages = append(messages, msg)
+		if msg == "cancel" {
+			cancel()
+		}
+	}
+
+	assert.Equal(t, []string{"real log", "cancel"}, messages,
+		"Follow() should skip nil tailer entries and yield real entries")
+}
diff --git a/src/pkg/cli/resourceExhausted.go b/src/pkg/cli/resourceExhausted.go
new file mode 100644
--- /dev/null
+++ b/src/pkg/cli/resourceExhausted.go
@@ -0,0 +1,17 @@
+package cli
+
+import (
+	"connectrpc.com/connect"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// isResourceExhausted reports whether err carries a ResourceExhausted (quota)
+// code, either as a connect error or as a gRPC status error.
+//
+// connect.CodeOf only recognizes *connect.Error and yields CodeUnknown for a
+// gRPC status error, so both representations are checked.
+func isResourceExhausted(err error) bool {
+	return connect.CodeOf(err) == connect.CodeResourceExhausted ||
+		status.Code(err) == codes.ResourceExhausted
+}
diff --git a/src/pkg/cli/subscribe.go b/src/pkg/cli/subscribe.go
--- a/src/pkg/cli/subscribe.go
+++ b/src/pkg/cli/subscribe.go
@@ -52,8 +52,14 @@ func WaitServiceState(
 			return serviceStates, nil
 		}
 		if err != nil {
-			// Reconnect on transient errors
+			// Reconnect on transient errors (including ResourceExhausted — quota resets within
+			// a minute and DelayBeforeRetry backs off exponentially up to 1 minute).
 			if isTransientError(err) {
+				if isResourceExhausted(err) {
+					term.Warnf("quota exceeded; will retry subscribe stream after backoff: %v", err)
+				} else {
+					term.Debugf("WaitServiceState: transient error, reconnecting subscribe stream: %v", err)
+				}
 				if err := provider.DelayBeforeRetry(ctx); err != nil {
 					return serviceStates, err
 				}
diff --git a/src/pkg/cli/subscribe_test.go b/src/pkg/cli/subscribe_test.go
index bc307e0fa..4df5b61dc 100644
--- a/src/pkg/cli/subscribe_test.go
+++ b/src/pkg/cli/subscribe_test.go
@@ -12,6 +12,8 @@ import (
 	"github.com/DefangLabs/defang/src/pkg/cli/client"
 	"github.com/DefangLabs/defang/src/pkg/types"
 	defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // mockSubscribeProvider mocks the provider for Subscribe.
@@ -283,6 +285,11 @@ func TestWaitServiceStateStreamReceive(t *testing.T) {
 			err:         connect.NewError(connect.CodeInternal, errors.New("internal error")),
 			expectRetry: true,
 		},
+		{
+			name:        "stream receive returns resource exhausted error and retry to connect",
+			err:         status.Error(codes.ResourceExhausted, "quota exceeded"),
+			expectRetry: true,
+		},
 	}
 
 	for _, tt := range tests {
diff --git a/src/pkg/cli/tail.go b/src/pkg/cli/tail.go
index 1f1dcd43a..e62511d42 100644
--- a/src/pkg/cli/tail.go
+++ b/src/pkg/cli/tail.go
@@ -192,7 +192,7 @@ func isTransientError(err error) bool {
 
 	// GCP grpc transient errors
 	if st, ok := status.FromError(err); ok {
-		transientCodes := []codes.Code{codes.Unavailable, codes.Internal}
+		transientCodes := []codes.Code{codes.Unavailable, codes.Internal, codes.ResourceExhausted}
 		if slices.Contains(transientCodes, st.Code()) {
 			return true
 		}
diff --git a/src/pkg/cli/tail_test.go b/src/pkg/cli/tail_test.go
--- a/src/pkg/cli/tail_test.go
+++ b/src/pkg/cli/tail_test.go
@@ -22,10 +22,61 @@ import (
 	defangv1 "github.com/DefangLabs/defang/src/protos/io/defang/v1"
 	cwTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
 
+	"google.golang.org/grpc/codes"
+	grpcstatus "google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
+func TestIsTransientError(t *testing.T) {
+	tests := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{"nil", nil, false},
+		{"eof", io.EOF, false},
+		{"connect unavailable", connect.NewError(connect.CodeUnavailable, errors.New("unavailable")), true},
+		{"connect internal non-wire", connect.NewError(connect.CodeInternal, errors.New("internal")), true},
+		{"connect permission denied", connect.NewError(connect.CodePermissionDenied, errors.New("denied")), false},
+		{"grpc unavailable", grpcstatus.Error(codes.Unavailable, "unavailable"), true},
+		{"grpc internal", grpcstatus.Error(codes.Internal, "internal"), true},
+		{"grpc resource exhausted", grpcstatus.Error(codes.ResourceExhausted, "quota exceeded"), true},
+		{"grpc permission denied", grpcstatus.Error(codes.PermissionDenied, "denied"), false},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := isTransientError(tt.err); got != tt.want {
+				t.Errorf("isTransientError(%v) = %v, want %v", tt.err, got, tt.want)
+			}
+		})
+	}
+}
+
+// TestIsResourceExhausted checks that quota errors are recognized in both their
+// connect and gRPC status forms.
+func TestIsResourceExhausted(t *testing.T) {
+	tests := []struct {
+		name string
+		err  error
+		want bool
+	}{
+		{"nil", nil, false},
+		{"eof", io.EOF, false},
+		{"connect resource exhausted", connect.NewError(connect.CodeResourceExhausted, errors.New("quota exceeded")), true},
+		{"connect unavailable", connect.NewError(connect.CodeUnavailable, errors.New("unavailable")), false},
+		{"grpc resource exhausted", grpcstatus.Error(codes.ResourceExhausted, "quota exceeded"), true},
+		{"grpc unavailable", grpcstatus.Error(codes.Unavailable, "unavailable"), false},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := isResourceExhausted(tt.err); got != tt.want {
+				t.Errorf("isResourceExhausted(%v) = %v, want %v", tt.err, got, tt.want)
+			}
+		})
+	}
+}
+
 func TestIsProgressDot(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/src/pkg/cli/waitForCdTaskExit.go b/src/pkg/cli/waitForCdTaskExit.go
--- a/src/pkg/cli/waitForCdTaskExit.go
+++ b/src/pkg/cli/waitForCdTaskExit.go
@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/DefangLabs/defang/src/pkg/cli/client"
-	"github.com/DefangLabs/defang/src/pkg/term"
 )
 
 var pollDuration = 2 * time.Second
@@ -20,7 +19,6 @@ func WaitForCdTaskExit(ctx context.Context, provider client.Provider) error {
 		select {
 		case <-ticker.C:
 			done, err := provider.GetDeploymentStatus(ctx)
-			term.Debugf("Polled CD task status: done=%v, err=%v", done, err)
 			if err != nil {
 				// End condition: EOF indicates that the task has completed successfully
 				if errors.Is(err, io.EOF) {
diff --git a/src/pkg/clouds/gcp/logging.go b/src/pkg/clouds/gcp/logging.go
index c7b107e3d..992611e4f 100644
--- a/src/pkg/clouds/gcp/logging.go
+++ b/src/pkg/clouds/gcp/logging.go
@@ -62,7 +62,9 @@ func (t *gcpLoggingTailer) Next(ctx context.Context) (*loggingpb.LogEntry, error
 		}
 		t.cache = resp.GetEntries()
 		if len(t.cache) == 0 {
-			return nil, errors.New("no log entries found")
+			// GCP may send empty responses (heartbeats, suppression info); return nil
+			// so the caller can continue looping without treating this as an error.
+			return nil, nil
 		}
 	}
 
diff --git a/src/pkg/clouds/gcp/logging_test.go b/src/pkg/clouds/gcp/logging_test.go
new file mode 100644
index 000000000..f88a49eba
--- /dev/null
+++ b/src/pkg/clouds/gcp/logging_test.go
@@ -0,0 +1,89 @@
+package gcp
+
+import (
+	"context"
+	"io"
+	"testing"
+
+	"cloud.google.com/go/logging/apiv2/loggingpb"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+// mockTailLogEntriesClient implements loggingpb.LoggingServiceV2_TailLogEntriesClient
+// for unit testing gcpLoggingTailer.Next().
+type mockTailLogEntriesClient struct {
+	responses []*loggingpb.TailLogEntriesResponse
+	err       error
+}
+
+func (m *mockTailLogEntriesClient) Send(*loggingpb.TailLogEntriesRequest) error { return nil }
+func (m *mockTailLogEntriesClient) Recv() (*loggingpb.TailLogEntriesResponse, error) {
+	if len(m.responses) == 0 {
+		if m.err != nil {
+			return nil, m.err
+		}
+		return nil, io.EOF
+	}
+	resp := m.responses[0]
+	m.responses = m.responses[1:]
+	return resp, nil
+}
+func (m *mockTailLogEntriesClient) Header() (metadata.MD, error) { return nil, nil }
+func (m *mockTailLogEntriesClient) Trailer() metadata.MD         { return nil }
+func (m *mockTailLogEntriesClient) CloseSend() error             { return nil }
+func (m *mockTailLogEntriesClient) Context() context.Context     { return context.Background() }
+func (m *mockTailLogEntriesClient) SendMsg(any) error            { return nil }
+func (m *mockTailLogEntriesClient) RecvMsg(any) error            { return nil }
+
+var _ grpc.ClientStream = (*mockTailLogEntriesClient)(nil)
+
+func TestGcpLoggingTailerNext_EmptyResponse(t *testing.T) {
+	// An empty-entries response (heartbeat or suppression info) must return nil, nil
+	// so the caller can continue looping without treating it as an error.
+	client := &mockTailLogEntriesClient{
+		responses: []*loggingpb.TailLogEntriesResponse{
+			{Entries: nil}, // empty — heartbeat
+		},
+	}
+	tailer := &gcpLoggingTailer{tleClient: client}
+
+	entry, err := tailer.Next(context.Background())
+	if err != nil {
+		t.Fatalf("Next() error = %v, want nil", err)
+	}
+	if entry != nil {
+		t.Fatalf("Next() entry = %v, want nil", entry)
+	}
+}
+
+func TestGcpLoggingTailerNext_WithEntries(t *testing.T) {
+	// A response with entries should return the first entry and cache the rest.
+	entries := []*loggingpb.LogEntry{
+		{InsertId: "entry1"},
+		{InsertId: "entry2"},
+	}
+	client := &mockTailLogEntriesClient{
+		responses: []*loggingpb.TailLogEntriesResponse{
+			{Entries: entries},
+		},
+	}
+	tailer := &gcpLoggingTailer{tleClient: client}
+
+	entry, err := tailer.Next(context.Background())
+	if err != nil {
+		t.Fatalf("Next() error = %v, want nil", err)
+	}
+	if entry == nil || entry.InsertId != "entry1" {
+		t.Fatalf("Next() entry = %v, want entry1", entry)
+	}
+
+	// Second call should return cached entry without calling Recv again.
+	entry, err = tailer.Next(context.Background())
+	if err != nil {
+		t.Fatalf("Next() error = %v, want nil", err)
+	}
+	if entry == nil || entry.InsertId != "entry2" {
+		t.Fatalf("Next() entry = %v, want entry2", entry)
+	}
+}