diff --git a/go.mod b/go.mod index 78acfab2..39c95907 100644 --- a/go.mod +++ b/go.mod @@ -11,15 +11,15 @@ require ( github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731 github.com/livekit/media-sdk v0.0.0-20260612175532-3d4d26d136c9 github.com/livekit/mediatransportutil v0.0.0-20260608063931-a3417d38cda0 - github.com/livekit/protocol v1.46.7-0.20260610064410-e286afe70eb0 + github.com/livekit/protocol v1.47.1-0.20260618140803-db77a56cf894 github.com/livekit/psrpc v0.7.2 - github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260608025623-a5da15b13baa + github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260618140743-3776341a116e github.com/livekit/sipgo v0.13.2-0.20260519205735-a5b4a38b6ceb github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12 github.com/ory/dockertest/v3 v3.12.0 github.com/pion/rtp v1.10.2 github.com/pion/sdp/v3 v3.0.18 - github.com/pion/webrtc/v4 v4.2.11 + github.com/pion/webrtc/v4 v4.2.14 github.com/prometheus/client_golang v1.23.2 github.com/sirupsen/logrus v1.9.4 github.com/stretchr/testify v1.11.1 @@ -102,7 +102,7 @@ require ( github.com/pion/mdns/v2 v2.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/rtcp v1.2.16 // indirect - github.com/pion/sctp v1.9.5 // indirect + github.com/pion/sctp v1.10.0 // indirect github.com/pion/srtp/v3 v3.0.11 // indirect github.com/pion/stun/v3 v3.1.4 // indirect github.com/pion/transport/v4 v4.0.2 // indirect diff --git a/go.sum b/go.sum index 0afa59ee..8183abb0 100644 --- a/go.sum +++ b/go.sum @@ -134,12 +134,12 @@ github.com/livekit/media-sdk v0.0.0-20260612175532-3d4d26d136c9 h1:GVMkuNwXQ74kV github.com/livekit/media-sdk v0.0.0-20260612175532-3d4d26d136c9/go.mod h1:TuYRjSepaakL6ATsM9V2VMuksewW1PlhA32BG7Pxty0= github.com/livekit/mediatransportutil v0.0.0-20260608063931-a3417d38cda0 h1:XHNNzebIKZRkLimla/hFGrAIX5EMWHctrgt3hLw7s+I= github.com/livekit/mediatransportutil v0.0.0-20260608063931-a3417d38cda0/go.mod h1:o8CFmAdrVwzJNOCsQCLUzXRjokkufNshnQHOe4fRaqU= -github.com/livekit/protocol v1.46.7-0.20260610064410-e286afe70eb0 h1:aNazCl+gTEmF88tVsISOvtnfZM/K9IbqAn2WvZVmh4Y= -github.com/livekit/protocol v1.46.7-0.20260610064410-e286afe70eb0/go.mod h1:jO+y05AU9Ec4JswDyuzKCZ4bhziOS0CzMqgnbj60Dzs= +github.com/livekit/protocol v1.47.1-0.20260618140803-db77a56cf894 h1:OH1Fejt3yDQXG2bYs1LkaSxifdVlm61eG3yrzrLW6Jo= +github.com/livekit/protocol v1.47.1-0.20260618140803-db77a56cf894/go.mod h1:jO+y05AU9Ec4JswDyuzKCZ4bhziOS0CzMqgnbj60Dzs= github.com/livekit/psrpc v0.7.2 h1:6oZ+NODJ2pLyaT6VqDq1F4Qc/3TpDUSpyphj/P9MhQc= github.com/livekit/psrpc v0.7.2/go.mod h1:rAI+m2+/cb4x9RXhLRtUx5ZwdfjjXOl4zi46IjEetaw= -github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260608025623-a5da15b13baa h1:B19yilP7+JjekKMD0WejMh1Kvypdxpr5yxQZiFStRD0= -github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260608025623-a5da15b13baa/go.mod h1:SWJD68Rfcwrhze09EYaRiur7ESCBuu0u4fpK+0BGEYo= +github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260618140743-3776341a116e h1:PJZ+9COhAT8sCIo6zJCtYaDeJBQcCUN8H0GyEe2xMMM= +github.com/livekit/server-sdk-go/v2 v2.16.7-0.20260618140743-3776341a116e/go.mod h1:fuOvpz1rjH2XgsaXiVKSzW1tPw7es5dmBjr4GrX7xd8= github.com/livekit/sipgo v0.13.2-0.20260519205735-a5b4a38b6ceb h1:HmgaJMGs0Nco/Z+XMc9f+xFgrbood9yJsIBtl1OY76M= github.com/livekit/sipgo v0.13.2-0.20260519205735-a5b4a38b6ceb/go.mod h1:aDa6mbFktNzA1D917RhFlIB5IOfNBTmrwt+/lX960j0= github.com/mackerelio/go-osstat v0.2.7 h1:TCavZi10wF49bT6iQZ9eT2keGZQpC69MTDfdJej5e94= @@ -196,8 +196,8 @@ github.com/pion/rtcp v1.2.16 h1:fk1B1dNW4hsI78XUCljZJlC4kZOPk67mNRuQ0fcEkSo= github.com/pion/rtcp v1.2.16/go.mod h1:/as7VKfYbs5NIb4h6muQ35kQF/J0ZVNz2Z3xKoCBYOo= github.com/pion/rtp v1.10.2 h1:l+f6tTDcAH6xwepaAoW791ddhuYsJlqRATOzirO04Mo= github.com/pion/rtp v1.10.2/go.mod h1:Au8fc6cEByy8RLTwKTQTEeQqDB/SJDxwL4mZuxYA5Pk= -github.com/pion/sctp v1.9.5 h1:QoSFB/drmAsmSeSFNQNI3xx010nW4HsycCZckRVWWag= -github.com/pion/sctp v1.9.5/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw= +github.com/pion/sctp v1.10.0 h1:qeoD6swF/2M5bYRcAGayqSbTKX3m4AW29CiQxG1+Pfg= +github.com/pion/sctp v1.10.0/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw= github.com/pion/sdp/v3 v3.0.18 h1:l0bAXazKHpepazVdp+tPYnrsy9dfh7ZbT8DxesH5ZnI= github.com/pion/sdp/v3 v3.0.18/go.mod h1:ZREGo6A9ZygQ9XkqAj5xYCQtQpif0i6Pa81HOiAdqQ8= github.com/pion/srtp/v3 v3.0.11 h1:GiESUr54/K4UuPigfq/CvWUed80JenQAHXn0C2MQQIQ= @@ -208,12 +208,10 @@ github.com/pion/transport/v3 v3.1.1 h1:Tr684+fnnKlhPceU+ICdrw6KKkTms+5qHMgw6bIkY github.com/pion/transport/v3 v3.1.1/go.mod h1:+c2eewC5WJQHiAA46fkMMzoYZSuGzA/7E2FPrOYHctQ= github.com/pion/transport/v4 v4.0.2 h1:ifYlPqNwsy6aKQ9y8yzxXlHae5431ZrH2avkD/Rn6Tk= github.com/pion/transport/v4 v4.0.2/go.mod h1:06hFI+jCFcok2X2MekVufNZ/uzNZXivGBPfviSVcjgM= -github.com/pion/turn/v4 v4.1.4 h1:EU11yMXKIsK43FhcUnjLlrhE4nboHZq+TXBIi3QpcxQ= -github.com/pion/turn/v4 v4.1.4/go.mod h1:ES1DXVFKnOhuDkqn9hn5VJlSWmZPaRJLyBXoOeO/BmQ= github.com/pion/turn/v5 v5.0.8 h1:pZUCtmwWCMkrRKqh/8pL3WoGADXBe0/lOPkN7oqFjK8= github.com/pion/turn/v5 v5.0.8/go.mod h1:1VwvxElZaOdJU0liJ/WUSm/Tsh+n2OxS5ISSDxgOWxU= -github.com/pion/webrtc/v4 v4.2.11 h1:QUX1QZKlNIn4O7U5JxLPGP0sV5RTncZkzu9SPR3jVNU= -github.com/pion/webrtc/v4 v4.2.11/go.mod h1:s/rAiyy77GyRFrZMx+Ls6aua26dIBPudH8/ZHYbIRWY= +github.com/pion/webrtc/v4 v4.2.14 h1:Q6zMs+fSDsYuhZcNlvFGBxCOMHVV9oYcDa6O9/HIGTc= +github.com/pion/webrtc/v4 v4.2.14/go.mod h1:87NVKP86+g4OMrRxWhjWfUjeXP4JrV6RTlUrIW+/Jak= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/pkg/sip/client.go b/pkg/sip/client.go index fabc964a..65f163ea 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -350,7 +350,11 @@ func (c *Client) onBye(req *sip.Request, tx sip.ServerTransaction) bool { call.log.Infow("BYE from remote") go func(call *outboundCall) { call.cc.AcceptBye(req, tx) - call.CloseWithReason(ctx, CallHangup, stats.Success("bye"), livekit.DisconnectReason_CLIENT_INITIATED) + call.CloseWith(ctx, EndCall{ + Status: CallHangup, + Term: stats.Success("bye"), + Reason: livekit.DisconnectReason_CLIENT_INITIATED, + }) }(call) return true } diff --git a/pkg/sip/errors.go b/pkg/sip/errors.go index 9a6d4325..d96c516c 100644 --- a/pkg/sip/errors.go +++ b/pkg/sip/errors.go @@ -21,10 +21,7 @@ import ( // inviteFailure is the verdict for a failed outbound INVITE: how to record // the call, how to bucket the SLI, and which error to surface back. type inviteFailure struct { - status CallStatus - term stats.Termination - reason livekit.DisconnectReason - reportErr error // nil skips writing SIPCallInfo.Error + EndCall returnErr error } @@ -54,18 +51,20 @@ func (e SDPError) GRPCStatus() *status.Status { func (e SDPError) ClassifyInvite() inviteFailure { res := inviteFailure{ - status: callRejected, - reason: livekit.DisconnectReason_MEDIA_FAILURE, - reportErr: e.Err, + EndCall: EndCall{ + Status: callRejected, + Reason: livekit.DisconnectReason_MEDIA_FAILURE, + Report: e.Err, + }, returnErr: e, } switch { case errors.Is(e.Err, sdp.ErrNoCommonMedia): - res.term = stats.ClientError("no-common-codec") + res.Term = stats.ClientError("no-common-codec") case errors.Is(e.Err, sdp.ErrNoCommonCrypto): - res.term = stats.ClientError("encryption-required") + res.Term = stats.ClientError("encryption-required") default: - res.term = stats.ClientError("sdp-error") + res.Term = stats.ClientError("sdp-error") } return res } @@ -89,10 +88,12 @@ func (e transactionTimeoutError) ClassifyInvite() inviteFailure { reason = "no-final-response" } return inviteFailure{ - status: callUnavailable, - term: stats.ClientError(reason), - reason: livekit.DisconnectReason_SIP_TRUNK_FAILURE, - reportErr: e, // keep so the customer sees their destination didn't complete + EndCall: EndCall{ + Status: callUnavailable, + Term: stats.ClientError(reason), + Reason: livekit.DisconnectReason_SIP_TRUNK_FAILURE, + Report: e, // keep so the customer sees their destination didn't complete + }, returnErr: psrpc.NewError(psrpc.Canceled, e), } } @@ -107,10 +108,12 @@ func classifyInviteError(err error) inviteFailure { } res := inviteFailure{ - status: callDropped, - term: stats.ServerError("invite-failed"), - reason: livekit.DisconnectReason_UNKNOWN_REASON, - reportErr: err, + EndCall: EndCall{ + Status: callDropped, + Term: stats.ServerError("invite-failed"), + Reason: livekit.DisconnectReason_UNKNOWN_REASON, + Report: err, + }, returnErr: err, } @@ -118,31 +121,31 @@ func classifyInviteError(err error) inviteFailure { code := int(sipStatus.Code) switch code { case int(sip.StatusUnauthorized), int(sip.StatusProxyAuthRequired): - res.status, res.term, res.reason = callRejected, stats.ClientError("auth-required"), livekit.DisconnectReason_USER_REJECTED - res.reportErr = nil + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("auth-required"), livekit.DisconnectReason_USER_REJECTED + res.Report = nil case int(sip.StatusForbidden): - res.status, res.term, res.reason = callRejected, stats.ClientError("forbidden"), livekit.DisconnectReason_USER_REJECTED - res.reportErr = nil + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("forbidden"), livekit.DisconnectReason_USER_REJECTED + res.Report = nil case int(sip.StatusNotFound): - res.status, res.term, res.reason = callUnavailable, stats.ClientError("not-found"), livekit.DisconnectReason_USER_UNAVAILABLE - res.reportErr = nil + res.Status, res.Term, res.Reason = callUnavailable, stats.ClientError("not-found"), livekit.DisconnectReason_USER_UNAVAILABLE + res.Report = nil case int(sip.StatusRequestTimeout): - res.status, res.term, res.reason = callUnavailable, stats.ClientError("request-timeout"), livekit.DisconnectReason_USER_UNAVAILABLE - res.reportErr = nil + res.Status, res.Term, res.Reason = callUnavailable, stats.ClientError("request-timeout"), livekit.DisconnectReason_USER_UNAVAILABLE + res.Report = nil case int(sip.StatusTemporarilyUnavailable): - res.status, res.term, res.reason = callUnavailable, stats.ClientError("unavailable"), livekit.DisconnectReason_USER_UNAVAILABLE - res.reportErr = nil + res.Status, res.Term, res.Reason = callUnavailable, stats.ClientError("unavailable"), livekit.DisconnectReason_USER_UNAVAILABLE + res.Report = nil case int(sip.StatusBusyHere): - res.status, res.term, res.reason = callRejected, stats.ClientError("busy"), livekit.DisconnectReason_USER_REJECTED - res.reportErr = nil + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("busy"), livekit.DisconnectReason_USER_REJECTED + res.Report = nil case int(sip.StatusNotAcceptableHere): - res.status, res.term, res.reason = callRejected, stats.ClientError("not-acceptable"), livekit.DisconnectReason_USER_REJECTED - res.reportErr = nil + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("not-acceptable"), livekit.DisconnectReason_USER_REJECTED + res.Report = nil default: switch { case code >= 400 && code < 500: - res.status, res.term, res.reason = callRejected, stats.ClientError(fmt.Sprintf("client-error-%d", code)), livekit.DisconnectReason_USER_UNAVAILABLE - res.reportErr = nil + res.Status, res.Term, res.Reason = callRejected, stats.ClientError(fmt.Sprintf("client-error-%d", code)), livekit.DisconnectReason_USER_UNAVAILABLE + res.Report = nil case code >= 500 && code < 600: // Some upstreams (notably Twilio) return a 5xx when the customer's own trunk exceeds its configured CPS or // concurrent-call cap. That's a customer-side rate limit, not upstream infrastructure breakage, so it must not count @@ -150,26 +153,26 @@ func classifyInviteError(err error) inviteFailure { body := strings.ToLower(sipStatus.GetStatus()) switch { case strings.Contains(body, "cps limit exceeded"): - res.status, res.term, res.reason = callRejected, stats.ClientError("cps-limit-exceeded"), livekit.DisconnectReason_SIP_TRUNK_FAILURE + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("cps-limit-exceeded"), livekit.DisconnectReason_SIP_TRUNK_FAILURE // keep reportErr so the customer can see they hit their cap case strings.Contains(body, "concurrent call limit exceeded"): - res.status, res.term, res.reason = callRejected, stats.ClientError("concurrent-limit-exceeded"), livekit.DisconnectReason_SIP_TRUNK_FAILURE + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("concurrent-limit-exceeded"), livekit.DisconnectReason_SIP_TRUNK_FAILURE // keep reportErr so the customer can see they hit their cap default: // Carrier-side 5xx; keep reportErr for the detail. - res.status, res.term, res.reason = callDropped, stats.UpstreamError(fmt.Sprintf("upstream-server-error-%d", code)), livekit.DisconnectReason_SIP_TRUNK_FAILURE + res.Status, res.Term, res.Reason = callDropped, stats.UpstreamError(fmt.Sprintf("upstream-server-error-%d", code)), livekit.DisconnectReason_SIP_TRUNK_FAILURE } case code >= 600 && code < 700: - res.status, res.term, res.reason = callRejected, stats.ClientError(fmt.Sprintf("global-decline-%d", code)), livekit.DisconnectReason_USER_REJECTED - res.reportErr = nil + res.Status, res.Term, res.Reason = callRejected, stats.ClientError(fmt.Sprintf("global-decline-%d", code)), livekit.DisconnectReason_USER_REJECTED + res.Report = nil } } return res } if errors.Is(err, ErrSIPRequestTimeout) { - res.status, res.term, res.reason = callUnavailable, stats.ClientError("no-answer"), livekit.DisconnectReason_USER_UNAVAILABLE - res.reportErr = nil + res.Status, res.Term, res.Reason = callUnavailable, stats.ClientError("no-answer"), livekit.DisconnectReason_USER_UNAVAILABLE + res.Report = nil return res } @@ -177,36 +180,36 @@ func classifyInviteError(err error) inviteFailure { // op error can wrap a context error, and the context cause is more // informative. if errors.Is(err, context.DeadlineExceeded) { - res.status, res.term, res.reason = callDropped, stats.ServerError("deadline-exceeded"), livekit.DisconnectReason_UNKNOWN_REASON + res.Status, res.Term, res.Reason = callDropped, stats.ServerError("deadline-exceeded"), livekit.DisconnectReason_UNKNOWN_REASON res.returnErr = psrpc.NewError(psrpc.DeadlineExceeded, err) return res } if errors.Is(err, context.Canceled) { - res.status, res.term, res.reason = callRejected, stats.ClientError("canceled"), livekit.DisconnectReason_USER_UNAVAILABLE - res.reportErr = nil + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("canceled"), livekit.DisconnectReason_USER_UNAVAILABLE + res.Report = nil res.returnErr = psrpc.NewError(psrpc.Canceled, err) return res } // Specific net error types before *net.OpError (which wraps them). if _, ok := errors.AsType[*net.AddrError](err); ok { - res.status, res.term, res.reason = callDropped, stats.ClientError("address-error"), livekit.DisconnectReason_SIP_TRUNK_FAILURE + res.Status, res.Term, res.Reason = callDropped, stats.ClientError("address-error"), livekit.DisconnectReason_SIP_TRUNK_FAILURE res.returnErr = psrpc.NewError(psrpc.InvalidArgument, err) return res } if _, ok := errors.AsType[*net.DNSError](err); ok { - res.status, res.term, res.reason = callDropped, stats.ClientError("dns-resolution"), livekit.DisconnectReason_SIP_TRUNK_FAILURE + res.Status, res.Term, res.Reason = callDropped, stats.ClientError("dns-resolution"), livekit.DisconnectReason_SIP_TRUNK_FAILURE res.returnErr = psrpc.NewError(psrpc.InvalidArgument, err) return res } if _, ok := errors.AsType[*net.OpError](err); ok { - res.status, res.term, res.reason = callDropped, stats.ServerError("network-error"), livekit.DisconnectReason_SIP_TRUNK_FAILURE + res.Status, res.Term, res.Reason = callDropped, stats.ServerError("network-error"), livekit.DisconnectReason_SIP_TRUNK_FAILURE res.returnErr = psrpc.NewError(psrpc.Unavailable, err) return res } if errors.Is(err, ErrAuthMaxRetry) || errors.Is(err, ErrAuthMissingCreds) || errors.Is(err, ErrAuthNoHeader) { - res.status, res.term, res.reason = callRejected, stats.ClientError("auth-failed"), livekit.DisconnectReason_USER_REJECTED + res.Status, res.Term, res.Reason = callRejected, stats.ClientError("auth-failed"), livekit.DisconnectReason_USER_REJECTED // keep reportErr so the auth detail is recorded return res } diff --git a/pkg/sip/errors_test.go b/pkg/sip/errors_test.go index caf69300..b73d0796 100644 --- a/pkg/sip/errors_test.go +++ b/pkg/sip/errors_test.go @@ -95,13 +95,13 @@ func TestClassifyInviteError(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { res := classifyInviteError(tc.err) - require.Equal(t, tc.wantStatus, res.status, "status") - require.Equal(t, tc.wantTerm, res.term, "termination") - require.Equal(t, tc.wantReason, res.reason, "disconnect reason") + require.Equal(t, tc.wantStatus, res.Status, "status") + require.Equal(t, tc.wantTerm, res.Term, "termination") + require.Equal(t, tc.wantReason, res.Reason, "disconnect reason") if tc.wantReport { - require.NotNil(t, res.reportErr, "reportErr expected non-nil") + require.NotNil(t, res.Report, "reportErr expected non-nil") } else { - require.Nil(t, res.reportErr, "reportErr expected nil") + require.Nil(t, res.Report, "reportErr expected nil") } }) } diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 2d5f7bd9..b6db9458 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -669,6 +669,7 @@ type inboundCall struct { mmu sync.Mutex media *MediaPort dtmf chan dtmf.Event // buffered + endCall chan EndCall // buffered lkRoom RoomInterface // LiveKit room; only active after correct pin is entered callDur func() time.Duration joinDur func() time.Duration @@ -705,6 +706,7 @@ func (s *Server) newInboundCall( state: state, extraAttrs: extra, dtmf: make(chan dtmf.Event, 10), + endCall: make(chan EndCall, 1), jitterBuf: SelectValueBool(s.conf.EnableJitterBuffer, s.conf.EnableJitterBufferProb), projectID: "", // Will be set in handleInvite when available } @@ -755,7 +757,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip c.mon.InviteAccept() c.mon.CallStart() defer c.mon.CallEnd() - defer c.close(ctx, callDropped, stats.ServerError("other")) + defer c.closeWithTerm(ctx, stats.ServerError("other")) // Extract and store the SIP call ID from the request if h := req.CallID(); h != nil { @@ -781,7 +783,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip if err != nil { c.log().Errorw("Cannot create media config", err) c.cc.RespondAndDrop(sip.StatusInternalServerError, "") - c.close(ctx, callDropped, stats.ServerError("media-config-error")) + c.closeWithTerm(ctx, stats.ServerError("media-config-error")) return psrpc.NewError(psrpc.Internal, err) } if disp.ProjectID != "" { @@ -818,22 +820,25 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip err := fmt.Errorf("unexpected dispatch result: %v", disp.Result) c.log().Errorw("Rejecting inbound call", err) c.cc.RespondAndDrop(sip.StatusNotImplemented, "") - c.close(ctx, callDropped, stats.ServerError("unexpected-result")) + c.closeWithTerm(ctx, stats.ServerError("unexpected-result")) return psrpc.NewError(psrpc.Unimplemented, err) case DispatchNoRuleDrop: c.log().Debugw("Rejecting inbound flood") c.cc.Drop() - c.close(ctx, callFlood, stats.ClientError("flood")) + c.close(ctx, EndCall{ + Status: callFlood, + Term: stats.ClientError("flood"), + }) return psrpc.NewErrorf(psrpc.PermissionDenied, "call was not authorized by trunk configuration") case DispatchNoRuleReject: c.log().Infow("Rejecting inbound call, doesn't match any Dispatch Rules") c.cc.RespondAndDrop(sip.StatusNotFound, "Does not match Trunks or Dispatch Rules") - c.close(ctx, callDropped, stats.ClientError("no-dispatch")) + c.closeWithTerm(ctx, stats.ClientError("no-dispatch")) return psrpc.NewErrorf(psrpc.NotFound, "no trunk configuration for call") case DispatchServiceUnavailable: c.log().Warnw("Rejecting inbound call, dispatch evaluation failed", nil) c.cc.RespondAndDrop(sip.StatusServiceUnavailable, "Try again later") - c.close(ctx, callDropped, stats.ServerError("dispatch-error")) + c.closeWithTerm(ctx, stats.ServerError("dispatch-error")) return psrpc.NewErrorf(psrpc.Unavailable, "dispatch rule evaluation unavailable") case DispatchAccept: pinPrompt = false @@ -880,7 +885,10 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip log.Warnw("Cannot start media", err) } c.cc.RespondAndDrop(sipReason, "") - c.close(ctx, status, term) + c.close(ctx, EndCall{ + Status: status, + Term: term, + }) return nil, err } return answerData, nil @@ -912,7 +920,10 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip return false, err } else if err != nil { c.log().Errorw("Cannot accept the call", err) - c.close(ctx, callAcceptFailed, stats.ServerError("accept-failed")) + c.close(ctx, EndCall{ + Status: callAcceptFailed, + Term: stats.ServerError("accept-failed"), + }) return false, err } if !c.s.conf.Experimental.InboundWaitACK { @@ -977,7 +988,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip // Publish our own track. if err := c.publishTrack(); err != nil { c.log().Errorw("Cannot publish track", err) - c.close(ctx, callDropped, stats.ServerError("publish-failed")) + c.closeWithTerm(ctx, stats.ServerError("publish-failed")) return fmt.Errorf("publishing track to room failed: %w", err) } tsub := c.mon.StageDurTimer("track-subscribe") @@ -1029,11 +1040,14 @@ func (c *inboundCall) waitForCallEnd(ctx context.Context, ackReceived <-chan str case <-ctx.Done(): c.closeWithHangup(ctx) return nil + case end := <-c.endCall: + c.close(ctx, end) + return nil case <-c.lkRoom.Closed(): c.state.DeferUpdate(func(info *livekit.SIPCallInfo) { info.DisconnectReason = livekit.DisconnectReason_CLIENT_INITIATED }) - c.close(ctx, callDropped, terminationFromRoomDisconnect(c.lkRoom.ClosedReason())) + c.closeWithTerm(ctx, terminationFromRoomDisconnect(c.lkRoom.ClosedReason())) return nil case <-c.media.Timeout(): return c.mediaTimeout(ctx) @@ -1136,6 +1150,9 @@ func (c *inboundCall) waitMedia(ctx context.Context) (bool, error) { return false, psrpc.NewErrorf(psrpc.Canceled, "room closed") case <-c.media.Timeout(): return false, c.mediaTimeout(ctx) + case end := <-c.endCall: + c.close(ctx, end) + return false, nil case <-c.media.Received(): case <-delay.C: } @@ -1160,8 +1177,11 @@ func (c *inboundCall) waitSubscribe(ctx context.Context, timeout time.Duration) return false, psrpc.NewErrorf(psrpc.Canceled, "room closed") case <-c.media.Timeout(): return false, c.mediaTimeout(ctx) + case end := <-c.endCall: + c.close(ctx, end) + return false, psrpc.NewErrorf(psrpc.Canceled, "rpc terminated the call") case <-timer.C: - c.close(ctx, callDropped, stats.ServerError("cannot-subscribe")) + c.closeWithTerm(ctx, stats.ServerError("cannot-subscribe")) return false, psrpc.NewErrorf(psrpc.DeadlineExceeded, "room subscription timed out") case <-c.lkRoom.Subscribed(): return true, nil @@ -1217,13 +1237,13 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD } if disp.Result == DispatchServiceUnavailable { c.log().Warnw("Rejecting call, dispatch evaluation failed", nil, "pin", pin, "noPin", noPin) - c.close(ctx, callDropped, stats.ServerError("dispatch-error")) + c.closeWithTerm(ctx, stats.ServerError("dispatch-error")) return disp, false, psrpc.NewErrorf(psrpc.Unavailable, "dispatch rule evaluation unavailable") } if disp.Result != DispatchAccept || disp.Room.RoomName == "" { c.log().Infow("Rejecting call", "pin", pin, "noPin", noPin) c.playAudio(ctx, c.s.res.wrongPin) - c.close(ctx, callDropped, stats.ClientError("wrong-pin")) + c.closeWithTerm(ctx, stats.ClientError("wrong-pin")) return disp, false, psrpc.NewErrorf(psrpc.PermissionDenied, "wrong pin") } c.playAudio(ctx, c.s.res.roomJoin) @@ -1233,7 +1253,10 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD pin += string(b.Digit) if len(pin) > pinLimit { c.playAudio(ctx, c.s.res.wrongPin) - c.close(ctx, callDropped, stats.ClientError("wrong-pin")) + c.close(ctx, EndCall{ + Status: callDropped, + Term: stats.ClientError("wrong-pin"), + }) return disp, false, psrpc.NewErrorf(psrpc.PermissionDenied, "wrong pin") } } @@ -1244,9 +1267,29 @@ func (c *inboundCall) printStats(log logger.Logger) { c.stats.Log(log, c.callStart) } +func (c *inboundCall) closeWithTerm(ctx context.Context, t stats.Termination) { + c.close(ctx, EndCall{ + Status: callDropped, + Term: t, + }) +} + +func (c *inboundCall) EndCall(ctx context.Context, headers map[string]string) error { + select { + case <-ctx.Done(): + return ctx.Err() + case c.endCall <- EndCall{ + Status: CallHangup, + Term: stats.Success("rpc"), + Headers: headers, + }: + } + return nil +} + // close should only be called from handleInvite. -func (c *inboundCall) close(ctx context.Context, status CallStatus, t stats.Termination) { - termCtx, cancel := context.WithCancel(context.Background()) // Do not use ctx +func (c *inboundCall) close(ctx context.Context, end EndCall) { + termCtx, cancel := context.WithCancel(context.WithoutCancel(ctx)) // Do not use ctx cancellation defer cancel() go func() { select { @@ -1264,22 +1307,37 @@ func (c *inboundCall) close(ctx context.Context, status CallStatus, t stats.Term } defer c.mon.StageDurTimer("close") c.stats.Closed.Store(true) - sipCode, sipStatus := status.SIPStatus() - log := c.log().WithValues("status", sipCode, "result", string(t.Result), "reason", t.Reason) + result := Result{ + Code: sip.StatusBusyHere, + Status: "Rejected", + } + switch end.Status { + case callMediaFailed: + result = Result{ + Code: sip.StatusNotAcceptableHere, + Status: "Media Failed", + } + case CallCancelled: + result = Result{ + Code: sip.StatusRequestTerminated, + Status: "Request Terminated", + } + } + log := c.log().WithValues("status", result.Code, "result", string(end.Term.Result), "reason", end.Reason) defer func() { c.stats.Update() c.printStats(log) c.sigTs.Log(log) }() - c.setStatus(status) - c.mon.CallTerminate(t) - isWarn := t.Result == stats.ResultServerError || status == callHangupMedia + c.setStatus(end.Status) + c.mon.CallTerminate(end.Term) + isWarn := end.Term.Result == stats.ResultServerError || end.Status == callHangupMedia if isWarn { log.Warnw("Closing inbound call with error", nil) } else { log.Infow("Closing inbound call") } - if status != callFlood { + if end.Status != callFlood { defer log.Infow("Inbound call closed") } @@ -1287,7 +1345,7 @@ func (c *inboundCall) close(ctx context.Context, status CallStatus, t stats.Term // This ensures participant attributes are still available for // attributes_to_headers mapping in the setHeaders callback. // See: https://github.com/livekit/sip/issues/404 - c.cc.CloseWithStatus(ctx, sipCode, sipStatus) + c.cc.CloseWithStatus(ctx, result, end.Headers) c.closeMedia() if callDurFn := c.callDur; callDurFn != nil { callDurFn() @@ -1309,7 +1367,7 @@ func (c *inboundCall) close(ctx context.Context, status CallStatus, t stats.Term ProjectID: c.projectID, CallID: c.call.LkCallId, SipCallID: c.call.SipCallId, - }, state, t.Reason) + }, state, end.Term.Reason) }(c.tid) } @@ -1331,11 +1389,17 @@ func (c *inboundCall) closeWithTimeout(ctx context.Context, isError bool) { } }) } - c.close(ctx, status, stats.Indeterminate("media-timeout")) + c.close(ctx, EndCall{ + Status: status, + Term: stats.Indeterminate("media-timeout"), + }) } func (c *inboundCall) closeWithNoACK(ctx context.Context) { - c.close(ctx, callNoACK, stats.ServerError("no-ack")) + c.close(ctx, EndCall{ + Status: callNoACK, + Term: stats.ServerError("no-ack"), + }) } func (c *inboundCall) closeWithCancelled(ctx context.Context) { @@ -1343,7 +1407,7 @@ func (c *inboundCall) closeWithCancelled(ctx context.Context) { if p := c.closeReason.Load(); p != nil { reason = *p } - c.closeWithReason(ctx, CallCancelled, stats.Success("cancelled"), reason) + c.closeWith(ctx, CallCancelled, stats.Success("cancelled"), reason) } func (c *inboundCall) closeWithHangup(ctx context.Context) { @@ -1351,10 +1415,10 @@ func (c *inboundCall) closeWithHangup(ctx context.Context) { if p := c.closeReason.Load(); p != nil { reason = *p } - c.closeWithReason(ctx, CallHangup, stats.Success("hangup"), reason) + c.closeWith(ctx, CallHangup, stats.Success("hangup"), reason) } -func (c *inboundCall) closeWithReason(ctx context.Context, status CallStatus, t stats.Termination, reason ReasonHeader) { +func (c *inboundCall) closeWith(ctx context.Context, status CallStatus, t stats.Termination, reason ReasonHeader) { ctx = context.WithoutCancel(ctx) c.state.DeferUpdate(func(info *livekit.SIPCallInfo) { info.DisconnectReason = livekit.DisconnectReason_CLIENT_INITIATED @@ -1369,7 +1433,10 @@ func (c *inboundCall) closeWithReason(ctx context.Context, status CallStatus, t t.Reason = fmt.Sprintf("bye-%s-%d", strings.ToLower(reason.Type), reason.Cause) } } - c.close(ctx, status, t) + c.close(ctx, EndCall{ + Status: status, + Term: t, + }) } func (c *inboundCall) Bye(reason ReasonHeader) { @@ -1386,7 +1453,7 @@ func (c *inboundCall) Close() error { // server_error termination so the call is counted in the SLI denominator. // close() is idempotent via c.done, so concurrent paths cannot double-emit. func (c *inboundCall) Shutdown(ctx context.Context) { - c.close(ctx, callDropped, stats.ServerError("shutdown")) + c.closeWithTerm(ctx, stats.ServerError("shutdown")) } func (c *inboundCall) closeMedia() { @@ -1450,6 +1517,9 @@ func (c *inboundCall) createLiveKitParticipant(ctx context.Context, rconf RoomCo if err := registerSignalingRPC(c.lkRoom, c.cc); err != nil { return err } + if err := registerCallRPC(c.lkRoom, c); err != nil { + return err + } return nil } @@ -1478,7 +1548,7 @@ func (c *inboundCall) joinRoom(ctx context.Context, rconf RoomConfig, status Cal c.log().Infow("Joining room") if err := c.createLiveKitParticipant(ctx, rconf, status); err != nil { c.log().Errorw("Cannot create LiveKit participant", err) - c.close(ctx, callDropped, stats.ServerError("participant-failed")) + c.closeWithTerm(ctx, stats.ServerError("participant-failed")) return fmt.Errorf("cannot create LiveKit participant: %w", err) } return nil @@ -2017,7 +2087,7 @@ func (c *sipInbound) setCSeq(req *sip.Request) { c.nextRequestCSeq++ } -func (c *sipInbound) sendBye(ctx context.Context) { +func (c *sipInbound) sendBye(ctx context.Context, headers map[string]string) { ctx = context.WithoutCancel(ctx) if c.inviteOk == nil { return // call wasn't established @@ -2032,6 +2102,9 @@ func (c *sipInbound) sendBye(ctx context.Context) { for k, v := range c.fillHeaders(nil) { r.AppendHeader(sip.NewHeader(k, v)) } + for k, v := range headers { + r.AppendHeader(sip.NewHeader(k, v)) + } c.setCSeq(r) c.swapSrcDst(r) @@ -2039,7 +2112,7 @@ func (c *sipInbound) sendBye(ctx context.Context) { sendAndACK(ctx, c, r) } -func (c *sipInbound) sendStatus(ctx context.Context, code sip.StatusCode, status string) { +func (c *sipInbound) sendStatus(ctx context.Context, result Result, headers map[string]string) { ctx = context.WithoutCancel(ctx) if c.inviteOk != nil { return // call already established @@ -2050,13 +2123,13 @@ func (c *sipInbound) sendStatus(ctx context.Context, code sip.StatusCode, status ctx, span := Tracer.Start(ctx, "sip.inbound.sendStatus") defer span.End() - if status == "" { - status = sipStatus(code) - } - r := sip.NewResponseFromRequest(c.invite, code, status, nil) + r := result.NewResponse(c.invite) for k, v := range c.fillHeaders(nil) { r.AppendHeader(sip.NewHeader(k, v)) } + for k, v := range headers { + r.AppendHeader(sip.NewHeader(k, v)) + } _ = c.inviteTx.Respond(r) c.drop() } @@ -2144,19 +2217,22 @@ func (c *sipInbound) handleNotify(req *sip.Request, tx sip.ServerTransaction) er // Close the inbound call cleanly. Depending on the call state it either sends BYE or terminates INVITE with busy status. func (c *sipInbound) Close(ctx context.Context) { ctx = context.WithoutCancel(ctx) - c.CloseWithStatus(ctx, sip.StatusBusyHere, "Rejected") + c.CloseWithStatus(ctx, Result{ + Code: sip.StatusBusyHere, + Status: "Rejected", + }, nil) } // CloseWithStatus the inbound call cleanly. Depending on the call state it either sends BYE or terminates INVITE with a specified status. -func (c *sipInbound) CloseWithStatus(ctx context.Context, code sip.StatusCode, status string) { +func (c *sipInbound) CloseWithStatus(ctx context.Context, result Result, headers map[string]string) { ctx = context.WithoutCancel(ctx) c.mu.Lock() defer c.mu.Unlock() if c.inviteOk != nil { // TODO: add cause for a failure, if any - c.sendBye(ctx) + c.sendBye(ctx, headers) } else if c.inviteTx != nil { - c.sendStatus(ctx, code, status) + c.sendStatus(ctx, result, headers) } else { c.drop() } diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 991e55df..d474459a 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -149,14 +149,22 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi IgnorePreanswerData: true, }, RoomSampleRate) if err != nil { - call.close(ctx, fmt.Errorf("media failed: %w", err), callDropped, stats.ServerError("media-failed"), livekit.DisconnectReason_UNKNOWN_REASON) + call.close(ctx, EndCall{ + Report: fmt.Errorf("media failed: %w", err), + Status: callDropped, + Term: stats.ServerError("media-failed"), + }) return nil, err } call.media.SetDTMFAudio(conf.AudioDTMF) call.media.EnableTimeout(false) call.media.DisableOut() // disabled until we get 200 if err := call.connectToRoom(ctx, room, c.getRoom); err != nil { - call.close(ctx, fmt.Errorf("room join failed: %w", err), callDropped, stats.ServerError("join-failed"), livekit.DisconnectReason_UNKNOWN_REASON) + call.close(ctx, EndCall{ + Report: fmt.Errorf("room join failed: %w", err), + Status: callDropped, + Term: stats.ServerError("join-failed"), + }) return nil, psrpc.NewError(psrpc.Internal, fmt.Errorf("update room failed: %w", err)) } @@ -258,7 +266,11 @@ func (c *outboundCall) waitClose(ctx context.Context, tid traceid.ID) error { c.state.ForceFlush() case <-c.Disconnected(): term := terminationFromRoomDisconnect(c.lkRoom.ClosedReason()) - c.CloseWithReason(ctx, callDropped, term, livekit.DisconnectReason_CLIENT_INITIATED) + c.CloseWith(ctx, EndCall{ + Status: callDropped, + Term: term, + Reason: livekit.DisconnectReason_CLIENT_INITIATED, + }) return nil case <-c.media.Timeout(): c.closeWithTimeout(ctx) @@ -294,62 +306,86 @@ func (c *outboundCall) Disconnected() <-chan struct{} { func (c *outboundCall) Close(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() - c.close(ctx, nil, callDropped, stats.ServerError("shutdown"), livekit.DisconnectReason_SERVER_SHUTDOWN) + c.close(ctx, EndCall{ + Status: callDropped, + Term: stats.ServerError("shutdown"), + Reason: livekit.DisconnectReason_SERVER_SHUTDOWN, + }) return nil } -func (c *outboundCall) CloseWithReason(ctx context.Context, status CallStatus, t stats.Termination, reason livekit.DisconnectReason) { +func (c *outboundCall) CloseWith(ctx context.Context, end EndCall) bool { c.mu.Lock() defer c.mu.Unlock() - c.close(ctx, nil, status, t, reason) + return c.close(ctx, end) } func (c *outboundCall) closeWithTimeout(ctx context.Context) { c.mu.Lock() defer c.mu.Unlock() - c.close(ctx, psrpc.NewErrorf(psrpc.DeadlineExceeded, "media-timeout"), callDropped, stats.Indeterminate("media-timeout"), livekit.DisconnectReason_UNKNOWN_REASON) + c.close(ctx, EndCall{ + Report: psrpc.NewErrorf(psrpc.DeadlineExceeded, "media-timeout"), + Status: callDropped, + Term: stats.Indeterminate("media-timeout"), + Reason: livekit.DisconnectReason_UNKNOWN_REASON, + }) +} + +// EndCall implements CallInterface. +func (c *outboundCall) EndCall(ctx context.Context, headers map[string]string) error { + if !c.CloseWith(ctx, EndCall{ + Status: CallHangup, + Term: stats.Success("rpc"), + Reason: livekit.DisconnectReason_CLIENT_INITIATED, + Headers: headers, + }) { + return errors.New("call already ended") + } + return nil } func (c *outboundCall) printStats() { c.stats.Log(c.log, c.callStart) } -func (c *outboundCall) close(ctx context.Context, err error, status CallStatus, t stats.Termination, reason livekit.DisconnectReason) { +func (c *outboundCall) close(ctx context.Context, end EndCall) bool { c.closing.Break() ctx = context.WithoutCancel(ctx) + done := false c.stopped.Once(func() { + done = true c.stats.Closed.Store(true) - log := c.log.WithValues("status", status, "result", string(t.Result), "reason", t.Reason) + log := c.log.WithValues("status", end.Status, "result", string(end.Term.Result), "reason", end.Term.Reason) defer func() { c.stats.Update() c.printStats() c.sigTs.Log(log) }() - c.setStatus(status) - if err != nil { - log.Warnw("Closing outbound call with error", nil) + c.setStatus(end.Status) + if err := end.Report; err != nil { + log.Warnw("Closing outbound call with error", err) } else { log.Infow("Closing outbound call") } c.state.Update(func(info *livekit.SIPCallInfo) { - if err != nil && info.Error == "" { + if err := end.Report; err != nil && info.Error == "" { info.Error = err.Error() info.CallStatus = livekit.SIPCallStatus_SCS_ERROR } - info.DisconnectReason = reason + info.DisconnectReason = end.Reason }) // Send BYE _before_ closing media/room connection. // This ensures participant attributes are still available for // attributes_to_headers mapping in the setHeaders callback. // See: https://github.com/livekit/sip/issues/404 - c.stopSIP(ctx, t) + c.stopSIP(ctx, end.Term, end.Headers) c.media.Close() if r := c.lkRoom; r != nil { _ = r.CloseOutput() - _ = r.CloseWithReason(status.DisconnectReason()) + _ = r.CloseWithReason(end.Status.DisconnectReason()) } c.lkRoomIn = nil @@ -372,10 +408,11 @@ func (c *outboundCall) close(ctx context.Context, err error, status CallStatus, ProjectID: c.projectID, CallID: callID, SipCallID: c.cc.SIPCallID(), - }, state, t.Reason) + }, state, end.Term.Reason) }(c.tid) } }) + return done } func (c *outboundCall) Participant() ParticipantInfo { @@ -392,7 +429,7 @@ func (c *outboundCall) connectSIP(ctx context.Context, tid traceid.ID) error { if err := c.dialSIP(ctx, tid); err != nil { c.log.Infow("SIP call failed", "error", err) res := classifyInviteError(err) - c.close(ctx, res.reportErr, res.status, res.term, res.reason) + c.close(ctx, res.EndCall) return res.returnErr } c.connectMedia() @@ -434,6 +471,9 @@ func (c *outboundCall) connectToRoom(ctx context.Context, lkNew RoomConfig, getR if err := registerSignalingRPC(c.lkRoom, c.cc); err != nil { return err } + if err := registerCallRPC(c.lkRoom, c); err != nil { + return err + } return nil } @@ -517,7 +557,7 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru } } -func (c *outboundCall) stopSIP(ctx context.Context, t stats.Termination) { +func (c *outboundCall) stopSIP(ctx context.Context, t stats.Termination, headers map[string]string) { termCtx, cancel := context.WithCancel(context.Background()) // Do not use ctx defer cancel() go func() { @@ -531,7 +571,7 @@ func (c *outboundCall) stopSIP(ctx context.Context, t stats.Termination) { }() c.mon.CallTerminate(t) - c.cc.Close(ctx) + c.cc.Close(ctx, headers) } func (c *outboundCall) setStatus(v CallStatus) { @@ -641,7 +681,7 @@ func (c *outboundCall) sipSignal(ctx context.Context, tid traceid.ID) error { } else { c.mon.InviteError("other") } - c.cc.Close(ctx) + c.cc.Close(ctx, nil) c.log.Infow("SIP invite failed", "error", err) return err } @@ -738,7 +778,11 @@ func (c *outboundCall) transferCall(ctx context.Context, transferTo string, head // Give time for the peer to hang up first, but hang up ourselves if this doesn't happen within 1 second time.AfterFunc(referByeTimeout, func() { - c.CloseWithReason(ctx, CallHangup, stats.Success("call transferred"), livekit.DisconnectReason_CLIENT_INITIATED) + c.CloseWith(ctx, EndCall{ + Status: CallHangup, + Term: stats.Success("call transferred"), + Reason: livekit.DisconnectReason_CLIENT_INITIATED, + }) }) return nil @@ -1095,7 +1139,7 @@ func (c *sipOutbound) setCSeq(req *sip.Request) { c.nextCSeq++ } -func (c *sipOutbound) sendBye(ctx context.Context) { +func (c *sipOutbound) sendBye(ctx context.Context, headers map[string]string) { ctx = context.WithoutCancel(ctx) if c.invite == nil || c.inviteOk == nil { return // call wasn't established @@ -1109,6 +1153,9 @@ func (c *sipOutbound) sendBye(ctx context.Context) { r.AppendHeader(sip.NewHeader(k, v)) } } + for k, v := range headers { + r.AppendHeader(sip.NewHeader(k, v)) + } if c.c.closing.IsBroken() { // do not wait for a response _ = c.WriteRequest(r) @@ -1119,7 +1166,7 @@ func (c *sipOutbound) sendBye(ctx context.Context) { sendAndACK(ctx, c, r) } -func (c *sipOutbound) sendCancel(ctx context.Context) { +func (c *sipOutbound) sendCancel(ctx context.Context, headers map[string]string) { ctx = context.WithoutCancel(ctx) if c.invite == nil { return @@ -1133,6 +1180,9 @@ func (c *sipOutbound) sendCancel(ctx context.Context) { r.AppendHeader(sip.NewHeader(k, v)) } } + for k, v := range headers { + r.AppendHeader(sip.NewHeader(k, v)) + } _ = c.WriteRequest(r) c.drop() } @@ -1219,14 +1269,14 @@ func (c *sipOutbound) handleNotify(req *sip.Request, tx sip.ServerTransaction) e } } -func (c *sipOutbound) Close(ctx context.Context) { +func (c *sipOutbound) Close(ctx context.Context, headers map[string]string) { ctx = context.WithoutCancel(ctx) c.mu.Lock() defer c.mu.Unlock() if c.inviteOk != nil { - c.sendBye(ctx) + c.sendBye(ctx, headers) } else if c.invite != nil { - c.sendCancel(ctx) + c.sendCancel(ctx, headers) } else { c.drop() } diff --git a/pkg/sip/outbound_utilities_test.go b/pkg/sip/outbound_utilities_test.go index 57029cbe..9aa3929c 100644 --- a/pkg/sip/outbound_utilities_test.go +++ b/pkg/sip/outbound_utilities_test.go @@ -265,8 +265,8 @@ func (r *testRoom) NewTrack() *mixer.Input { return r.room.NewTrack() } -func (r *testRoom) RegisterRPC(method string, handler lksdk.RpcHandlerFunc) error { - return r.room.RegisterRPC(method, handler) +func (r *testRoom) RegisterRpcCtxMethod(method string, handler lksdk.RpcHandlerCtxFunc) error { + return r.room.RegisterRpcCtxMethod(method, handler) } type testSIPClientTransaction struct { diff --git a/pkg/sip/participant.go b/pkg/sip/participant.go index 661bf65b..fa59414c 100644 --- a/pkg/sip/participant.go +++ b/pkg/sip/participant.go @@ -18,8 +18,6 @@ import ( "time" "github.com/livekit/protocol/livekit" - "github.com/livekit/sipgo/sip" - "github.com/livekit/sip/pkg/stats" ) @@ -131,17 +129,6 @@ func (v CallStatus) DisconnectReason() livekit.DisconnectReason { } } -func (v CallStatus) SIPStatus() (sip.StatusCode, string) { - switch v { - case callMediaFailed: - return sip.StatusNotAcceptableHere, "MediaFailed" - case CallCancelled: - return sip.StatusRequestTerminated, "Request Terminated" - default: - return sip.StatusBusyHere, "Rejected" - } -} - const ( callDropped = CallStatus(iota) callFlood diff --git a/pkg/sip/protocol.go b/pkg/sip/protocol.go index bb1ab583..c02d804d 100644 --- a/pkg/sip/protocol.go +++ b/pkg/sip/protocol.go @@ -26,6 +26,7 @@ import ( "github.com/livekit/protocol/livekit" "github.com/livekit/psrpc" + "github.com/livekit/sip/pkg/stats" "github.com/livekit/sipgo/sip" "github.com/livekit/sip/pkg/config" @@ -40,6 +41,29 @@ var ( referIdRegexp = regexp.MustCompile(`^refer(;id=(\d+))?$`) ) +type Result struct { + Code sip.StatusCode + Status string +} + +func (r Result) NewResponse(req *sip.Request) *sip.Response { + if r.Code == 0 { + r.Code = sip.StatusServiceUnavailable + } + if r.Status == "" { + r.Status = sipStatus(r.Code) + } + return sip.NewResponseFromRequest(req, r.Code, r.Status, nil) +} + +type EndCall struct { + Report error // reported to LiveKit analytics + Status CallStatus // TODO: legacy + Term stats.Termination + Reason livekit.DisconnectReason // disconnect reason for LiveKit participant + Headers map[string]string // extra headers to send to SIP peer +} + var statusNamesMap = map[int]string{ 100: "Trying", 180: "Ringing", diff --git a/pkg/sip/room.go b/pkg/sip/room.go index a7f00613..d58f06a0 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -163,7 +163,7 @@ type RoomInterface interface { NewParticipantTrack(sampleRate int) (msdk.WriteCloser[msdk.PCM16Sample], error) SendData(data lksdk.DataPacket, opts ...lksdk.DataPublishOption) error NewTrack() *mixer.Input - RegisterRPC(method string, handler lksdk.RpcHandlerFunc) error + lksdk.RoomRPCInterface } type GetRoomFunc func(log logger.Logger, st *RoomStats) RoomInterface @@ -472,8 +472,8 @@ func (r *Room) Connect(ctx context.Context, conf *config.Config, rconf RoomConfi return nil } -func (r *Room) RegisterRPC(method string, handler lksdk.RpcHandlerFunc) error { - return r.room.RegisterRpcMethod(method, handler) +func (r *Room) RegisterRpcCtxMethod(method string, handler lksdk.RpcHandlerCtxFunc) error { + return r.room.RegisterRpcCtxMethod(method, handler) } func (r *Room) Subscribe() { diff --git a/pkg/sip/rpc.go b/pkg/sip/rpc.go index 0e4b4953..af11dd7d 100644 --- a/pkg/sip/rpc.go +++ b/pkg/sip/rpc.go @@ -1,38 +1,13 @@ package sip import ( - "encoding/json" + "context" "strings" + "github.com/livekit/protocol/livekit/roomrpc/siprpc" lksdk "github.com/livekit/server-sdk-go/v2" ) -func registerRPCMethodJSON[Req any, Resp any](r RoomInterface, name string, fnc func(r Req) (Resp, error)) error { - return r.RegisterRPC(name, func(data lksdk.RpcInvocationData) (string, error) { - var req Req - if len(data.Payload) != 0 { - if err := json.Unmarshal([]byte(data.Payload), &req); err != nil { - return "", err - } - } - resp, err := fnc(req) - if err != nil { - return "", err - } - out, err := json.Marshal(&resp) - return string(out), err - }) -} - -type getHeadersReq struct { - Include []string `json:"include,omitempty"` // list of headers to include - Exclude []string `json:"exclude,omitempty"` // list of headers to exclude -} - -type getHeadersResp struct { - Headers map[string]string `json:"headers"` -} - // getHeadersIgnore is a set of header that are filtered out of the participant RPC response. var getHeadersIgnore = map[string]struct{}{ "route": {}, @@ -54,7 +29,7 @@ var getHeadersIgnore = map[string]struct{}{ "max-forwards": {}, } -func rpcGetHeaders(cc Signaling, r getHeadersReq) (getHeadersResp, error) { +func rpcGetHeaders(ctx context.Context, cc Signaling, r *siprpc.GetRemoteHeadersV1Request) (*siprpc.GetRemoteHeadersV1Response, error) { var ( only map[string]struct{} skip map[string]struct{} @@ -85,14 +60,36 @@ func rpcGetHeaders(cc Signaling, r getHeadersReq) (getHeadersResp, error) { } out[h.Name()] = h.Value() } - return getHeadersResp{ + return &siprpc.GetRemoteHeadersV1Response{ Headers: out, }, nil } +func rpcEndCall(ctx context.Context, call CallInterface, r *siprpc.EndCallV1Request) (*siprpc.EndCallV1Response, error) { + err := call.EndCall(ctx, r.Headers) + if err != nil { + return nil, err + } + return &siprpc.EndCallV1Response{}, nil +} + func registerSignalingRPC(r RoomInterface, cc Signaling) error { - err := registerRPCMethodJSON(r, "lk.sip.GetRemoteHeaders", func(r getHeadersReq) (getHeadersResp, error) { - return rpcGetHeaders(cc, r) + err := lksdk.RegisterRPCMethodJSON(r, "lk.sip.GetRemoteHeaders", func(ctx context.Context, r *siprpc.GetRemoteHeadersV1Request) (*siprpc.GetRemoteHeadersV1Response, error) { + return rpcGetHeaders(ctx, cc, r) + }) + if err != nil { + return err + } + return nil +} + +type CallInterface interface { + EndCall(ctx context.Context, headers map[string]string) error +} + +func registerCallRPC(r RoomInterface, call CallInterface) error { + err := lksdk.RegisterRPCMethodJSON(r, "lk.sip.EndCall", func(ctx context.Context, r *siprpc.EndCallV1Request) (*siprpc.EndCallV1Response, error) { + return rpcEndCall(ctx, call, r) }) if err != nil { return err