Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -554,6 +555,198 @@ func (*noopDataClient) Send(*fnpb.Elements) error {
return nil
}

func TestScopedDataManager_Open_Closed(t *testing.T) {
mgr := &DataChannelManager{}
s := NewScopedDataManager(mgr, "inst")
s.closed = true

port := exec.Port{URL: "test:1234"}
_, err := s.open(context.Background(), port)
if err == nil {
t.Error("expected error when opening closed ScopedDataManager")
}
}

func TestScopedDataManager_CloseEmpty(t *testing.T) {
mgr := &DataChannelManager{}
s := NewScopedDataManager(mgr, "inst")

err := s.Close()
if err != nil {
t.Errorf("Close on empty ScopedDataManager returned error: %v", err)
}
if s.mgr != nil {
t.Error("expected mgr to be nil after Close")
}
}

func TestElementsChan_InstructionEnded(t *testing.T) {
ec := &elementsChan{
ch: make(chan exec.Elements, 1),
done: make(chan struct{}),
}
ec.InstructionEnded()

select {
case <-ec.done:
// ok, channel was closed
default:
t.Error("InstructionEnded did not close done channel")
}
}

func TestElementsChan_Closed(t *testing.T) {
ec := &elementsChan{}
if ec.Closed() {
t.Error("Closed returned true for zero-value elementsChan")
}
ec.ch = make(chan exec.Elements, 1)
close(ec.ch)
atomic.StoreUint32(&ec.closed, 1)
if !ec.Closed() {
t.Error("Closed returned false after setting closed flag")
}
}

func TestElementsChan_PTransformDone(t *testing.T) {
t.Run("singleTransform", func(t *testing.T) {
ec := &elementsChan{
ch: make(chan exec.Elements, 1),
want: 1,
}
ec.PTransformDone()
if !ec.Closed() {
t.Error("expected channel to be closed after all transforms done")
}
})
t.Run("multipleTransforms_notYetDone", func(t *testing.T) {
ec := &elementsChan{
ch: make(chan exec.Elements, 1),
want: 2,
}
ec.PTransformDone()
if ec.Closed() {
t.Error("expected channel to stay open before all transforms done")
}
})
t.Run("multipleTransforms_allDone", func(t *testing.T) {
ec := &elementsChan{
ch: make(chan exec.Elements, 1),
want: 2,
}
ec.PTransformDone()
ec.PTransformDone()
if !ec.Closed() {
t.Error("expected channel to be closed after all transforms done")
}
})
}

func TestDataChannel_terminateStreamOnError(t *testing.T) {
ctx, cancelFn := context.WithCancel(context.Background())
recreated := false
expectedErr := fmt.Errorf("test error")
// Use a noopDataClient as the underlying client to avoid nil pointer panics
c := makeDataChannel(ctx, "id", &noopDataClient{}, cancelFn)
c.forceRecreate = func(id string, err error) {
recreated = true
}
c.terminateStreamOnError(expectedErr)

// The cancelFn should have been called, so the context should be cancelled.
select {
case <-ctx.Done():
// expected
case <-time.After(time.Second):
t.Error("context not cancelled after terminateStreamOnError")
}
if !recreated {
t.Error("forceRecreate was not called")
}
}

func TestScopedDataManager_OpenWrite_CachedPort(t *testing.T) {
dc := &DataChannel{
id: "test:1234",
writers: map[instructionID]map[string]*dataWriter{},
}
mgr := &DataChannelManager{
ports: map[string]*DataChannel{"test:1234": dc},
}
s := NewScopedDataManager(mgr, "inst")
w, err := s.OpenWrite(context.Background(), exec.StreamID{
Port: exec.Port{URL: "test:1234"},
PtransformID: "ptr",
})
if err != nil {
t.Errorf("OpenWrite returned error: %v", err)
}
if w == nil {
t.Error("OpenWrite returned nil writer")
}
}

func TestScopedDataManager_OpenElementChan_CachedPort(t *testing.T) {
dc := &DataChannel{
id: "test:1234",
channels: map[instructionID]*elementsChan{},
endedInstructions: map[instructionID]struct{}{},
}
mgr := &DataChannelManager{
ports: map[string]*DataChannel{"test:1234": dc},
}
s := NewScopedDataManager(mgr, "inst")
ch, err := s.OpenElementChan(context.Background(), exec.StreamID{
Port: exec.Port{URL: "test:1234"},
PtransformID: "ptr",
}, nil)
if err != nil {
t.Errorf("OpenElementChan returned error: %v", err)
}
if ch == nil {
t.Error("OpenElementChan returned nil channel")
}
}

func TestScopedDataManager_OpenTimerWrite_CachedPort(t *testing.T) {
dc := &DataChannel{
id: "test:1234",
timerWriters: map[instructionID]map[timerKey]*timerWriter{},
}
mgr := &DataChannelManager{
ports: map[string]*DataChannel{"test:1234": dc},
}
s := NewScopedDataManager(mgr, "inst")
w, err := s.OpenTimerWrite(context.Background(), exec.StreamID{
Port: exec.Port{URL: "test:1234"},
PtransformID: "ptr",
}, "family1")
if err != nil {
t.Errorf("OpenTimerWrite returned error: %v", err)
}
if w == nil {
t.Error("OpenTimerWrite returned nil writer")
}
}

