Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ type Config struct {
AudioDTMF bool `yaml:"audio_dtmf"`
EnableJitterBuffer bool `yaml:"enable_jitter_buffer"`
EnableJitterBufferProb float64 `yaml:"enable_jitter_buffer_prob"`
// MixerInputBufferFrames is the number of frames the SIP audio mixer buffers per input (each frame = 20ms).
// 0 (the default) or a negative value keeps the library default (5 frames / 100ms). Raising it (e.g. 15 = 300ms)
// absorbs bursty input (e.g. from realtime speech models) that the small default buffer cannot smooth out,
// which otherwise causes choppy audio, at the cost of ~frames*20ms added agent->callee latency.
MixerInputBufferFrames int `yaml:"mixer_input_buffer_frames"`

// internal
ServiceName string `yaml:"-"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewClient(region string, conf *config.Config, log logger.Logger, mon *stats
mon: mon,
getStateHandler: getStateHandler,
getSipClient: DefaultGetSipClientFunc,
getRoom: DefaultGetRoomFunc,
getRoom: getRoomFuncForConfig(conf),
activeCalls: make(map[LocalTag]*outboundCall),
}
for _, option := range options {
Expand Down
21 changes: 11 additions & 10 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,16 +1058,17 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, mconf *sipM
logSignalChanges := false
logSignalChanges, _ = strconv.ParseBool(featureFlags[signalLoggingFeatureFlag])
mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{
IP: c.s.sconf.MediaIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
MediaTimeout: mconf.MediaTimeout,
SymmetricRTP: conf.SymmetricRTP,
IgnoreLocalAddrInSDP: c.s.conf.IgnoreLocalAddrInSDP,
EnableJitterBuffer: c.jitterBuf,
LogSignalChanges: logSignalChanges,
Stats: &c.stats.Port,
NoInputResample: !RoomResample,
IP: c.s.sconf.MediaIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
MediaTimeout: mconf.MediaTimeout,
SymmetricRTP: conf.SymmetricRTP,
IgnoreLocalAddrInSDP: c.s.conf.IgnoreLocalAddrInSDP,
EnableJitterBuffer: c.jitterBuf,
LogSignalChanges: logSignalChanges,
MixerInputBufferFrames: conf.MixerInputBufferFrames,
Stats: &c.stats.Port,
NoInputResample: !RoomResample,
}, RoomSampleRate)
if err != nil {
return nil, err
Expand Down
26 changes: 14 additions & 12 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,17 +332,18 @@ type MediaConf struct {
}

// MediaOptions holds the settings handed to NewMediaPort when a media port is created.
type MediaOptions struct {
IP netip.Addr
Ports rtcconfig.PortRange
MediaTimeoutInitial time.Duration
MediaTimeout time.Duration
SymmetricRTP bool
IgnoreLocalAddrInSDP bool // enable symmetric RTP if local IP is specified in SDP
Stats *PortStats
EnableJitterBuffer bool
NoInputResample bool
IgnorePreanswerData bool
LogSignalChanges bool
IP netip.Addr
Ports rtcconfig.PortRange
MediaTimeoutInitial time.Duration
MediaTimeout time.Duration
SymmetricRTP bool
IgnoreLocalAddrInSDP bool // enable symmetric RTP if local IP is specified in SDP
Stats *PortStats
EnableJitterBuffer bool
NoInputResample bool
IgnorePreanswerData bool
LogSignalChanges bool
// MixerInputBufferFrames is forwarded to the DTMF audio mixer through
// withMixerInputBufferFrames; values <= 0 add no option, leaving the mixer default in place.
MixerInputBufferFrames int
}

func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
Expand Down Expand Up @@ -872,7 +873,8 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error {
if p.dtmfAudioEnabled {
// Add separate mixer for DTMF audio.
// TODO: optimize, if we'll ever need this code path
mix, err := mixer.NewMixer(audioOut, rtp.DefFrameDur, 1, mixer.WithOutputChannel())
mixerOpts := withMixerInputBufferFrames(p.opts.MixerInputBufferFrames, mixer.WithOutputChannel())
mix, err := mixer.NewMixer(audioOut, rtp.DefFrameDur, 1, mixerOpts...)
if err != nil {
return err
}
Expand Down
118 changes: 118 additions & 0 deletions pkg/sip/media_port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math"
"net"
"net/netip"
"reflect"
"slices"
"strconv"
"strings"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/stretchr/testify/require"

msdk "github.com/livekit/media-sdk"
"github.com/livekit/media-sdk/mixer"
"github.com/livekit/media-sdk/rtp"
"github.com/livekit/media-sdk/sdp"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
Expand Down Expand Up @@ -165,6 +167,122 @@ func newIP(v string) netip.Addr {
return ip
}

// TestConfigMixerInputBufferFrames checks how config.NewConfig populates
// Config.MixerInputBufferFrames from a YAML document.
func TestConfigMixerInputBufferFrames(t *testing.T) {
// An explicit mixer_input_buffer_frames value must be carried into the config.
t.Run("parsed", func(t *testing.T) {
conf, err := config.NewConfig(`
redis:
address: localhost:6379
mixer_input_buffer_frames: 15
`)
require.NoError(t, err)
require.Equal(t, 15, conf.MixerInputBufferFrames)
})

// When the key is absent the field must keep its zero value.
t.Run("default", func(t *testing.T) {
conf, err := config.NewConfig(`
redis:
address: localhost:6379
`)
require.NoError(t, err)
require.Zero(t, conf.MixerInputBufferFrames)
})
}

func TestRoomMixerInputBufferFrames(t *testing.T) {
t.Run("default", func(t *testing.T) {
room := NewRoom(logger.NewTestLogger(t), nil)
t.Cleanup(func() { require.NoError(t, room.Close()) })

require.Equal(t, mixer.DefaultInputBufferFrames, mixerInputBufferFrames(t, room.mix))
})

t.Run("configured", func(t *testing.T) {
room := NewRoom(logger.NewTestLogger(t), nil, WithRoomMixerInputBufferFrames(15))
t.Cleanup(func() { require.NoError(t, room.Close()) })

require.Equal(t, 15, mixerInputBufferFrames(t, room.mix))
})
}

func TestConfiguredRoomFactoryMixerInputBufferFrames(t *testing.T) {
room, ok := getRoomFuncForConfig(&config.Config{
MixerInputBufferFrames: 15,
})(logger.NewTestLogger(t), nil).(*Room)
require.True(t, ok)
t.Cleanup(func() { require.NoError(t, room.Close()) })

require.Equal(t, 15, mixerInputBufferFrames(t, room.mix))
}

// TestMediaPortDTMFMixerInputBufferFrames runs an offer/answer exchange
// between two media ports joined by newUDPPipe, with DTMF audio enabled on the
// caller, and checks that the mixer reachable from the caller's DTMF audio
// writer uses the MixerInputBufferFrames value from the caller's MediaOptions.
func TestMediaPortDTMFMixerInputBufferFrames(t *testing.T) {
	const (
		wantFrames = 15
		sampleRate = 16000
	)

	callerConn, calleeConn := newUDPPipe()
	log := logger.NewTestLogger(t)

	// Only the caller gets a custom mixer buffer size and DTMF audio.
	caller, err := NewMediaPortWith(1, log.WithName("caller"), newTestCallMonitor(t), callerConn, &MediaOptions{
		IP:                     newIP("1.1.1.1"),
		Ports:                  rtcconfig.PortRange{Start: 10000},
		MixerInputBufferFrames: wantFrames,
	}, sampleRate)
	require.NoError(t, err)
	t.Cleanup(caller.Close)
	caller.SetDTMFAudio(true)

	callee, err := NewMediaPortWith(2, log.WithName("callee"), newTestCallMonitor(t), calleeConn, &MediaOptions{
		IP:    newIP("2.2.2.2"),
		Ports: rtcconfig.PortRange{Start: 20000},
	}, sampleRate)
	require.NoError(t, err)
	t.Cleanup(callee.Close)

	// Caller creates the offer, callee answers it.
	callerOffer, err := caller.NewOffer(defaultCodecs, sdp.EncryptionNone)
	require.NoError(t, err)
	offerSDP, err := callerOffer.SDP.Marshal()
	require.NoError(t, err)

	calleeAnswer, _, err := callee.SetOffer(offerSDP, defaultCodecs, sdp.EncryptionNone)
	require.NoError(t, err)
	answerSDP, err := calleeAnswer.SDP.Marshal()
	require.NoError(t, err)

	// Applying the negotiated config must leave the DTMF audio writer set on the caller.
	negotiated, _, err := caller.SetAnswer(callerOffer, answerSDP, defaultCodecs, sdp.EncryptionNone)
	require.NoError(t, err)
	require.NoError(t, caller.SetConfig(negotiated))
	require.NotNil(t, caller.dtmfOutAudio)

	got := mixerInputBufferFramesFromInput(t, caller.dtmfOutAudio)
	require.Equal(t, wantFrames, got)
}

// mixerInputBufferFrames reads the unexported inputBufferFrames field of mix
// through reflection and returns it as an int. The test fails if mix is nil or
// the field does not exist.
func mixerInputBufferFrames(t testing.TB, mix *mixer.Mixer) int {
	t.Helper()

	ptr := reflect.ValueOf(mix)
	require.Equal(t, reflect.Ptr, ptr.Kind())
	require.False(t, ptr.IsNil())

	// reflect.Indirect dereferences the (non-nil) pointer to reach the struct value.
	frames := reflect.Indirect(ptr).FieldByName("inputBufferFrames")
	require.True(t, frames.IsValid())
	return int(frames.Int())
}

// mixerInputBufferFramesFromInput follows the unexported "m" field of the
// value behind w and returns that value's unexported inputBufferFrames field
// as an int, all through reflection.
//
// w must be a non-nil pointer to a struct whose "m" field is a non-nil pointer
// to a struct with an integer inputBufferFrames field. Every step is asserted
// with require, so an unexpected layout fails the test with a readable message
// instead of a reflect panic.
func mixerInputBufferFramesFromInput(t testing.TB, w msdk.PCM16Writer) int {
	t.Helper()

	v := reflect.ValueOf(w)
	require.Equal(t, reflect.Ptr, v.Kind())
	require.False(t, v.IsNil())

	// FieldByName panics on anything that is not a struct, so check first.
	input := v.Elem()
	require.Equal(t, reflect.Struct, input.Kind(), "writer must point to a struct")

	mix := input.FieldByName("m")
	require.True(t, mix.IsValid())
	// IsNil panics on non-nilable kinds; require a pointer before calling it.
	require.Equal(t, reflect.Ptr, mix.Kind(), "field m must be a pointer")
	require.False(t, mix.IsNil())

	mixStruct := mix.Elem()
	require.Equal(t, reflect.Struct, mixStruct.Kind(), "field m must point to a struct")

	field := mixStruct.FieldByName("inputBufferFrames")
	require.True(t, field.IsValid())
	// Int panics unless the field has a signed integer kind.
	require.True(t, field.CanInt(), "inputBufferFrames must be an integer field")
	return int(field.Int())
}

func TestMediaPort(t *testing.T) {
// Main resampler has unpredictable (although tiny) output delay
// and other randomness in the generated samples.
Expand Down
23 changes: 12 additions & 11 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,18 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi
var err error

call.media, err = NewMediaPort(tid, call.log, call.mon, &MediaOptions{
IP: c.sconf.MediaIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.conf.MediaTimeoutInitial,
MediaTimeout: sipConf.mediaConfig.MediaTimeout,
SymmetricRTP: c.conf.SymmetricRTP,
IgnoreLocalAddrInSDP: c.conf.IgnoreLocalAddrInSDP,
EnableJitterBuffer: call.jitterBuf,
LogSignalChanges: signalLoggingEnabled,
Stats: &call.stats.Port,
NoInputResample: !RoomResample,
IgnorePreanswerData: true,
IP: c.sconf.MediaIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.conf.MediaTimeoutInitial,
MediaTimeout: sipConf.mediaConfig.MediaTimeout,
SymmetricRTP: c.conf.SymmetricRTP,
IgnoreLocalAddrInSDP: c.conf.IgnoreLocalAddrInSDP,
EnableJitterBuffer: call.jitterBuf,
LogSignalChanges: signalLoggingEnabled,
MixerInputBufferFrames: conf.MixerInputBufferFrames,
Stats: &call.stats.Port,
NoInputResample: !RoomResample,
IgnorePreanswerData: true,
}, RoomSampleRate)
if err != nil {
call.close(ctx, fmt.Errorf("media failed: %w", err), callDropped, stats.ServerError("media-failed"), livekit.DisconnectReason_UNKNOWN_REASON)
Expand Down
38 changes: 36 additions & 2 deletions pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,23 @@ func DefaultGetRoomFunc(log logger.Logger, st *RoomStats) RoomInterface {
return NewRoom(log, st)
}

// getRoomFuncForConfig picks the room factory for conf. A nil config or a
// non-positive MixerInputBufferFrames yields DefaultGetRoomFunc; otherwise the
// returned factory creates rooms with WithRoomMixerInputBufferFrames set to
// the configured value.
func getRoomFuncForConfig(conf *config.Config) GetRoomFunc {
	frames := 0
	if conf != nil {
		frames = conf.MixerInputBufferFrames
	}
	if frames <= 0 {
		return DefaultGetRoomFunc
	}
	// The value is copied above, so later changes to conf do not affect the factory.
	return func(log logger.Logger, st *RoomStats) RoomInterface {
		return NewRoom(log, st, WithRoomMixerInputBufferFrames(frames))
	}
}

// withMixerInputBufferFrames returns opts, followed by
// mixer.WithInputBufferFrames(frames) when frames is positive. For frames <= 0
// opts is returned unchanged, so the mixer keeps its own default.
//
// The capacity of opts is clipped before appending, so a caller that passes an
// existing slice (f(n, s...)) never has elements of its backing array
// overwritten by the append.
func withMixerInputBufferFrames(frames int, opts ...mixer.MixerOptions) []mixer.MixerOptions {
	if frames <= 0 {
		return opts
	}
	return append(opts[:len(opts):len(opts)], mixer.WithInputBufferFrames(frames))
}

type Room struct {
log logger.Logger
roomLog logger.Logger // deferred logger
Expand Down Expand Up @@ -206,14 +223,31 @@ type RoomConfig struct {
LogSignalChanges bool
}

func NewRoom(log logger.Logger, st *RoomStats) *Room {
// RoomOptions collects the optional settings that RoomOption functions can set.
type RoomOptions struct {
	// MixerInputBufferFrames is the requested number of input buffer frames
	// for the room mixer.
	MixerInputBufferFrames int
}

// RoomOption mutates a RoomOptions value.
type RoomOption func(*RoomOptions)

// WithRoomMixerInputBufferFrames returns a RoomOption that stores frames in
// RoomOptions.MixerInputBufferFrames.
func WithRoomMixerInputBufferFrames(frames int) RoomOption {
	return func(o *RoomOptions) {
		o.MixerInputBufferFrames = frames
	}
}

func NewRoom(log logger.Logger, st *RoomStats, options ...RoomOption) *Room {
if st == nil {
st = &RoomStats{}
}
opts := RoomOptions{}
for _, option := range options {
option(&opts)
}
r := &Room{log: log, stats: st, out: msdk.NewSwitchWriter(RoomSampleRate)}

var err error
r.mix, err = mixer.NewMixer(r.out, rtp.DefFrameDur, 1, mixer.WithStats(&st.Mixer), mixer.WithOutputChannel())
mixerOpts := withMixerInputBufferFrames(opts.MixerInputBufferFrames, mixer.WithStats(&st.Mixer), mixer.WithOutputChannel())
r.mix, err = mixer.NewMixer(r.out, rtp.DefFrameDur, 1, mixerOpts...)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats
region: region,
mon: mon,
getStateHandler: getStateHandler,
getRoom: DefaultGetRoomFunc,
getRoom: getRoomFuncForConfig(conf),
byLocalTag: make(map[LocalTag]*inboundCall),
provisionalInvites: expirable.NewLRU[[2]string, LocalTag](maxCallCache, nil, callCacheTTL),
}
Expand Down