diff --git a/pkg/sip/protocol.go b/pkg/sip/protocol.go index fb17e908..bb1ab583 100644 --- a/pkg/sip/protocol.go +++ b/pkg/sip/protocol.go @@ -40,73 +40,72 @@ var ( referIdRegexp = regexp.MustCompile(`^refer(;id=(\d+))?$`) ) -// TODO: Add String method to sipgo.StatusCode var statusNamesMap = map[int]string{ 100: "Trying", 180: "Ringing", - 181: "CallIsForwarded", + 181: "Call Is Forwarded", 182: "Queued", - 183: "SessionInProgress", + 183: "Session In Progress", 200: "OK", 202: "Accepted", - 301: "MovedPermanently", - 302: "MovedTemporarily", - 305: "UseProxy", + 301: "Moved Permanently", + 302: "Moved Temporarily", + 305: "Use Proxy", - 400: "BadRequest", + 400: "Bad Request", 401: "Unauthorized", - 402: "PaymentRequired", + 402: "Payment Required", 403: "Forbidden", - 404: "NotFound", - 405: "MethodNotAllowed", - 406: "NotAcceptable", - 407: "ProxyAuthRequired", - 408: "RequestTimeout", + 404: "Not Found", + 405: "Method Not Allowed", + 406: "Not Acceptable", + 407: "Proxy Auth Required", + 408: "Request Timeout", 409: "Conflict", 410: "Gone", - 413: "RequestEntityTooLarge", - 414: "RequestURITooLong", - 415: "UnsupportedMediaType", - 416: "RequestedRangeNotSatisfiable", - 420: "BadExtension", - 421: "ExtensionRequired", - 423: "IntervalToBrief", - 480: "TemporarilyUnavailable", - 481: "CallTransactionDoesNotExists", - 482: "LoopDetected", - 483: "TooManyHops", - 484: "AddressIncomplete", + 413: "Request Entity Too Large", + 414: "Request URI Too Long", + 415: "Unsupported Media Type", + 416: "Requested Range Not Satisfiable", + 420: "Bad Extension", + 421: "Extension Required", + 423: "Interval Too Brief", + 480: "Temporarily Unavailable", + 481: "Call Transaction Does Not Exists", + 482: "Loop Detected", + 483: "Too Many Hops", + 484: "Address Incomplete", 485: "Ambiguous", - 486: "BusyHere", - 487: "RequestTerminated", - 488: "NotAcceptableHere", - - 500: "InternalServerError", - 501: "NotImplemented", - 502: "BadGateway", - 503: "ServiceUnavailable", - 504: "GatewayTimeout", - 505: "VersionNotSupported", - 513: "MessageTooLarge", - - 600: "GlobalBusyEverywhere", - 603: "GlobalDecline", - 604: "GlobalDoesNotExistAnywhere", - 606: "GlobalNotAcceptable", + 486: "Busy Here", + 487: "Request Terminated", + 488: "Not Acceptable Here", + + 500: "Internal Server Error", + 501: "Not Implemented", + 502: "Bad Gateway", + 503: "Service Unavailable", + 504: "Gateway Timeout", + 505: "Version Not Supported", + 513: "Message Too Large", + + 600: "Global Busy Everywhere", + 603: "Global Decline", + 604: "Global Does Not Exist Anywhere", + 606: "Global Not Acceptable", } func sipStatus(code sip.StatusCode) string { if name := statusNamesMap[int(code)]; name != "" { return name } - return fmt.Sprintf("Status%d", int(code)) + return fmt.Sprintf("Status %d", int(code)) } func statusName(status int) string { if name := statusNamesMap[status]; name != "" { - return fmt.Sprintf("%d-%s", status, name) + return fmt.Sprintf("%d-%s", status, strings.ReplaceAll(name, " ", "")) } return fmt.Sprintf("status-%d", status) } diff --git a/pkg/sip/service.go b/pkg/sip/service.go index cf47df1b..fb86c3a4 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -28,6 +28,7 @@ import ( "slices" "strings" "sync" + "sync/atomic" "time" "github.com/livekit/sipgo/transport" @@ -50,6 +51,7 @@ import ( type PendingTransfer struct { CallID string TransferTo string + Error atomic.Pointer[error] Done chan error } @@ -372,8 +374,9 @@ func (s *Service) transferSIPParticipant(ctx context.Context, req *rpc.InternalT select { case pending.Done <- err: default: - s.log.Errorw("pending transfer channel is full, dropping error", err, "callID", req.SipCallId, "transferTo", req.TransferTo) + s.log.Errorw("pending transfer received more than one error", err, "callID", req.SipCallId, "transferTo", req.TransferTo) } + pending.Error.Store(&err) close(pending.Done) s.mu.Lock() @@ -384,6 +387,13 @@ func (s *Service) transferSIPParticipant(ctx context.Context, req *rpc.InternalT select { case err := <-pending.Done: + if err == nil { + // If there is more than one RPC call waiting on the result, + // this ensures we return the same error to all callers. + if pErr := pending.Error.Load(); pErr != nil { + err = *pErr + } + } return &emptypb.Empty{}, err case <-ctx.Done(): return &emptypb.Empty{}, psrpc.NewError(psrpc.Canceled, ctx.Err()) diff --git a/pkg/sip/signaling_test.go b/pkg/sip/signaling_test.go index 092faa95..389f4255 100644 --- a/pkg/sip/signaling_test.go +++ b/pkg/sip/signaling_test.go @@ -16,12 +16,14 @@ import ( "errors" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/durationpb" "github.com/livekit/media-sdk/sdp" "github.com/livekit/mediatransportutil/pkg/rtcconfig" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" "github.com/livekit/protocol/rpc" + "github.com/livekit/psrpc" "github.com/livekit/sipgo" "github.com/livekit/sipgo/sip" @@ -197,7 +199,7 @@ func (s *sipUATest) serveTransport(t *testing.T, transport string) error { return nil } -func (s *sipUATest) RegisterSink(localTag string, method string) chan *sipUARequest { +func (s *sipUATest) RegisterSink(localTag string, method string) <-chan *sipUARequest { s.mu.Lock() defer s.mu.Unlock() dialogMethods, ok := s.sinks[localTag] @@ -296,6 +298,7 @@ var index atomic.Int32 type sipUADialogTest struct { TestUA *sipUATest + isUAS bool callID string localUser string remoteUser string @@ -308,10 +311,11 @@ type sipUADialogTest struct { routeSet []sip.Uri } -func newTestCall(testUA *sipUATest, forceRemoteTag bool) *sipUADialogTest { +func newTestCall(testUA *sipUATest, isUAS bool) *sipUADialogTest { i := index.Add(1) d := &sipUADialogTest{ TestUA: testUA, + isUAS: isUAS, callID: fmt.Sprintf("callID-%d", i), localUser: fmt.Sprintf("localTestUser-%d", i), remoteUser: fmt.Sprintf("remoteTestUser-%d", i), @@ -320,7 +324,8 @@ func newTestCall(testUA *sipUATest, forceRemoteTag bool) *sipUADialogTest { localCseq: 1, remoteCseq: 0, // Needs to be set by remote } - if forceRemoteTag { + if isUAS { + // sipUADialogTest generates CreateSipParticipantRequest, which needs the tag as input. d.remoteTag = LocalTag(fmt.Sprintf("remoteTestTag-%d", i)) } return d @@ -396,11 +401,24 @@ func (d *sipUADialogTest) CreateSipParticipantRequest() *rpc.InternalCreateSIPPa return req } +func (d *sipUADialogTest) TransactionRequest(t *testing.T, req *sip.Request) *sip.Response { + return d.TestUA.TransactionRequest(t, req, !d.isUAS) +} + +func (d *sipUADialogTest) RegisterRequestChannel(method string) <-chan *sipUARequest { + return d.TestUA.RegisterSink(d.localTag, method) +} + +func (d *sipUADialogTest) UnregisterRequestChannel(method string) { + d.TestUA.UnregisterSink(d.localTag, method) +} + type serviceTest struct { TestUA *sipUATest Server *Server // Processing inbound calls and SIP Client *Client // Processing outbound calls Handler Handler + Service *Service } type serviceTestConfig struct { @@ -450,7 +468,9 @@ func NewServiceTest(t *testing.T, options *serviceTestConfig) *serviceTest { conf, log, mon, - func(projectID string, _ *rpc.SIPCallObservability, _ *livekit.SIPCallInfo) StateHandler { return NewRPCStateHandler(&MockIOInfoClient{}) }, + func(projectID string, _ *rpc.SIPCallObservability, _ *livekit.SIPCallInfo) StateHandler { + return NewRPCStateHandler(&MockIOInfoClient{}) + }, WithGetRoomClient(options.GetRoom), ) srv := NewServer( @@ -458,7 +478,9 @@ func NewServiceTest(t *testing.T, options *serviceTestConfig) *serviceTest { conf, log, mon, - func(projectID string, _ *rpc.SIPCallObservability, _ *livekit.SIPCallInfo) StateHandler { return NewRPCStateHandler(&MockIOInfoClient{}) }, + func(projectID string, _ *rpc.SIPCallObservability, _ *livekit.SIPCallInfo) StateHandler { + return NewRPCStateHandler(&MockIOInfoClient{}) + }, WithGetRoomServer(options.GetRoom), WithClient(cli), ) @@ -484,11 +506,20 @@ func NewServiceTest(t *testing.T, options *serviceTestConfig) *serviceTest { addr := netip.AddrPortFrom(loopback, uint16(sipPort)) + service := &Service{ + conf: conf, + log: log, + mon: mon, + cli: cli, + srv: srv, + pendingTransfers: make(map[LocalTag]*PendingTransfer), + } return &serviceTest{ TestUA: newUATest(t, srv.log, addr, withUATestBuffer(3)), Server: srv, Client: cli, Handler: handler, + Service: service, } } @@ -541,7 +572,7 @@ func (st *serviceTest) CreateOutboundCall(t *testing.T, opts ...createCallTestOp t.Helper() newInviteSink := st.TestUA.RegisterSink("", "INVITE") - var reqSink chan *sipUARequest + var reqSink <-chan *sipUARequest defer st.TestUA.UnregisterSink("", "INVITE") call := newTestCall(st.TestUA, true) @@ -703,272 +734,332 @@ func TestReinvite(t *testing.T) { } func TestTransfer(t *testing.T) { - t.Run("inbound", func(t *testing.T) { - prepFunc := func(t *testing.T) (*serviceTest, *sipUADialogTest, *inboundCall) { - t.Helper() - st := NewServiceTest(t, nil) - call, ic := st.CreateInboundCall(t) - return st, call, ic + const referTo = "tel:+15551234567" + + expectHeaders := func(referTo string, headers map[string]string) []sip.Header { + expected := []sip.Header{sip.NewHeader("Refer-To", fmt.Sprintf("<%s>", referTo))} + for name, value := range headers { + expected = append(expected, sip.NewHeader(name, value)) } + return expected + } - startTransfer := func(ctx context.Context, ic *inboundCall, to string, headers map[string]string, dialtone bool) <-chan error { - transferRes := make(chan error, 1) - go func() { - defer close(transferRes) - err := ic.transferCall(ctx, to, headers, dialtone) - if err != nil { - transferRes <- err + handleRefer := func(t *testing.T, ctx context.Context, refChan <-chan *sipUARequest, call *sipUADialogTest, referStatus int, validateHeaders []sip.Header) error { + t.Helper() + select { + case <-ctx.Done(): + return fmt.Errorf("test aborted without receiving a REFER request: %v", ctx.Err()) + case msg := <-refChan: + require.NotNil(t, msg) + require.Equal(t, sip.REFER, msg.req.Method) + require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) + for _, expectedHeader := range validateHeaders { + reqHeaders := msg.req.GetHeaders(expectedHeader.Name()) + found := false + for _, reqHeader := range reqHeaders { + if reqHeader.Value() == expectedHeader.Value() { + found = true + break + } } - }() - return transferRes + require.True(t, found, "REFER request should contain the expected header %s: %s, instead got: %v", expectedHeader.Name(), expectedHeader.Value(), reqHeaders) + } + t.Logf("Received REFER request, responding REFER-%d %s", referStatus, sipStatus(referStatus)) + resp := sip.NewResponseFromRequest(msg.req, referStatus, sipStatus(referStatus), nil) + return msg.tx.Respond(resp) } + } - t.Run("normal", func(t *testing.T) { - st, call, ic := prepFunc(t) - reqChan := st.TestUA.RegisterSink(call.localTag, "") - ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) - defer cancel() - - transferRes := startTransfer(ctx, ic, "tel:+15551234567", nil, false) - + sendNotify := func(t *testing.T, ctx context.Context, call *sipUADialogTest, notifyStatuses []int) error { + t.Helper() + require.NotEmpty(t, notifyStatuses) + for i, status := range notifyStatuses { select { - case msg := <-reqChan: - require.Equal(t, sip.REFER, msg.req.Method) - require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) - hdrs := msg.req.GetHeaders("Refer-To") - require.Equal(t, 1, len(hdrs), "should be 1 Refer-To header") - require.Equal(t, "", hdrs[0].Value()) - resp := sip.NewResponseFromRequest(msg.req, 202, "Accepted", nil) - msg.tx.Respond(resp) - // OK, proceed - case err := <-transferRes: - t.Fatal("error transferring call, unexpected transfer API response", err) case <-ctx.Done(): - t.Fatal("timeout waiting for REFER to arrive") + return fmt.Errorf("test aborted without ending remaining %d NOTIFY requests: %v", len(notifyStatuses)-i, ctx.Err()) + case <-time.After(5 * time.Millisecond): } - + t.Logf("Sending NOTIFY request carrying INVITE-%d response", status) notifyReq := call.NewRequest(sip.NOTIFY) notifyReq.AppendHeader(sip.NewHeader("Event", "refer")) notifyReq.AppendHeader(sip.NewHeader("Content-Type", "message/sipfrag")) - notifyReq.SetBody([]byte(sip.NewResponse(200, "OK").String())) - - resp := st.TestUA.TransactionRequest(t, notifyReq, true) - require.Equal(t, sip.StatusCode(200), resp.StatusCode, "Expecting 200 OK") + notifyReq.SetBody([]byte(sip.NewResponse(status, sipStatus(status)).String())) + notifyResp := call.TransactionRequest(t, notifyReq) + t.Logf("Received NOTIFY-%d %s response", notifyResp.StatusCode, notifyResp.Reason) + require.Equal(t, 200, notifyResp.StatusCode, "Expecting 200 OK response to NOTIFY") + } + t.Log("All NOTIFY requests sent") + return nil + } - select { - case err := <-transferRes: - if err != nil { - t.Fatal("error transferring call, unexpected transfer API response", err) - } - case <-ctx.Done(): - t.Fatal("timeout waiting for BYE to arrive") - } - }) + handleBye := func(t *testing.T, ctx context.Context, reqChan <-chan *sipUARequest, call *sipUADialogTest) error { + // This will receive the BYE request, respond with 200 OK. + t.Helper() + select { + case <-ctx.Done(): + return fmt.Errorf("test aborted without receiving a BYE request: %v", ctx.Err()) + case msg := <-reqChan: + require.NotNil(t, msg) + require.Equal(t, sip.BYE, msg.req.Method) + require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) + t.Logf("Received BYE") + resp := sip.NewResponseFromRequest(msg.req, 200, sipStatus(200), nil) + return msg.tx.Respond(resp) + } + } - t.Run("with headers", func(t *testing.T) { - st, call, ic := prepFunc(t) - reqChan := st.TestUA.RegisterSink(call.localTag, "") - ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) - defer cancel() + sendBye := func(t *testing.T, call *sipUADialogTest) error { + t.Helper() + t.Logf("Sending BYE") + resp := call.TransactionRequest(t, call.NewRequest(sip.BYE)) + t.Logf("Received BYE-%d %s response", resp.StatusCode, resp.Reason) + require.Equal(t, sip.StatusCode(200), resp.StatusCode, "Expecting BYE-200 OK") + return nil + } - headers := map[string]string{ - "X-Custom-Header": "custom-value", + startTransfer := func(t *testing.T, ctx context.Context, st *serviceTest, call *sipUADialogTest, to string, headers map[string]string, dialtone bool) <-chan error { + t.Helper() + done := make(chan error, 1) + go func() { + defer close(done) + transferReq := &rpc.InternalTransferSIPParticipantRequest{ + SipCallId: string(call.remoteTag), + TransferTo: to, + Headers: headers, + PlayDialtone: dialtone, } - transferRes := startTransfer(ctx, ic, "tel:+15551234567", headers, false) - - select { - case msg := <-reqChan: - require.Equal(t, sip.REFER, msg.req.Method) - require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) - hdrs := msg.req.GetHeaders("Refer-To") - require.Equal(t, 1, len(hdrs), "should be 1 Refer-To header") - require.Equal(t, "", hdrs[0].Value()) - hdr := msg.req.GetHeaders("X-Custom-Header") - require.Equal(t, 1, len(hdr), "should be 1 X-Custom-Header") - require.Equal(t, "custom-value", hdr[0].Value()) - resp := sip.NewResponseFromRequest(msg.req, 480, "Temporarily Unavailable", nil) - msg.tx.Respond(resp) - case err := <-transferRes: - t.Fatal("error transferring call, unexpected transfer API response", err) - case <-ctx.Done(): - t.Fatal("timeout waiting for REFER to arrive") + if deadline, ok := ctx.Deadline(); ok { + transferReq.RingingTimeout = durationpb.New(time.Until(deadline) + (5 * time.Millisecond)) } - }) + _, err := st.Service.TransferSIPParticipant(ctx, transferReq) + if err != nil { + done <- err + } + }() + return done + } - t.Run("bye", func(t *testing.T) { - // After REFER gets 202 we wait on NOTIFY; remote hangs up with BYE instead. - st, call, ic := prepFunc(t) - reqChan := st.TestUA.RegisterSink(call.localTag, "") - ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) - defer cancel() + directions := map[string]func(t *testing.T, st *serviceTest) *sipUADialogTest{ + "inbound": func(t *testing.T, st *serviceTest) *sipUADialogTest { + call, _ := st.CreateInboundCall(t) + return call + }, + "outbound": func(t *testing.T, st *serviceTest) *sipUADialogTest { + call, _, _ := st.CreateOutboundCall(t) + return call + }, + } - transferRes := startTransfer(ctx, ic, "tel:+15551234567", nil, false) + for direction, setupCall := range directions { + t.Run(direction, func(t *testing.T) { + t.Run("normal", func(t *testing.T) { + st := NewServiceTest(t, nil) + call := setupCall(t, st) + + reqChan := call.RegisterRequestChannel("") + defer call.UnregisterRequestChannel("") + + ctx, cancel := context.WithTimeout(t.Context(), time.Second*3) + defer cancel() + + transferRes := startTransfer(t, ctx, st, call, referTo, nil, false) + + err := handleRefer(t, ctx, reqChan, call, 202, expectHeaders(referTo, nil)) + require.NoError(t, err, "Failed to process REFER request") + err = sendNotify(t, ctx, call, []int{100, 180, 200}) + require.NoError(t, err, "Failed to send NOTIFY requests") + err = handleBye(t, ctx, reqChan, call) // Expexting BYE after successful transfer + require.NoError(t, err, "Failed to process BYE request") + select { + case err := <-transferRes: + require.NoError(t, err, "error transferring call, unexpected transfer API response") + case <-ctx.Done(): + require.NoError(t, ctx.Err(), "timeout waiting for BYE to arrive") + } + }) - select { - case msg := <-reqChan: - require.Equal(t, sip.REFER, msg.req.Method) - require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) - hdrs := msg.req.GetHeaders("Refer-To") - require.Equal(t, 1, len(hdrs), "should be 1 Refer-To header") - require.Equal(t, "", hdrs[0].Value()) - resp := sip.NewResponseFromRequest(msg.req, 202, "Accepted", nil) - msg.tx.Respond(resp) - // OK, proceed - case err := <-transferRes: - t.Fatal("error transferring call, unexpected transfer API response", err) - case <-ctx.Done(): - t.Fatal("timeout waiting for REFER to arrive") - } + t.Run("noraml_concurrent", func(t *testing.T) { + // Same as normal, but with multiple concurrent transfers API requests. + st := NewServiceTest(t, nil) + call := setupCall(t, st) - byeReq := call.NewRequest(sip.BYE) - resp := st.TestUA.TransactionRequest(t, byeReq, true) - require.Equal(t, sip.StatusCode(200), resp.StatusCode, "Expecting 200 OK") + reqChan := call.RegisterRequestChannel("") + defer call.UnregisterRequestChannel("") - select { - case err := <-transferRes: - if err != nil { - t.Fatal("error transferring call, unexpected transfer API response", err) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*3) + defer cancel() + + transferResults := []<-chan error{ + startTransfer(t, ctx, st, call, referTo, nil, false), + startTransfer(t, ctx, st, call, referTo, nil, false), + startTransfer(t, ctx, st, call, referTo, nil, false), } - case <-ctx.Done(): - t.Fatal("timeout waiting for BYE to arrive") - } - }) - }) - t.Run("outbound", func(t *testing.T) { - prepFunc := func(t *testing.T) (*serviceTest, *sipUADialogTest, *outboundCall) { - t.Helper() - st := NewServiceTest(t, nil) - call, oc, _ := st.CreateOutboundCall(t) - return st, call, oc - } + err := handleRefer(t, ctx, reqChan, call, 202, expectHeaders(referTo, nil)) + require.NoError(t, err, "Failed to process REFER request") + err = sendNotify(t, ctx, call, []int{100, 180, 200}) + require.NoError(t, err, "Failed to send NOTIFY requests") + err = handleBye(t, ctx, reqChan, call) // Expexting BYE after successful transfer + require.NoError(t, err, "Failed to process BYE request") + for _, transferRes := range transferResults { + select { + case err := <-transferRes: + require.NoError(t, err, "error transferring call, unexpected transfer API response") + case <-ctx.Done(): + require.NoError(t, ctx.Err(), "timeout waiting for BYE to arrive") + } + } + }) - startTransfer := func(t *testing.T, ctx context.Context, oc *outboundCall, to string, headers map[string]string, dialtone bool) <-chan error { - transferRes := make(chan error, 1) - go func() { - defer close(transferRes) - t.Logf("startTransfer") - err := oc.transferCall(ctx, to, headers, dialtone) - t.Logf("startTransfer done") - if err != nil { - t.Logf("startTransfer error") - transferRes <- err + t.Run("with headers", func(t *testing.T) { + st := NewServiceTest(t, nil) + call := setupCall(t, st) + + reqChan := call.RegisterRequestChannel("") + defer call.UnregisterRequestChannel("") + + ctx, cancel := context.WithTimeout(t.Context(), time.Second*3) + defer cancel() + + headers := map[string]string{ + "X-Custom-Header": "custom-value", } - }() - return transferRes - } + transferRes := startTransfer(t, ctx, st, call, referTo, headers, false) + + err := handleRefer(t, ctx, reqChan, call, 202, expectHeaders(referTo, headers)) + require.NoError(t, err, "Failed to process REFER request") + err = sendNotify(t, ctx, call, []int{100, 180, 200}) + require.NoError(t, err, "Failed to send NOTIFY requests") + err = handleBye(t, ctx, reqChan, call) // Expexting BYE after successful transfer + require.NoError(t, err, "Failed to process BYE request") + select { + case err := <-transferRes: + require.NoError(t, err, "error transferring call, unexpected transfer API response") + case <-ctx.Done(): + require.NoError(t, ctx.Err(), "timeout waiting for BYE to arrive") + } + }) - t.Run("normal", func(t *testing.T) { - st, call, oc := prepFunc(t) - reqChan := st.TestUA.RegisterSink(call.localTag, "") - ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) - defer cancel() + t.Run("failed", func(t *testing.T) { + // Transfer fails, we don't receive a BYE + const finalNotifyStatus = 480 - transferRes := startTransfer(t, ctx, oc, "tel:+15551234567", nil, false) + st := NewServiceTest(t, nil) + call := setupCall(t, st) - select { - case msg := <-reqChan: - require.Equal(t, sip.REFER, msg.req.Method) - require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) - hdrs := msg.req.GetHeaders("Refer-To") - require.Equal(t, 1, len(hdrs), "should be 1 Refer-To header") - require.Equal(t, "", hdrs[0].Value()) - resp := sip.NewResponseFromRequest(msg.req, 202, "Accepted", nil) - msg.tx.Respond(resp) - // OK, proceed - case err := <-transferRes: - t.Fatal("error transferring call, unexpected transfer API response", err) - case <-ctx.Done(): - t.Fatal("timeout waiting for REFER to arrive") - } + reqChan := call.RegisterRequestChannel("") + defer call.UnregisterRequestChannel("") - notifyReq := call.NewRequest(sip.NOTIFY) - notifyReq.AppendHeader(sip.NewHeader("Event", "refer")) - notifyReq.AppendHeader(sip.NewHeader("Content-Type", "message/sipfrag")) - notifyReq.SetBody([]byte(sip.NewResponse(200, "OK").String())) + ctx, cancel := context.WithTimeout(t.Context(), time.Second*3) + defer cancel() - resp := st.TestUA.TransactionRequest(t, notifyReq, false) - require.Equal(t, sip.StatusCode(200), resp.StatusCode, "Expecting 200 OK") + transferRes := startTransfer(t, ctx, st, call, referTo, nil, false) - select { - case err := <-transferRes: - if err != nil { - t.Fatal("error transferring call, unexpected transfer API response", err) + err := handleRefer(t, ctx, reqChan, call, 202, expectHeaders(referTo, nil)) + require.NoError(t, err, "Failed to process REFER request") + err = sendNotify(t, ctx, call, []int{100, 180, finalNotifyStatus}) + require.NoError(t, err, "Failed to send NOTIFY requests") + select { + case <-time.After(time.Millisecond * 250): + t.Logf("No BYE received, as expected") + case msg := <-reqChan: + t.Fatalf("Received unexpected request: %+v", msg) } - case <-ctx.Done(): - t.Fatal("timeout waiting for BYE to arrive") - } - }) + err = sendBye(t, call) + require.NoError(t, err, "Failed to send BYE request") + select { + case err := <-transferRes: + t.Logf("Received error: %v", err) + require.Error(t, err) + var psErr psrpc.Error + require.ErrorAs(t, err, &psErr) + var sipErr *livekit.SIPStatus + require.ErrorAs(t, err, &sipErr) + require.Equal(t, livekit.SIPStatusCode(finalNotifyStatus), sipErr.Code) + require.Equal(t, sipStatus(finalNotifyStatus), sipErr.Status) + case <-ctx.Done(): + require.NoError(t, ctx.Err(), "timeout waiting for BYE to arrive") + } + }) - t.Run("with headers", func(t *testing.T) { - st, call, oc := prepFunc(t) - reqChan := st.TestUA.RegisterSink(call.localTag, "") - ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) - defer cancel() + t.Run("failed_concurrent", func(t *testing.T) { + // Transfer fails, we don't receive a BYE + const finalNotifyStatus = 480 - headers := map[string]string{ - "X-Custom-Header": "custom-value", - } - transferRes := startTransfer(t, ctx, oc, "tel:+15551234567", headers, false) + st := NewServiceTest(t, nil) + call := setupCall(t, st) - select { - case msg := <-reqChan: - require.Equal(t, sip.REFER, msg.req.Method) - require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) - hdrs := msg.req.GetHeaders("Refer-To") - require.Equal(t, 1, len(hdrs), "should be 1 Refer-To header") - require.Equal(t, "", hdrs[0].Value()) - hdr := msg.req.GetHeaders("X-Custom-Header") - require.Equal(t, 1, len(hdr), "should be 1 X-Custom-Header") - require.Equal(t, "custom-value", hdr[0].Value()) - resp := sip.NewResponseFromRequest(msg.req, 480, "Temporarily Unavailable", nil) - msg.tx.Respond(resp) - case err := <-transferRes: - t.Fatal("error transferring call, unexpected transfer API response", err) - case <-ctx.Done(): - t.Fatal("timeout waiting for REFER to arrive") - } - }) + reqChan := call.RegisterRequestChannel("") + defer call.UnregisterRequestChannel("") - t.Run("bye", func(t *testing.T) { - // After REFER gets 202 we wait on NOTIFY; remote hangs up with BYE instead. - st, call, oc := prepFunc(t) - reqChan := st.TestUA.RegisterSink(call.localTag, "") - ctx, cancel := context.WithTimeout(t.Context(), time.Second*5) - defer cancel() + ctx, cancel := context.WithTimeout(t.Context(), time.Second*3) + defer cancel() - transferRes := startTransfer(t, ctx, oc, "tel:+15551234567", nil, false) + transferResults := []<-chan error{ + startTransfer(t, ctx, st, call, referTo, nil, false), + startTransfer(t, ctx, st, call, referTo, nil, false), + startTransfer(t, ctx, st, call, referTo, nil, false), + } - select { - case msg := <-reqChan: - require.Equal(t, sip.REFER, msg.req.Method) - require.Equal(t, call.localTag, msg.req.To().Params.GetOr("tag", "")) - hdrs := msg.req.GetHeaders("Refer-To") - require.Equal(t, 1, len(hdrs), "should be 1 Refer-To header") - require.Equal(t, "", hdrs[0].Value()) - resp := sip.NewResponseFromRequest(msg.req, 202, "Accepted", nil) - msg.tx.Respond(resp) - // OK, proceed - case err := <-transferRes: - t.Fatal("error transferring call, unexpected transfer API response", err) - case <-ctx.Done(): - t.Fatal("timeout waiting for REFER to arrive") - } + err := handleRefer(t, ctx, reqChan, call, 202, expectHeaders(referTo, nil)) + require.NoError(t, err, "Failed to process REFER request") + err = sendNotify(t, ctx, call, []int{100, 180, finalNotifyStatus}) + require.NoError(t, err, "Failed to send NOTIFY requests") + select { + case <-time.After(time.Millisecond * 250): + t.Logf("No BYE received, as expected") + case msg := <-reqChan: + t.Fatalf("Received unexpected request: %+v", msg) + } + err = sendBye(t, call) + require.NoError(t, err, "Failed to send BYE request") + for i, transferRes := range transferResults { + select { + case err := <-transferRes: + t.Logf("Received error: %v for transfer %d", err, i) + require.Error(t, err) + var psErr psrpc.Error + require.ErrorAs(t, err, &psErr) + var sipErr *livekit.SIPStatus + require.ErrorAs(t, err, &sipErr) + require.Equal(t, livekit.SIPStatusCode(finalNotifyStatus), sipErr.Code) + require.Equal(t, sipStatus(finalNotifyStatus), sipErr.Status) + case <-ctx.Done(): + require.NoError(t, ctx.Err(), "timeout waiting for BYE to arrive") + } + } + }) - byeReq := call.NewRequest(sip.BYE) - resp := st.TestUA.TransactionRequest(t, byeReq, false) - require.Equal(t, sip.StatusCode(200), resp.StatusCode, "Expecting 200 OK") + t.Run("bye", func(t *testing.T) { + // After REFER gets 202 we wait on NOTIFY; remote hangs up with BYE instead. + st := NewServiceTest(t, nil) + call := setupCall(t, st) - select { - case err := <-transferRes: - if err != nil { - t.Fatal("error transferring call, unexpected transfer API response", err) + reqChan := call.RegisterRequestChannel("") + defer call.UnregisterRequestChannel("") + + ctx, cancel := context.WithTimeout(t.Context(), time.Second*3) + defer cancel() + + headers := map[string]string{ + "X-Custom-Header": "custom-value", } - case <-ctx.Done(): - t.Fatal("timeout waiting for API to time return") - } + transferRes := startTransfer(t, ctx, st, call, referTo, headers, false) + + err := handleRefer(t, ctx, reqChan, call, 202, expectHeaders(referTo, headers)) + require.NoError(t, err, "Failed to process REFER request") + err = sendBye(t, call) + require.NoError(t, err, "Failed to send BYE request") + + select { + case err := <-transferRes: + require.NoError(t, err, "error transferring call, unexpected transfer API response") + case <-ctx.Done(): + require.NoError(t, ctx.Err(), "timeout waiting for BYE to arrive") + } + }) }) - }) + } } func TestRouteSet(t *testing.T) {