func TestDataChannelManager_CloseInstruction(t *testing.T) {
t.Run("empty", func(t *testing.T) {
mgr := &DataChannelManager{}
err := mgr.closeInstruction("inst", nil)
if err != nil {
t.Errorf("CloseInstruction on empty returned error: %v", err)
}
})
t.Run("unknownPort", func(t *testing.T) {
mgr := &DataChannelManager{
ports: map[string]*DataChannel{},
}
err := mgr.closeInstruction("inst", []exec.Port{{URL: "unknown"}})
if err != nil {
t.Errorf("CloseInstruction on unknown port returned error: %v", err)
}
})
}
func BenchmarkDataWriter(b *testing.B) {
fourB := []byte{42, 23, 78, 159}
sixteenB := bytes.Repeat(fourB, 4)
Expand Down
128 changes: 128 additions & 0 deletions sdks/go/pkg/beam/core/runtime/harness/harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
Expand Down Expand Up @@ -166,6 +167,24 @@ func TestControl_getOrCreatePlan(t *testing.T) {

}

func TestFail(t *testing.T) {
ctx := context.Background()
resp := fail(ctx, "test-id", "error %s %d", "msg", 42)

if resp == nil {
t.Fatal("fail returned nil")
}
if got, want := resp.GetInstructionId(), "test-id"; got != want {
t.Errorf("got InstructionId %v, want %v", got, want)
}
if !strings.Contains(resp.GetError(), "error msg 42") {
t.Errorf("got Error %v, want to contain 'error msg 42'", resp.GetError())
}
if resp.GetRegister() == nil {
t.Error("expected Register dummy response to be non-nil")
}
}

func TestCircleBuffer(t *testing.T) {
expected1 := instructionID("expected1")
expected2 := instructionID("expected2")
Expand Down Expand Up @@ -258,3 +277,112 @@ func TestElementProcessingTimeoutParsing(t *testing.T) {
}
}
}

func TestControl_MetStoreToString(t *testing.T) {
ctx := metrics.SetBundleID(context.Background(), "test-bundle")
store := metrics.GetStore(ctx)
if store == nil {
t.Fatal("GetStore returned nil")
}
ctrl := &control{
metStore: map[instructionID]*metrics.Store{
"inst1": store,
},
}
b := &strings.Builder{}
ctrl.metStoreToString(b)
out := b.String()
if !strings.Contains(out, "Bundle ID: inst1") {
t.Errorf("metStoreToString output missing bundle ID, got: %s", out)
}
}

func TestControl_GetPlanOrResponse(t *testing.T) {
tests := []struct {
name string
active map[instructionID]*exec.Plan
awaitFinalize map[instructionID]awaitingFinalization
failed map[instructionID]error
inactive circleBuffer
wantErr bool // response has Error field set (non-nil response)
wantNilPlan bool // response is non-nil and plan is nil -> response is returned
wantEmpty bool // both plan and response are nil - empty response needed
}{
{
name: "active",
active: map[instructionID]*exec.Plan{
"ref": {},
},
},
{
name: "awaitingFinalization",
awaitFinalize: map[instructionID]awaitingFinalization{
"ref": {plan: &exec.Plan{}},
},
},
{
name: "failed",
failed: map[instructionID]error{"ref": fmt.Errorf("test failure")},
wantErr: true,
},
{
name: "inactive",
inactive: func() circleBuffer {
c := newCircleBuffer()
c.Add("ref")
return c
}(),
wantEmpty: true,
},
{
name: "notFound",
wantErr: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctrl := &control{
active: make(map[instructionID]*exec.Plan),
awaitingFinalization: make(map[instructionID]awaitingFinalization),
failed: make(map[instructionID]error),
inactive: newCircleBuffer(),
}
if test.active != nil {
ctrl.active = test.active
}
if test.awaitFinalize != nil {
ctrl.awaitingFinalization = test.awaitFinalize
}
if test.failed != nil {
ctrl.failed = test.failed
}
if len(test.inactive.buf) > 0 {
ctrl.inactive = test.inactive
}
Comment on lines +360 to +362

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of checking the length of the unexported buf field of circleBuffer to decide whether to assign it, you can directly assign ctrl.inactive = test.inactive unconditionally. The zero-value of circleBuffer is safe to assign and this avoids coupling the test runner to the internal implementation details of circleBuffer.

			ctrl.inactive = test.inactive


plan, store, resp := ctrl.getPlanOrResponse(context.Background(), "test", "instID", "ref")

if test.wantEmpty {
if plan != nil || store != nil || resp != nil {
t.Error("expected all nil for inactive instruction")
}
return
}

if test.wantErr {
if resp == nil {
t.Fatal("expected non-nil error response")
}
if resp.Error == "" {
t.Error("expected error in response")
}
return
}

if plan == nil {
t.Error("expected non-nil plan")
}
})
}
}
20 changes: 20 additions & 0 deletions sdks/go/pkg/beam/core/runtime/harness/logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,23 @@ func TestLogger_connect(t *testing.T) {
t.Errorf("missing messages: got %v, want %v", got, want)
}
}

func TestConvertSeverity(t *testing.T) {
tests := []struct {
in log.Severity
out fnpb.LogEntry_Severity_Enum
}{
{log.SevDebug, fnpb.LogEntry_Severity_DEBUG},
{log.SevInfo, fnpb.LogEntry_Severity_INFO},
{log.SevWarn, fnpb.LogEntry_Severity_WARN},
{log.SevError, fnpb.LogEntry_Severity_ERROR},
{log.SevFatal, fnpb.LogEntry_Severity_CRITICAL},
{log.Severity(999), fnpb.LogEntry_Severity_INFO}, // default case
}
for _, test := range tests {
got := convertSeverity(test.in)
if got != test.out {
t.Errorf("convertSeverity(%v) = %v, want %v", test.in, got, test.out)
}
}
}
Loading
Loading