From e37c27ac22daef3b3dedc175f358911dc07f2325 Mon Sep 17 00:00:00 2001 From: Deep Shah <217648652+dshah1333@users.noreply.github.com> Date: Fri, 19 Jun 2026 15:06:15 -0700 Subject: [PATCH] Make the SIP audio mixer input buffer size configurable The mixer's per-input buffer defaults to media-sdk's 5 frames (100 ms), which can be too small for bursty inputs: chunks larger than the buffer overflow on arrival and underrun between them, restarting the mixer and injecting silence. Add a `mixer_input_buffer_frames` config option that passes `mixer.WithInputBufferFrames(n)` to the room and DTMF mixers when n > 0. Default unchanged (0 = library default), so existing deployments are unaffected. --- pkg/config/config.go | 5 ++ pkg/sip/client.go | 2 +- pkg/sip/inbound.go | 21 +++---- pkg/sip/media_port.go | 26 ++++---- pkg/sip/media_port_test.go | 118 +++++++++++++++++++++++++++++++++++++ pkg/sip/outbound.go | 23 ++++---- pkg/sip/room.go | 38 +++++++++++- pkg/sip/server.go | 2 +- 8 files changed, 198 insertions(+), 37 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 585b6dde..0e57f71e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -127,6 +127,11 @@ type Config struct { AudioDTMF bool `yaml:"audio_dtmf"` EnableJitterBuffer bool `yaml:"enable_jitter_buffer"` EnableJitterBufferProb float64 `yaml:"enable_jitter_buffer_prob"` + // MixerInputBufferFrames is the number of frames the SIP audio mixer buffers per input (each frame = 20ms). + // 0 (default) keeps the library default (5 frames / 100ms). Raising it (e.g. 15 = 300ms) absorbs bursty input + // (e.g. realtime speech models) that starves the small default and causes choppy audio, at the cost of + // ~frames*20ms added agent->callee latency. + MixerInputBufferFrames int `yaml:"mixer_input_buffer_frames"` // internal ServiceName string `yaml:"-"` diff --git a/pkg/sip/client.go b/pkg/sip/client.go index fabc964a..4c5cf8ef 100644 --- a/pkg/sip/client.go +++ b/pkg/sip/client.go @@ -101,7 +101,7 @@ func NewClient(region string, conf *config.Config, log logger.Logger, mon *stats mon: mon, getStateHandler: getStateHandler, getSipClient: DefaultGetSipClientFunc, - getRoom: DefaultGetRoomFunc, + getRoom: getRoomFuncForConfig(conf), activeCalls: make(map[LocalTag]*outboundCall), } for _, option := range options { diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 2d5f7bd9..d64d2e1c 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -1058,16 +1058,17 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, mconf *sipM logSignalChanges := false logSignalChanges, _ = strconv.ParseBool(featureFlags[signalLoggingFeatureFlag]) mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{ - IP: c.s.sconf.MediaIP, - Ports: conf.RTPPort, - MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial, - MediaTimeout: mconf.MediaTimeout, - SymmetricRTP: conf.SymmetricRTP, - IgnoreLocalAddrInSDP: c.s.conf.IgnoreLocalAddrInSDP, - EnableJitterBuffer: c.jitterBuf, - LogSignalChanges: logSignalChanges, - Stats: &c.stats.Port, - NoInputResample: !RoomResample, + IP: c.s.sconf.MediaIP, + Ports: conf.RTPPort, + MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial, + MediaTimeout: mconf.MediaTimeout, + SymmetricRTP: conf.SymmetricRTP, + IgnoreLocalAddrInSDP: c.s.conf.IgnoreLocalAddrInSDP, + EnableJitterBuffer: c.jitterBuf, + LogSignalChanges: logSignalChanges, + MixerInputBufferFrames: conf.MixerInputBufferFrames, + Stats: &c.stats.Port, + NoInputResample: !RoomResample, }, RoomSampleRate) if err != nil { return nil, err diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 00567906..1f34c3b1 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -332,17 +332,18 @@ type MediaConf struct { } type MediaOptions struct { - IP netip.Addr - Ports rtcconfig.PortRange - MediaTimeoutInitial time.Duration - MediaTimeout time.Duration - SymmetricRTP bool - IgnoreLocalAddrInSDP bool // enable symmetric RTP if local IP is specified in SDP - Stats *PortStats - EnableJitterBuffer bool - NoInputResample bool - IgnorePreanswerData bool - LogSignalChanges bool + IP netip.Addr + Ports rtcconfig.PortRange + MediaTimeoutInitial time.Duration + MediaTimeout time.Duration + SymmetricRTP bool + IgnoreLocalAddrInSDP bool // enable symmetric RTP if local IP is specified in SDP + Stats *PortStats + EnableJitterBuffer bool + NoInputResample bool + IgnorePreanswerData bool + LogSignalChanges bool + MixerInputBufferFrames int } func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) { @@ -872,7 +873,8 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error { if p.dtmfAudioEnabled { // Add separate mixer for DTMF audio. // TODO: optimize, if we'll ever need this code path - mix, err := mixer.NewMixer(audioOut, rtp.DefFrameDur, 1, mixer.WithOutputChannel()) + mixerOpts := withMixerInputBufferFrames(p.opts.MixerInputBufferFrames, mixer.WithOutputChannel()) + mix, err := mixer.NewMixer(audioOut, rtp.DefFrameDur, 1, mixerOpts...) if err != nil { return err } diff --git a/pkg/sip/media_port_test.go b/pkg/sip/media_port_test.go index f8d5823c..e3256d5b 100644 --- a/pkg/sip/media_port_test.go +++ b/pkg/sip/media_port_test.go @@ -20,6 +20,7 @@ import ( "math" "net" "net/netip" + "reflect" "slices" "strconv" "strings" @@ -31,6 +32,7 @@ import ( "github.com/stretchr/testify/require" msdk "github.com/livekit/media-sdk" + "github.com/livekit/media-sdk/mixer" "github.com/livekit/media-sdk/rtp" "github.com/livekit/media-sdk/sdp" "github.com/livekit/mediatransportutil/pkg/rtcconfig" @@ -165,6 +167,122 @@ func newIP(v string) netip.Addr { return ip } +func TestConfigMixerInputBufferFrames(t *testing.T) { + t.Run("parsed", func(t *testing.T) { + conf, err := config.NewConfig(` +redis: + address: localhost:6379 +mixer_input_buffer_frames: 15 +`) + require.NoError(t, err) + require.Equal(t, 15, conf.MixerInputBufferFrames) + }) + + t.Run("default", func(t *testing.T) { + conf, err := config.NewConfig(` +redis: + address: localhost:6379 +`) + require.NoError(t, err) + require.Zero(t, conf.MixerInputBufferFrames) + }) +} + +func TestRoomMixerInputBufferFrames(t *testing.T) { + t.Run("default", func(t *testing.T) { + room := NewRoom(logger.NewTestLogger(t), nil) + t.Cleanup(func() { require.NoError(t, room.Close()) }) + + require.Equal(t, mixer.DefaultInputBufferFrames, mixerInputBufferFrames(t, room.mix)) + }) + + t.Run("configured", func(t *testing.T) { + room := NewRoom(logger.NewTestLogger(t), nil, WithRoomMixerInputBufferFrames(15)) + t.Cleanup(func() { require.NoError(t, room.Close()) }) + + require.Equal(t, 15, mixerInputBufferFrames(t, room.mix)) + }) +} + +func TestConfiguredRoomFactoryMixerInputBufferFrames(t *testing.T) { + room, ok := getRoomFuncForConfig(&config.Config{ + MixerInputBufferFrames: 15, + })(logger.NewTestLogger(t), nil).(*Room) + require.True(t, ok) + t.Cleanup(func() { require.NoError(t, room.Close()) }) + + require.Equal(t, 15, mixerInputBufferFrames(t, room.mix)) +} + +func TestMediaPortDTMFMixerInputBufferFrames(t *testing.T) { + const frames = 15 + const rate = 16000 + + c1, c2 := newUDPPipe() + log := logger.NewTestLogger(t) + + caller, err := NewMediaPortWith(1, log.WithName("caller"), newTestCallMonitor(t), c1, &MediaOptions{ + IP: newIP("1.1.1.1"), + Ports: rtcconfig.PortRange{Start: 10000}, + MixerInputBufferFrames: frames, + }, rate) + require.NoError(t, err) + t.Cleanup(caller.Close) + caller.SetDTMFAudio(true) + + callee, err := NewMediaPortWith(2, log.WithName("callee"), newTestCallMonitor(t), c2, &MediaOptions{ + IP: newIP("2.2.2.2"), + Ports: rtcconfig.PortRange{Start: 20000}, + }, rate) + require.NoError(t, err) + t.Cleanup(callee.Close) + + offer, err := caller.NewOffer(defaultCodecs, sdp.EncryptionNone) + require.NoError(t, err) + offerData, err := offer.SDP.Marshal() + require.NoError(t, err) + + answer, _, err := callee.SetOffer(offerData, defaultCodecs, sdp.EncryptionNone) + require.NoError(t, err) + answerData, err := answer.SDP.Marshal() + require.NoError(t, err) + + callerConf, _, err := caller.SetAnswer(offer, answerData, defaultCodecs, sdp.EncryptionNone) + require.NoError(t, err) + require.NoError(t, caller.SetConfig(callerConf)) + require.NotNil(t, caller.dtmfOutAudio) + + require.Equal(t, frames, mixerInputBufferFramesFromInput(t, caller.dtmfOutAudio)) +} + +func mixerInputBufferFrames(t testing.TB, mix *mixer.Mixer) int { + t.Helper() + + v := reflect.ValueOf(mix) + require.Equal(t, reflect.Ptr, v.Kind()) + require.False(t, v.IsNil()) + + field := v.Elem().FieldByName("inputBufferFrames") + require.True(t, field.IsValid()) + return int(field.Int()) +} + +func mixerInputBufferFramesFromInput(t testing.TB, w msdk.PCM16Writer) int { + t.Helper() + + v := reflect.ValueOf(w) + require.Equal(t, reflect.Ptr, v.Kind()) + require.False(t, v.IsNil()) + + mix := v.Elem().FieldByName("m") + require.True(t, mix.IsValid()) + require.False(t, mix.IsNil()) + + field := mix.Elem().FieldByName("inputBufferFrames") + require.True(t, field.IsValid()) + return int(field.Int()) +} + func TestMediaPort(t *testing.T) { // Main resampler has unpredictable (although tiny) output delay // and other randomness in the generated samples. diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 991e55df..c66cb990 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -136,17 +136,18 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi var err error call.media, err = NewMediaPort(tid, call.log, call.mon, &MediaOptions{ - IP: c.sconf.MediaIP, - Ports: conf.RTPPort, - MediaTimeoutInitial: c.conf.MediaTimeoutInitial, - MediaTimeout: sipConf.mediaConfig.MediaTimeout, - SymmetricRTP: c.conf.SymmetricRTP, - IgnoreLocalAddrInSDP: c.conf.IgnoreLocalAddrInSDP, - EnableJitterBuffer: call.jitterBuf, - LogSignalChanges: signalLoggingEnabled, - Stats: &call.stats.Port, - NoInputResample: !RoomResample, - IgnorePreanswerData: true, + IP: c.sconf.MediaIP, + Ports: conf.RTPPort, + MediaTimeoutInitial: c.conf.MediaTimeoutInitial, + MediaTimeout: sipConf.mediaConfig.MediaTimeout, + SymmetricRTP: c.conf.SymmetricRTP, + IgnoreLocalAddrInSDP: c.conf.IgnoreLocalAddrInSDP, + EnableJitterBuffer: call.jitterBuf, + LogSignalChanges: signalLoggingEnabled, + MixerInputBufferFrames: conf.MixerInputBufferFrames, + Stats: &call.stats.Port, + NoInputResample: !RoomResample, + IgnorePreanswerData: true, }, RoomSampleRate) if err != nil { call.close(ctx, fmt.Errorf("media failed: %w", err), callDropped, stats.ServerError("media-failed"), livekit.DisconnectReason_UNKNOWN_REASON) diff --git a/pkg/sip/room.go b/pkg/sip/room.go index a7f00613..be528fab 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -172,6 +172,23 @@ func DefaultGetRoomFunc(log logger.Logger, st *RoomStats) RoomInterface { return NewRoom(log, st) } +func getRoomFuncForConfig(conf *config.Config) GetRoomFunc { + if conf == nil || conf.MixerInputBufferFrames <= 0 { + return DefaultGetRoomFunc + } + mixerInputBufferFrames := conf.MixerInputBufferFrames + return func(log logger.Logger, st *RoomStats) RoomInterface { + return NewRoom(log, st, WithRoomMixerInputBufferFrames(mixerInputBufferFrames)) + } +} + +func withMixerInputBufferFrames(frames int, opts ...mixer.MixerOptions) []mixer.MixerOptions { + if frames > 0 { + opts = append(opts, mixer.WithInputBufferFrames(frames)) + } + return opts +} + type Room struct { log logger.Logger roomLog logger.Logger // deferred logger @@ -206,14 +223,31 @@ type RoomConfig struct { LogSignalChanges bool } -func NewRoom(log logger.Logger, st *RoomStats) *Room { +type RoomOptions struct { + MixerInputBufferFrames int +} + +type RoomOption func(*RoomOptions) + +func WithRoomMixerInputBufferFrames(frames int) RoomOption { + return func(opts *RoomOptions) { + opts.MixerInputBufferFrames = frames + } +} + +func NewRoom(log logger.Logger, st *RoomStats, options ...RoomOption) *Room { if st == nil { st = &RoomStats{} } + opts := RoomOptions{} + for _, option := range options { + option(&opts) + } r := &Room{log: log, stats: st, out: msdk.NewSwitchWriter(RoomSampleRate)} var err error - r.mix, err = mixer.NewMixer(r.out, rtp.DefFrameDur, 1, mixer.WithStats(&st.Mixer), mixer.WithOutputChannel()) + mixerOpts := withMixerInputBufferFrames(opts.MixerInputBufferFrames, mixer.WithStats(&st.Mixer), mixer.WithOutputChannel()) + r.mix, err = mixer.NewMixer(r.out, rtp.DefFrameDur, 1, mixerOpts...) if err != nil { panic(err) } diff --git a/pkg/sip/server.go b/pkg/sip/server.go index 9f1a43fb..58947c9e 100644 --- a/pkg/sip/server.go +++ b/pkg/sip/server.go @@ -211,7 +211,7 @@ func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats region: region, mon: mon, getStateHandler: getStateHandler, - getRoom: DefaultGetRoomFunc, + getRoom: getRoomFuncForConfig(conf), byLocalTag: make(map[LocalTag]*inboundCall), provisionalInvites: expirable.NewLRU[[2]string, LocalTag](maxCallCache, nil, callCacheTTL), }