From dcbfae189212012a70e22d0faa7c3b1edfc56292 Mon Sep 17 00:00:00 2001 From: "kanimuthumaran.t" Date: Wed, 3 Jun 2026 15:45:53 +0530 Subject: [PATCH 1/4] Support Opus codec (optional). Adds opt-in Opus (opus/48000/2) for SIP media behind the enable_opus config flag (default off). When enabled, Opus is offered and preferred over G722/G711; when disabled it never appears in SDP offers and the existing G.711/G.722/DTMF paths are unchanged. The LiveKit/WebRTC opus path (pkg/media/opus/opus.go) is untouched; SIP-side encode/decode lives in a separate, gated file with configurable bitrate/complexity/FEC. Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/config/config.go | 22 +++ pkg/media/opus/opus_codec.go | 309 +++++++++++++++++++++++++++++ pkg/media/opus/opus_test.go | 196 ++++++++++++++++++ pkg/sip/media_codecs.go | 1 + pkg/sip/media_codecs_opus.go | 119 +++++++++++ pkg/sip/media_codecs_opus_test.go | 317 ++++++++++++++++++++++++++++++ pkg/sip/media_port_test.go | 14 +- pkg/sip/service.go | 19 ++ 8 files changed, 995 insertions(+), 2 deletions(-) create mode 100644 pkg/media/opus/opus_codec.go create mode 100644 pkg/media/opus/opus_test.go create mode 100644 pkg/sip/media_codecs_opus.go create mode 100644 pkg/sip/media_codecs_opus_test.go diff --git a/pkg/config/config.go b/pkg/config/config.go index 585b6dde2..54f0023ba 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -73,6 +73,20 @@ type TCPConfig struct { DialPort rtcconfig.PortRange `yaml:"dial_port"` } +// OpusConfig configures the Opus audio encoder for SIP media. All fields are +// optional; the zero value keeps libopus defaults. +type OpusConfig struct { + // Bitrate is the target encoder bitrate in bits per second (e.g. 24000). + // 0 leaves libopus automatic bitrate selection in place. + Bitrate int `yaml:"bitrate"` + // Complexity is the encoder computational complexity, 1-10. 0 keeps the default. + Complexity int `yaml:"complexity"` + // FEC enables in-band Forward Error Correction. 
+ FEC bool `yaml:"fec"` + // PacketLossPercent is the expected packet loss (0-100) used to tune FEC. + PacketLossPercent int `yaml:"packet_loss_percent"` +} + type Config struct { Redis *redis.RedisConfig `yaml:"redis"` // required ApiKey string `yaml:"api_key"` // required (env LIVEKIT_API_KEY) @@ -128,6 +142,14 @@ type Config struct { EnableJitterBuffer bool `yaml:"enable_jitter_buffer"` EnableJitterBufferProb float64 `yaml:"enable_jitter_buffer_prob"` + // EnableOpus opts into offering the Opus codec for SIP media. It is + // disabled by default so existing deployments are unaffected. When enabled, + // Opus is offered and preferred over G722/G711. + EnableOpus bool `yaml:"enable_opus"` + // Opus tunes the Opus encoder used for SIP media. The zero value keeps + // libopus defaults (full-complexity VoIP). + Opus OpusConfig `yaml:"opus"` + // internal ServiceName string `yaml:"-"` NodeID string // Do not provide, will be overwritten diff --git a/pkg/media/opus/opus_codec.go b/pkg/media/opus/opus_codec.go new file mode 100644 index 000000000..e7f80d251 --- /dev/null +++ b/pkg/media/opus/opus_codec.go @@ -0,0 +1,309 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file holds the SIP-side Opus codec implementation: a configurable +// encoder (bitrate/complexity/FEC) and a decoder that adapts to the actual +// channel count of each packet. 
It is intentionally separate from the +// LiveKit/WebRTC encode/decode in opus.go (Decode/Encode), which other call +// paths (e.g. the room track) depend on and which must not change. Nothing here +// runs unless the SIP Opus codec is negotiated, which only happens when Opus is +// explicitly enabled. + +package opus + +import ( + "errors" + "fmt" + "sync" + + "gopkg.in/hraban/opus.v2" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/media-sdk/rtp" + "github.com/livekit/protocol/logger" +) + +/* +#cgo pkg-config: opus +#include <opus.h> +*/ +import "C" + +// maxOpusFrameMs is the largest frame duration a single Opus packet can +// represent. Used to size the decode buffer so over-long packets don't fail. +const maxOpusFrameMs = 120 + +// maxOpusFrameSamples returns the per-channel sample count of the largest +// possible Opus frame at the given sample rate. +func maxOpusFrameSamples(sampleRate int) int { + return sampleRate / 1000 * maxOpusFrameMs +} + +// EncodeOptions controls how the SIP Opus encoder is configured. The zero value +// keeps libopus defaults, so callers only set the fields they care about. +type EncodeOptions struct { + // Channels is the number of channels to encode (1 for mono, 2 for stereo). + // Defaults to 1 (mono) when unset. + Channels int + // Bitrate is the target bitrate in bits per second. 0 leaves the libopus + // automatic bitrate selection in place. + Bitrate int + // Complexity is the computational complexity, 1-10. 0 keeps the libopus default. + Complexity int + // FEC enables in-band Forward Error Correction. + FEC bool + // PacketLossPercent is the expected packet loss percentage (0-100) used to + // tune FEC. Only meaningful when FEC is enabled. + PacketLossPercent int +} + +func (o EncodeOptions) channels() int { + if o.Channels <= 0 { + return 1 + } + return o.Channels +} + +// DecodeWith creates a SIP Opus decoder writing PCM to w. 
targetChannels is the +// channel layout the downstream pipeline expects (1 or 2); the decoder detects +// each packet's actual channel count and converts to targetChannels. +func DecodeWith(w msdk.PCM16Writer, targetChannels int, log logger.Logger) (Writer, error) { + if targetChannels != 1 && targetChannels != 2 { + return nil, fmt.Errorf("unsupported channel count: %d", targetChannels) + } + log = log.WithValues("codec", "opus-sip", "targetChannels", targetChannels) + return &sipDecoder{ + w: w, + targetChannels: targetChannels, + log: log, + }, nil +} + +// EncodeWith creates a SIP Opus encoder using the provided options. +func EncodeWith(w Writer, opts EncodeOptions, log logger.Logger) (msdk.PCM16Writer, error) { + channels := opts.channels() + rate := w.SampleRate() + log = log.WithValues("codec", "opus-sip", "rate", rate, "channels", channels) + enc, err := opus.NewEncoder(rate, channels, opus.AppVoIP) + if err != nil { + log.Errorw("cannot initialize opus encoder", err) + return nil, err + } + if opts.Bitrate > 0 { + if err := enc.SetBitrate(opts.Bitrate); err != nil { + log.Errorw("cannot set opus bitrate", err, "bitrate", opts.Bitrate) + return nil, err + } + } + if opts.Complexity > 0 { + if err := enc.SetComplexity(opts.Complexity); err != nil { + log.Errorw("cannot set opus complexity", err, "complexity", opts.Complexity) + return nil, err + } + } + if opts.FEC { + if err := enc.SetInBandFEC(true); err != nil { + log.Errorw("cannot enable opus FEC", err) + return nil, err + } + if opts.PacketLossPercent > 0 { + if err := enc.SetPacketLossPerc(opts.PacketLossPercent); err != nil { + log.Errorw("cannot set opus packet loss percentage", err, "loss", opts.PacketLossPercent) + return nil, err + } + } + } + samples := rate / rtp.DefFramesPerSec + return &sipEncoder{ + w: w, + channels: channels, + enc: enc, + samples: samples, + inbuf: make(msdk.PCM16Sample, 0, samples*channels), + buf: make([]byte, 4*channels*samples), + log: log, + }, nil +} + +type 
sipDecoder struct { + log logger.Logger + + mu sync.Mutex + w msdk.PCM16Writer + dec *opus.Decoder + targetChannels int + lastChannels int + buf msdk.PCM16Sample + buf2 msdk.PCM16Sample + + successiveErrorCount int +} + +func (d *sipDecoder) String() string { + return fmt.Sprintf("OPUS(decode) -> %s", d.w) +} + +func (d *sipDecoder) SampleRate() int { + return d.w.SampleRate() +} + +// resetForSample inspects the incoming Opus packet and (re)creates the decoder +// to match the actual channel count of the bitstream. This lets us decode +// streams whose channel count differs from what we advertised in SDP. +func (d *sipDecoder) resetForSample(in Sample) (int, error) { + channels := int(C.opus_packet_get_nb_channels((*C.uchar)(&in[0]))) + if channels != 1 && channels != 2 { + return 0, fmt.Errorf("unexpected opus packet channel count: %d", channels) + } + if d.dec == nil || d.lastChannels != channels { + dec, err := opus.NewDecoder(d.w.SampleRate(), channels) + if err != nil { + d.log.Errorw("cannot initialize opus decoder", err, "channels", channels) + return 0, err + } + d.dec = dec + // Size for the largest possible Opus frame (120 ms) so we can decode + // packets longer than our 20 ms ptime if a peer sends them. + d.buf = make([]int16, maxOpusFrameSamples(d.w.SampleRate())*channels) + d.lastChannels = channels + } + return channels, nil +} + +func (d *sipDecoder) WriteSample(in Sample) error { + if len(in) == 0 { + return nil + } + d.mu.Lock() + defer d.mu.Unlock() + channels, err := d.resetForSample(in) + if err != nil { + return err + } + n, err := d.dec.Decode(in, d.buf) + if err != nil { + // Some workflows can cause a spurious decoding error, so ignore a small + // number of corruption errors before giving up. 
+ if !errors.Is(err, opus.ErrInvalidPacket) || d.successiveErrorCount >= 5 { + d.log.Warnw("error decoding opus sample", err) + return err + } + d.log.Debugw("opus decoder failed decoding a sample", "error", err) + d.successiveErrorCount++ + return nil + } + d.successiveErrorCount = 0 + + // Decoded interleaved PCM; convert to the target channel layout if needed. + out := d.buf[:n*channels] + if channels < d.targetChannels { + n2 := len(out) * 2 + if len(d.buf2) < n2 { + d.buf2 = make(msdk.PCM16Sample, n2) + } + msdk.MonoToStereo(d.buf2, out) + out = d.buf2[:n2] + } else if channels > d.targetChannels { + n2 := len(out) / 2 + if len(d.buf2) < n2 { + d.buf2 = make(msdk.PCM16Sample, n2) + } + msdk.StereoToMono(d.buf2, out) + out = d.buf2[:n2] + } + return d.w.WriteSample(out) +} + +func (d *sipDecoder) Close() error { + d.mu.Lock() + defer d.mu.Unlock() + return d.w.Close() +} + +type sipEncoder struct { + log logger.Logger + channels int + samples int + + mu sync.Mutex + w Writer + enc *opus.Encoder + inbuf msdk.PCM16Sample + buf Sample + + successiveErrorCount int +} + +func (e *sipEncoder) String() string { + return fmt.Sprintf("OPUS(encode) -> %s", e.w) +} + +func (e *sipEncoder) SampleRate() int { + return e.w.SampleRate() +} + +func (e *sipEncoder) WriteSample(in msdk.PCM16Sample) error { + e.mu.Lock() + defer e.mu.Unlock() + e.inbuf = append(e.inbuf, in...) + // One Opus frame is `samples` PCM samples per channel, interleaved. 
+ frame := e.samples * e.channels + for len(e.inbuf) >= frame { + n, err := e.enc.Encode(e.inbuf[:frame], e.buf) + + sz := copy(e.inbuf, e.inbuf[frame:]) + e.inbuf = e.inbuf[:sz] + if err != nil { + if e.successiveErrorCount < 5 { + e.log.Errorw("error encoding opus sample", err, "frame", frame, "buf", len(e.buf), "n", n) + e.successiveErrorCount++ + } + return err + } + e.successiveErrorCount = 0 + if err = e.w.WriteSample(e.buf[:n]); err != nil { + return err + } + } + return nil +} + +func (e *sipEncoder) flush() error { + if len(e.inbuf) == 0 { + return nil + } + // libopus only accepts valid frame sizes, so zero-pad the trailing partial + // buffer up to one full frame before encoding. Encoding the raw remainder + // would return OPUS_BAD_ARG. + frame := e.samples * e.channels + if len(e.inbuf) < frame { + e.inbuf = append(e.inbuf, make(msdk.PCM16Sample, frame-len(e.inbuf))...) + } + n, err := e.enc.Encode(e.inbuf[:frame], e.buf) + if err != nil { + return err + } + return e.w.WriteSample(e.buf[:n]) +} + +func (e *sipEncoder) Close() error { + e.mu.Lock() + defer e.mu.Unlock() + err1 := e.flush() + err2 := e.w.Close() + if err1 != nil { + return err1 + } + return err2 +} diff --git a/pkg/media/opus/opus_test.go b/pkg/media/opus/opus_test.go new file mode 100644 index 000000000..01989583a --- /dev/null +++ b/pkg/media/opus/opus_test.go @@ -0,0 +1,196 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package opus + +import ( + "testing" + + "github.com/stretchr/testify/require" + hopus "gopkg.in/hraban/opus.v2" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/protocol/logger" +) + +// A single Opus packet longer than our 20 ms ptime (here 60 ms) must decode +// without overflowing the decode buffer. +func TestDecodeLongFrame(t *testing.T) { + const rate = 48000 + // 60 ms mono frame = 2880 samples, larger than one 20 ms frame (960). + const frameSamples = rate / 1000 * 60 + + enc, err := hopus.NewEncoder(rate, 1, hopus.AppVoIP) + require.NoError(t, err) + + pcmIn := make([]int16, frameSamples) + for i := range pcmIn { + pcmIn[i] = int16(3000 * (i % 100)) + } + pkt := make([]byte, 4000) + n, err := enc.Encode(pcmIn, pkt) + require.NoError(t, err) + pkt = pkt[:n] + + var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, rate) + dec, err := DecodeWith(sink, 1, logger.GetLogger()) + require.NoError(t, err) + + require.NoError(t, dec.WriteSample(pkt)) + require.Equal(t, frameSamples, len(out), "60ms packet should decode to 2880 mono samples") +} + +// Encoder options must be applied without error and produce decodable output. +func TestEncodeWithOptionsRoundTrip(t *testing.T) { + const rate = 48000 + var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, rate) + + dec, err := DecodeWith(sink, 1, logger.GetLogger()) + require.NoError(t, err) + enc, err := EncodeWith(dec, EncodeOptions{ + Channels: 1, + Bitrate: 16000, + Complexity: 5, + FEC: true, + PacketLossPercent: 10, + }, logger.GetLogger()) + require.NoError(t, err) + + in := make(msdk.PCM16Sample, rate/50) // one 20ms frame + for i := range in { + in[i] = int16(2000 * (i % 50)) + } + require.NoError(t, enc.WriteSample(in)) + require.NoError(t, enc.Close()) + require.NotEmpty(t, out) +} + +// An invalid channel count must be rejected by Decode. 
+func TestDecodeRejectsBadChannels(t *testing.T) { + var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, 48000) + _, err := DecodeWith(sink, 3, logger.GetLogger()) + require.Error(t, err) +} + +// sampleCollector is a WriteCloser[Sample] sink that counts encoded packets and +// total bytes, for asserting on encoder output. +type sampleCollector struct { + rate int + bytes int + packets int +} + +func (c *sampleCollector) String() string { return "collector" } +func (c *sampleCollector) SampleRate() int { return c.rate } +func (c *sampleCollector) Close() error { return nil } +func (c *sampleCollector) WriteSample(s Sample) error { + c.bytes += len(s) + c.packets++ + return nil +} + +// DTX (discontinuous transmission) silence packets must not error the decoder: +// empty "no data" frames are skipped, and the tiny packets libopus emits for +// silence decode cleanly into PCM. +func TestDTXPacketsNoError(t *testing.T) { + const rate = 48000 + const frame = rate / 50 // 20ms mono + + enc, err := hopus.NewEncoder(rate, 1, hopus.AppVoIP) + require.NoError(t, err) + require.NoError(t, enc.SetDTX(true)) + + var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, rate) + dec, err := DecodeWith(sink, 1, logger.GetLogger()) + require.NoError(t, err) + + // An explicit "no data" gap (e.g. DTX with nothing to send) is a no-op. + require.NoError(t, dec.WriteSample(Sample(nil))) + require.NoError(t, dec.WriteSample(Sample{})) + require.Empty(t, out, "no-data frames must not produce PCM") + + // Encode many frames of silence with DTX on; libopus emits short (1-2 byte) + // DTX packets once it detects sustained silence. Each must decode cleanly. 
+ silence := make([]int16, frame) + buf := make([]byte, 4000) + sawDTX := false + decodedFrames := 0 + for i := 0; i < 50; i++ { + n, err := enc.Encode(silence, buf) + require.NoError(t, err) + if n > 0 && n <= 2 { + sawDTX = true // tiny packet == DTX/comfort-noise frame + } + before := len(out) + require.NoError(t, dec.WriteSample(Sample(buf[:n]))) + if len(out) > before { + decodedFrames++ + } + } + require.True(t, sawDTX, "DTX should have produced at least one tiny packet") + require.Positive(t, decodedFrames, "DTX/silence packets should decode to PCM") +} + +// Enabling in-band FEC must take effect end-to-end: it changes the encoded +// bitstream (vs FEC off) and every packet still decodes without being dropped. +func TestFECPassthrough(t *testing.T) { + const rate = 48000 + const frame = rate / 50 // 20ms mono + const frames = 20 + + in := make(msdk.PCM16Sample, frame) + for i := range in { + in[i] = int16(4000 * (i % 80)) + } + + encodeAll := func(t *testing.T, opts EncodeOptions) *sampleCollector { + t.Helper() + c := &sampleCollector{rate: rate} + enc, err := EncodeWith(c, opts, logger.GetLogger()) + require.NoError(t, err) + for i := 0; i < frames; i++ { + require.NoError(t, enc.WriteSample(in)) + } + require.NoError(t, enc.Close()) + return c + } + + // VBR (no fixed bitrate). Enabling FEC + packet-loss tuning makes libopus + // re-budget the bitstream (redundant LBRR data, altered frame sizes), so the + // encoded output must differ from the FEC-off stream — proving our FEC + // option actually reaches the encoder. + off := encodeAll(t, EncodeOptions{Channels: 1}) + on := encodeAll(t, EncodeOptions{Channels: 1, FEC: true, PacketLossPercent: 30}) + + require.Equal(t, frames, off.packets, "no packets dropped without FEC") + require.Equal(t, frames, on.packets, "no packets dropped with FEC") + require.NotEqual(t, off.bytes, on.bytes, "enabling in-band FEC must change the encoded bitstream") + + // And the FEC stream decodes fully (passthrough, no drops). 
+ var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, rate) + dec, err := DecodeWith(sink, 1, logger.GetLogger()) + require.NoError(t, err) + enc, err := EncodeWith(dec, EncodeOptions{Channels: 1, FEC: true, PacketLossPercent: 30}, logger.GetLogger()) + require.NoError(t, err) + for i := 0; i < frames; i++ { + require.NoError(t, enc.WriteSample(in)) + } + require.NoError(t, enc.Close()) + require.Equal(t, frames*frame, len(out)) +} diff --git a/pkg/sip/media_codecs.go b/pkg/sip/media_codecs.go index 81b8b28fe..1832c421e 100644 --- a/pkg/sip/media_codecs.go +++ b/pkg/sip/media_codecs.go @@ -39,6 +39,7 @@ func init() { g711.ULawSDPName: true, g722.SDPName: true, amrwb.SDPName: false, // optional + OpusSDPName: false, // opt-in via enable_opus config flag (see SetOpusEnabled) dtmf.SDPName: true, }) } diff --git a/pkg/sip/media_codecs_opus.go b/pkg/sip/media_codecs_opus.go new file mode 100644 index 000000000..345b0f5d4 --- /dev/null +++ b/pkg/sip/media_codecs_opus.go @@ -0,0 +1,119 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sip + +import ( + "sync/atomic" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/protocol/logger" + + "github.com/livekit/sip/pkg/media/opus" +) + +const ( + // OpusSDPName is the rtpmap encoding name advertised for Opus. Per RFC 7587 + // the channel field MUST be 2 even when the actual media is mono. 
+ OpusSDPName = "opus/48000/2" + // OpusSampleRate is the (and only sensible) RTP clock rate for Opus. + OpusSampleRate = 48000 + // opusChannels is the number of channels we encode/decode internally. SIP + // telephony is mono; we still advertise opus/48000/2 per RFC 7587 and the + // decoder adapts to whatever channel count peers actually send. + opusChannels = 1 + // opusPriority makes Opus the preferred codec during priority-based codec + // selection (higher than AMR-WB -4, G722 -5, PCMU -10, PCMA -20). + opusPriority = 10 +) + +// opusOptions holds the live encoder configuration. It is read when each call's +// encoder is created, so SetOpusOptions can be called after the codec is +// registered (e.g. once the YAML config has been parsed). +var opusOptions atomic.Pointer[opus.EncodeOptions] + +// SetOpusOptions updates the Opus encoder options used for new calls. The +// channel count is always forced to mono for SIP media. +func SetOpusOptions(opts opus.EncodeOptions) { + opts.Channels = opusChannels + opusOptions.Store(&opts) +} + +func currentOpusOptions() opus.EncodeOptions { + if o := opusOptions.Load(); o != nil { + return *o + } + return opus.EncodeOptions{Channels: opusChannels} +} + +func init() { + msdk.RegisterCodec(msdk.NewAudioCodec(msdk.CodecInfo{ + SDPName: OpusSDPName, + SampleRate: OpusSampleRate, + RTPClockRate: OpusSampleRate, + RTPIsStatic: false, // dynamic payload type, assigned during SDP negotiation + Priority: opusPriority, + Disabled: true, // opt-in only; enabled via the enable_opus config flag + FileExt: "opus", + }, opusDecode, opusEncode)) +} + +// SetOpusEnabled toggles Opus end-to-end, making the enable_opus config flag +// authoritative. It must update both: +// - defaultCodecs: the per-call base set used to build SDP offers and answers +// (see codecSet in media_codecs.go), and +// - the media-sdk GlobalCodecs set, used by any global/deprecated code path. 
+// +// When disabled, Opus appears in neither, so it is never offered or selected. +func SetOpusEnabled(enabled bool) { + defaultCodecs.SetEnabled(OpusSDPName, enabled) + msdk.CodecSetEnabled(OpusSDPName, enabled) +} + +func opusDecode(w msdk.PCM16Writer) msdk.WriteCloser[opus.Sample] { + dec, err := opus.DecodeWith(w, opusChannels, logger.GetLogger()) + if err != nil { + logger.GetLogger().Errorw("cannot create opus decoder", err) + return discardSampleWriter{w: w} + } + return dec +} + +func opusEncode(w msdk.WriteCloser[opus.Sample]) msdk.PCM16Writer { + enc, err := opus.EncodeWith(w, currentOpusOptions(), logger.GetLogger()) + if err != nil { + logger.GetLogger().Errorw("cannot create opus encoder", err) + return discardPCMWriter{w: w} + } + return enc +} + +// discardSampleWriter / discardPCMWriter keep the media pipeline alive if codec +// initialization unexpectedly fails (already logged at error level). They drop +// samples and forward Close to the underlying writer so resources are released. +// Initialization only fails on invalid encoder options (bitrate/complexity out +// of range); with mono/48kHz and validated options this should not happen. 
+type discardSampleWriter struct{ w msdk.PCM16Writer } + +func (d discardSampleWriter) String() string { return "OPUS(decode-discard)" } +func (d discardSampleWriter) SampleRate() int { return d.w.SampleRate() } +func (d discardSampleWriter) WriteSample(opus.Sample) error { return nil } +func (d discardSampleWriter) Close() error { return d.w.Close() } + +type discardPCMWriter struct{ w msdk.WriteCloser[opus.Sample] } + +func (d discardPCMWriter) String() string { return "OPUS(encode-discard)" } +func (d discardPCMWriter) SampleRate() int { return d.w.SampleRate() } +func (d discardPCMWriter) WriteSample(msdk.PCM16Sample) error { return nil } +func (d discardPCMWriter) Close() error { return d.w.Close() } diff --git a/pkg/sip/media_codecs_opus_test.go b/pkg/sip/media_codecs_opus_test.go new file mode 100644 index 000000000..221e0aafd --- /dev/null +++ b/pkg/sip/media_codecs_opus_test.go @@ -0,0 +1,317 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sip + +import ( + "net/netip" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/media-sdk/amrwb" + "github.com/livekit/media-sdk/dtmf" + "github.com/livekit/media-sdk/g711" + "github.com/livekit/media-sdk/g722" + "github.com/livekit/media-sdk/sdp" + "github.com/livekit/protocol/logger" + + "github.com/livekit/sip/pkg/audiotest" + "github.com/livekit/sip/pkg/media/opus" +) + +// enableOpusForTest turns Opus on for the duration of a test and restores the +// default (disabled) state afterwards, keeping global codec state isolated. +func enableOpusForTest(t *testing.T) { + t.Helper() + SetOpusEnabled(true) + t.Cleanup(func() { SetOpusEnabled(false) }) +} + +// Opus must be registered as an AudioCodec, use a dynamic payload type, run at +// its native 48 kHz clock, and become available once enabled. +func TestOpusRegisteredAndEnabled(t *testing.T) { + enableOpusForTest(t) + + c := sdp.CodecByNameWith(defaultCodecs, OpusSDPName) + require.NotNil(t, c, "opus codec should be available in defaultCodecs once enabled") + + _, ok := c.(msdk.AudioCodec) + require.True(t, ok, "opus codec should implement AudioCodec") + + info := c.Info() + require.Equal(t, OpusSDPName, info.SDPName) + require.Equal(t, OpusSampleRate, info.SampleRate) + require.Equal(t, OpusSampleRate, info.RTPClockRate) + require.False(t, info.RTPIsStatic, "opus must use a dynamic payload type") +} + +// The generated SDP offer must advertise opus/48000/2 (RFC 7587) with a dynamic +// payload type in the dynamic range. 
+func TestOpusInSDPOffer(t *testing.T) { + enableOpusForTest(t) + + _, md, err := sdp.OfferMediaWith(defaultCodecs, 12345, sdp.EncryptionNone) + require.NoError(t, err) + + var rtpmap string + for _, a := range md.Attributes { + if a.Key == "rtpmap" && strings.Contains(strings.ToLower(a.Value), OpusSDPName) { + rtpmap = a.Value + } + } + require.NotEmpty(t, rtpmap, "SDP offer should advertise opus/48000/2") + + // rtpmap value looks like "<pt> opus/48000/2"; the payload type must be dynamic (96-127). + pt := strings.SplitN(rtpmap, " ", 2)[0] + require.Contains(t, md.MediaName.Formats, pt, "opus payload type must appear in m= formats") +} + +// When a peer offers Opus alongside lower-priority codecs, we must select Opus +// and honor the payload type the peer assigned (e.g. 111). +func TestOpusPreferredInSelection(t *testing.T) { + enableOpusForTest(t) + + opusC := sdp.CodecByNameWith(defaultCodecs, OpusSDPName).(msdk.AudioCodec) + ulawC := sdp.CodecByNameWith(defaultCodecs, g711.ULawSDPName).(msdk.AudioCodec) + g722C := sdp.CodecByNameWith(defaultCodecs, g722.SDPName).(msdk.AudioCodec) + require.NotNil(t, opusC) + require.NotNil(t, ulawC) + require.NotNil(t, g722C) + + desc := sdp.MediaDesc{ + Codecs: []sdp.CodecInfo{ + {Type: 0, Codec: ulawC}, + {Type: 9, Codec: g722C}, + {Type: 111, Codec: opusC}, + }, + } + got, err := sdp.SelectAudio(desc, false) + require.NoError(t, err) + require.Equal(t, OpusSDPName, got.Codec.Info().SDPName, "opus should win priority-based selection") + require.Equal(t, byte(111), got.Type, "peer's opus payload type must be honored") +} + +// A PCM signal encoded to Opus and decoded back should round-trip to a similar +// length of non-silent PCM, exercising encoder options and the dynamic-channel +// decoder. enc -> opus.Sample -> dec -> PCM16. 
+func TestOpusEncodeDecodeRoundTrip(t *testing.T) { + log := logger.GetLogger() + + var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, OpusSampleRate) + + dec, err := opus.Decode(sink, opusChannels, log) + require.NoError(t, err) + enc, err := opus.EncodeWith(dec, opus.EncodeOptions{ + Channels: 1, + Bitrate: 24000, + Complexity: 10, + FEC: true, + PacketLossPercent: 5, + }, log) + require.NoError(t, err) + + // 200 ms of 48 kHz mono = 9600 samples = exactly 10 Opus frames of 20 ms. + const n = OpusSampleRate / 5 + in := make(msdk.PCM16Sample, n) + audiotest.GenSignal(in, []audiotest.Wave{{Ind: 5, Amp: 8000}}) + + require.NoError(t, enc.WriteSample(in)) + require.NoError(t, enc.Close()) + + require.NotEmpty(t, out, "decoded PCM should not be empty") + // Each Opus frame decodes back to 960 samples; allow generous tolerance for + // codec delay/framing. + require.InDelta(t, n, len(out), float64(n)/2) + + // Output must carry actual signal energy, not silence. + var energy int64 + for _, s := range out { + energy += int64(s) * int64(s) + } + require.Greater(t, energy, int64(0), "decoded audio should contain signal energy") +} + +// SetOpusOptions always forces mono, regardless of the channel count requested. +func TestSetOpusOptionsForcesMono(t *testing.T) { + t.Cleanup(func() { opusOptions.Store(nil) }) + SetOpusOptions(opus.EncodeOptions{Channels: 2, Bitrate: 32000}) + got := currentOpusOptions() + require.Equal(t, opusChannels, got.Channels) + require.Equal(t, 32000, got.Bitrate) +} + +// offerRtpmaps returns the rtpmap attribute values of an SDP offer built from +// the given codec set. 
+func offerRtpmaps(t *testing.T, codecs *msdk.CodecSet) []string { + t.Helper() + _, md, err := sdp.OfferMediaWith(codecs, 12345, sdp.EncryptionNone) + require.NoError(t, err) + var out []string + for _, a := range md.Attributes { + if a.Key == "rtpmap" { + out = append(out, strings.ToLower(a.Value)) + } + } + return out +} + +// With enable_opus unset (the default), Opus must not appear in SDP offers. +func TestOpusDisabledByDefault(t *testing.T) { + // Be robust against test ordering: explicitly assert the default-off state. + SetOpusEnabled(false) + + for _, m := range offerRtpmaps(t, defaultCodecs) { + require.NotContains(t, m, "opus", "Opus must not be offered when disabled") + } + + // Selecting from our offered set must yield a non-Opus codec. + desc, _, err := sdp.OfferMediaWith(defaultCodecs, 12345, sdp.EncryptionNone) + require.NoError(t, err) + audio, err := sdp.SelectAudio(desc, false) + require.NoError(t, err) + require.NotEqual(t, OpusSDPName, audio.Codec.Info().SDPName) +} + +// answerCodecFor simulates a peer that offers only the given codec set and +// returns the codec our side negotiates in the SDP answer. +func answerCodecFor(t *testing.T, callerCodecs, ourCodecs *msdk.CodecSet) (*sdp.AudioConfig, error) { + t.Helper() + ip := netip.MustParseAddr("1.2.3.4") + offer, err := sdp.NewOfferWith(callerCodecs, ip, 10000, sdp.EncryptionNone) + require.NoError(t, err) + data, err := offer.SDP.Marshal() + require.NoError(t, err) + + parsed, err := sdp.ParseOfferWith(ourCodecs, data) + require.NoError(t, err) + _, mc, err := parsed.Answer(ip, 20000, sdp.EncryptionNone) + if err != nil { + return nil, err + } + return &mc.Audio, nil +} + +// When Opus is disabled, a caller that offers BOTH Opus and PCMU must be +// answered with PCMU — i.e. we actively exclude Opus from selection, not merely +// happen to pick PCMU because nothing else was offered. 
+func TestG711FallbackWhenOpusDisabled(t *testing.T) { + SetOpusEnabled(false) + + caller := msdk.NewCodecSet() + caller.SetEnabled(g711.ULawSDPName, true) + caller.SetEnabled(OpusSDPName, true) // caller advertises Opus... + + audio, err := answerCodecFor(t, caller, defaultCodecs) + require.NoError(t, err) + // ...but we must not select it while disabled. + require.Equal(t, g711.ULawSDPName, audio.Codec.Info().SDPName) +} + +// With Opus enabled on our side, a caller offering only PCMU must fall back to +// G.711 cleanly — we never select a codec the peer did not offer. A positive +// control confirms the negotiation is real (Opus is chosen when offered). +func TestG711FallbackWhenCallerNoOpus(t *testing.T) { + enableOpusForTest(t) // Opus enabled on our side + + // Positive control: when the caller offers Opus, we negotiate Opus. + withOpus := msdk.NewCodecSet() + withOpus.SetEnabled(g711.ULawSDPName, true) + withOpus.SetEnabled(OpusSDPName, true) + audio, err := answerCodecFor(t, withOpus, defaultCodecs) + require.NoError(t, err) + require.Equal(t, OpusSDPName, audio.Codec.Info().SDPName, "sanity: Opus is negotiated when the caller offers it") + + // A PCMU-only caller must fall back to PCMU, not a phantom Opus selection. + ulawOnly := msdk.NewCodecSet() + ulawOnly.SetEnabled(g711.ULawSDPName, true) + audio, err = answerCodecFor(t, ulawOnly, defaultCodecs) + require.NoError(t, err) + require.Equal(t, g711.ULawSDPName, audio.Codec.Info().SDPName) +} + +// callerOffering builds a caller codec set with exactly the named codecs enabled. +func callerOffering(names ...string) *msdk.CodecSet { + s := msdk.NewCodecSet() + for _, n := range names { + s.SetEnabled(n, true) + } + return s +} + +// Existing-call regression scenarios: with Opus DISABLED (the default), each +// legacy codec must negotiate exactly as before — correct codec + static +// payload type — and Opus must never be selected. 
+func TestLegacyCodecScenariosOpusDisabled(t *testing.T) { + SetOpusEnabled(false) + + cases := []struct { + name string + offer *msdk.CodecSet + wantSDP string + wantPT byte // RFC 3551 static payload type + }{ + {"PCMU inbound", callerOffering(g711.ULawSDPName), g711.ULawSDPName, 0}, + {"PCMA inbound", callerOffering(g711.ALawSDPName), g711.ALawSDPName, 8}, + {"G722 inbound", callerOffering(g722.SDPName), g722.SDPName, 9}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + audio, err := answerCodecFor(t, c.offer, defaultCodecs) + require.NoError(t, err, "negotiation must succeed") + require.Equal(t, c.wantSDP, audio.Codec.Info().SDPName, "correct codec selected") + require.Equal(t, c.wantPT, audio.Type, "correct static payload type") + require.NotEqual(t, OpusSDPName, audio.Codec.Info().SDPName, "Opus must not be touched") + }) + } +} + +// DTMF (telephone-event) must still be negotiated alongside a voice codec. +func TestDTMFNegotiatedOpusDisabled(t *testing.T) { + SetOpusEnabled(false) + + caller := callerOffering(g711.ULawSDPName, dtmf.SDPName) + audio, err := answerCodecFor(t, caller, defaultCodecs) + require.NoError(t, err) + require.Equal(t, g711.ULawSDPName, audio.Codec.Info().SDPName) + require.NotZero(t, audio.DTMFType, "DTMF (telephone-event) must be negotiated") +} + +// Outbound: with Opus disabled, our SDP offer must advertise the legacy codecs +// and must NOT include Opus. 
+func TestOutboundOfferExcludesOpusWhenDisabled(t *testing.T) { + SetOpusEnabled(false) + + maps := offerRtpmaps(t, defaultCodecs) + joined := strings.Join(maps, "\n") + require.NotContains(t, joined, "opus", "offer must not include Opus when disabled") + require.Contains(t, joined, strings.ToLower(g711.ULawSDPName), "offer must still include PCMU") + require.Contains(t, joined, strings.ToLower(g711.ALawSDPName), "offer must still include PCMA") + require.Contains(t, joined, strings.ToLower(g722.SDPName), "offer must still include G722") +} + +// A call whose only offered codec we do not support must fail gracefully +// (an error, not a panic) — same behavior as before Opus existed. +func TestNoCommonCodecFailsGracefully(t *testing.T) { + SetOpusEnabled(false) + + // AMR-WB is disabled by default; a caller offering only AMR-WB shares no + // codec with us. + caller := callerOffering(amrwb.SDPName) + _, err := answerCodecFor(t, caller, defaultCodecs) + require.Error(t, err, "no common codec must return an error, not panic") +} diff --git a/pkg/sip/media_port_test.go b/pkg/sip/media_port_test.go index f8d5823cf..d524bd88f 100644 --- a/pkg/sip/media_port_test.go +++ b/pkg/sip/media_port_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" msdk "github.com/livekit/media-sdk" + "github.com/livekit/media-sdk/g722" "github.com/livekit/media-sdk/rtp" "github.com/livekit/media-sdk/sdp" "github.com/livekit/mediatransportutil/pkg/rtcconfig" @@ -184,7 +185,8 @@ func TestMediaPort(t *testing.T) { codecs := msdk.NewCodecSet() codecs.SetEnabled(info.SDPName, true) - sub := strings.SplitN(info.SDPName, "/", 2) + // SDP names are "name/rate" or, for Opus, "name/rate/channels". 
+ sub := strings.Split(info.SDPName, "/") codecName := sub[0] nativeRateSDP, err := strconv.Atoi(sub[1]) nativeRate := nativeRateSDP @@ -192,6 +194,11 @@ func TestMediaPort(t *testing.T) { switch codecName { case "telephone-event": t.SkipNow() + case "opus": + // Opus is a lossy 48kHz codec; the strict waveform-fidelity and + // pipeline-string assertions in this generic harness don't apply to + // it. Opus has dedicated coverage in media_codecs_opus_test.go. + t.SkipNow() case "G722": nativeRate *= 2 // error in RFC } @@ -454,7 +461,10 @@ func newMediaPairWithAddr(t testing.TB, ip1, ip2 netip.Addr, opt1, opt2 *MediaOp } c1, c2 := newUDPPipe() - codecs := defaultCodecs + // Pin G722 so this RTP symmetry/timeout helper stays deterministic and + // independent of the shared default codec set (Opus is preferred when it is enabled). + codecs := msdk.NewCodecSet() + codecs.SetEnabled(g722.SDPName, true) opt1.IP = ip1 opt1.Ports = rtcconfig.PortRange{Start: 10000} diff --git a/pkg/sip/service.go b/pkg/sip/service.go index d4849ea58..0a43de861 100644 --- a/pkg/sip/service.go +++ b/pkg/sip/service.go @@ -43,6 +43,7 @@ import ( "github.com/livekit/sip/pkg/config" siperrors "github.com/livekit/sip/pkg/errors" + "github.com/livekit/sip/pkg/media/opus" "github.com/livekit/sip/pkg/stats" "github.com/livekit/sip/version" ) @@ -210,6 +211,24 @@ func (s *Service) Start() error { } msdk.CodecsSetEnabled(s.conf.Codecs) + // enable_opus is authoritative: apply it after the generic codecs map so it + // wins over any opus entry there, and update both the per-call and global + // codec sets. 
+ SetOpusEnabled(s.conf.EnableOpus) + if s.conf.EnableOpus { + SetOpusOptions(opus.EncodeOptions{ + Bitrate: s.conf.Opus.Bitrate, + Complexity: s.conf.Opus.Complexity, + FEC: s.conf.Opus.FEC, + PacketLossPercent: s.conf.Opus.PacketLossPercent, + }) + s.log.Infow("opus codec enabled", + "bitrate", s.conf.Opus.Bitrate, + "complexity", s.conf.Opus.Complexity, + "fec", s.conf.Opus.FEC, + ) + } + if err := s.mon.Start(s.conf); err != nil { return err } From 7e430e5a489d73aab930f6dfa5c1a4dab1fd80bc Mon Sep 17 00:00:00 2001 From: "kanimuthumaran.t" Date: Mon, 8 Jun 2026 16:53:38 +0530 Subject: [PATCH 2/4] Add Opus SIP docs and negotiation/RTP tests Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 37 ++++++++ pkg/sip/media_codecs_opus.go | 13 +++ pkg/sip/media_codecs_opus_test.go | 147 ++++++++++++++++++++++++++++++ pkg/sip/media_port_test.go | 88 ++++++++++++++++++ 4 files changed, 285 insertions(+) diff --git a/README.md b/README.md index faed8dfa5..daf2d258a 100644 --- a/README.md +++ b/README.md @@ -62,10 +62,47 @@ prometheus_port: port used to collect prometheus metrics. Used for autoscaling log_level: debug, info, warn, or error (default info) sip_port: port to listen and send SIP traffic (default 5060) rtp_port: port to listen and send RTP traffic (default 10000-20000) +enable_opus: offer the Opus codec for SIP media (default false, experimental) +opus: # optional Opus encoder tuning, only used when enable_opus is true + bitrate: target bitrate in bits/sec, e.g. 24000 (0 = libopus default) + complexity: encoder complexity 1-10 (0 = libopus default) + fec: enable in-band Forward Error Correction (default false) + packet_loss_percent: expected packet loss 0-100, tunes FEC (default 0) ``` The config file can be added to a mounted volume with its location passed in the SIP_CONFIG_FILE env var, or its body can be passed in the SIP_CONFIG_BODY env var. +#### Codecs + +PCMU, PCMA, G722, and DTMF are negotiated by default. 
Opus support is **disabled +by default and should be enabled only after validating interoperability with +your SIP infrastructure**: set `enable_opus: true` to offer it. + +- Opus is advertised as `opus/48000/2` (48 kHz, RFC 7587 requires the channel + field to be `2`), but SIP media is encoded/decoded internally as **mono** — + appropriate for telephony, and the decoder adapts to whatever channel count a + peer actually sends. +- When enabled, Opus is **preferred**; SIP peers that don't support it fall back + transparently to G722, then G711 (PCMU/PCMA). When disabled, Opus never + appears in SDP offers and the existing PCMU/PCMA/G722/DTMF behavior is + unchanged. +- The LiveKit/WebRTC side always uses Opus and is unaffected by this setting. + +Current limitations to be aware of before a GA rollout: + +- **Live interoperability validation is required** with your SIP stack + (e.g. FreeSWITCH, Asterisk, and any cloud SIP trunk) before relying on Opus in + production. +- **No mid-call codec renegotiation:** the negotiated codec is fixed for the + duration of a call; re-INVITEs are treated as keep-alives that replay the + original SDP. +- **No Opus `fmtp` attributes** (e.g. `useinbandfec`, `minptime`, `stereo`) are + signaled yet; peers use RFC 7587 defaults. +- **No PLC/FEC loss recovery for local packet loss:** missing/suppressed frames + are filled with silence (the same treatment as the other codecs). +- **Fallback:** when Opus is not negotiated, calls use G722 or G711 exactly as + before. + ### Using the SIP service #### Creating Bridge and Dispatch Rule diff --git a/pkg/sip/media_codecs_opus.go b/pkg/sip/media_codecs_opus.go index 345b0f5d4..b7782b1f9 100644 --- a/pkg/sip/media_codecs_opus.go +++ b/pkg/sip/media_codecs_opus.go @@ -57,6 +57,14 @@ func currentOpusOptions() opus.EncodeOptions { return opus.EncodeOptions{Channels: opusChannels} } +// TODO(opus, experimental): before marking Opus GA: +// - Advertise Opus fmtp params for better SBC interop, e.g. 
+// "a=fmtp:<pt> useinbandfec=1;minptime=10;stereo=0". media-sdk's sdp package +// currently emits/parses only rtpmap (+DTMF fmtp), so this needs support +// there or a local post-processing step on the generated SDP. +// - Validate live interop with FreeSWITCH, Asterisk, and at least one cloud +// SIP trunk (Twilio/Telnyx), covering Opus offer/answer, hold/resume, and +// transfer. Unit tests cover negotiation/RTP but not real endpoints. func init() { msdk.RegisterCodec(msdk.NewAudioCodec(msdk.CodecInfo{ SDPName: OpusSDPName, @@ -76,6 +84,11 @@ func init() { // - the media-sdk GlobalCodecs set, used by any global/deprecated code path. // // When disabled, Opus appears in neither, so it is never offered or selected. +// +// TODO(opus): defaultCodecs is a plain map with no mutex. This is safe today +// because SetOpusEnabled is called once during Service.Start, before any call +// is accepted. If config hot-reload is ever added, mutating defaultCodecs while +// calls read it concurrently would be a data race; guard it then. func SetOpusEnabled(enabled bool) { defaultCodecs.SetEnabled(OpusSDPName, enabled) msdk.CodecSetEnabled(OpusSDPName, enabled) diff --git a/pkg/sip/media_codecs_opus_test.go b/pkg/sip/media_codecs_opus_test.go index 221e0aafd..9b8ebc015 100644 --- a/pkg/sip/media_codecs_opus_test.go +++ b/pkg/sip/media_codecs_opus_test.go @@ -15,6 +15,7 @@ package sip import ( + "math" "net/netip" "strings" "testing" @@ -26,6 +27,7 @@ import ( "github.com/livekit/media-sdk/dtmf" "github.com/livekit/media-sdk/g711" "github.com/livekit/media-sdk/g722" + "github.com/livekit/media-sdk/rtp" "github.com/livekit/media-sdk/sdp" "github.com/livekit/protocol/logger" @@ -315,3 +317,148 @@ func TestNoCommonCodecFailsGracefully(t *testing.T) { _, err := answerCodecFor(t, caller, defaultCodecs) require.Error(t, err, "no common codec must return an error, not panic") } + +// An Opus-only offer must be accepted with Opus when Opus is enabled. 
+func TestOpusOnlyOfferSelectedWhenEnabled(t *testing.T) { + enableOpusForTest(t) + + caller := callerOffering(OpusSDPName) + audio, err := answerCodecFor(t, caller, defaultCodecs) + require.NoError(t, err) + require.Equal(t, OpusSDPName, audio.Codec.Info().SDPName) +} + +// An Opus-only offer must be rejected gracefully when Opus is disabled (no +// common codec) — never crash, never silently pick Opus. +func TestOpusOnlyOfferRejectedWhenDisabled(t *testing.T) { + SetOpusEnabled(false) + + caller := callerOffering(OpusSDPName) + _, err := answerCodecFor(t, caller, defaultCodecs) + require.Error(t, err, "Opus-only offer must be rejected when Opus is disabled") +} + +// Our SDP answer must reuse the payload type the peer assigned to Opus (which +// may be anything in the dynamic range, e.g. 111), not a fixed value. +func TestOpusAnswerEchoesPeerPayloadType(t *testing.T) { + enableOpusForTest(t) + + const peerOpusPT = 111 + // Minimal inbound offer with Opus at PT 111 and PCMU at 0. + offerSDP := "v=0\r\n" + + "o=- 1 1 IN IP4 1.2.3.4\r\n" + + "s=-\r\n" + + "c=IN IP4 1.2.3.4\r\n" + + "t=0 0\r\n" + + "m=audio 5004 RTP/AVP 111 0\r\n" + + "a=rtpmap:111 opus/48000/2\r\n" + + "a=rtpmap:0 PCMU/8000\r\n" + + "a=sendrecv\r\n" + + offer, err := sdp.ParseOfferWith(defaultCodecs, []byte(offerSDP)) + require.NoError(t, err) + + ip := netip.MustParseAddr("1.2.3.4") + answer, mc, err := offer.Answer(ip, 20000, sdp.EncryptionNone) + require.NoError(t, err) + + // Opus selected, and the negotiated type echoes the peer's PT. + require.Equal(t, OpusSDPName, mc.Audio.Codec.Info().SDPName) + require.Equal(t, byte(peerOpusPT), mc.Audio.Type) + + // The emitted answer SDP must carry the same rtpmap PT. + answerBytes, err := answer.SDP.Marshal() + require.NoError(t, err) + require.Contains(t, strings.ToLower(string(answerBytes)), "rtpmap:111 opus/48000/2") +} + +// Our outbound Opus offer must use a dynamic payload type (RFC 3551: 96-127). 
+func TestOpusOfferIsDynamicPayloadType(t *testing.T) { + enableOpusForTest(t) + + desc, _, err := sdp.OfferMediaWith(defaultCodecs, 12345, sdp.EncryptionNone) + require.NoError(t, err) + + var found bool + for _, c := range desc.Codecs { + if c.Codec != nil && c.Codec.Info().SDPName == OpusSDPName { + found = true + require.GreaterOrEqual(t, c.Type, byte(96), "Opus PT must be in the dynamic range") + require.LessOrEqual(t, c.Type, byte(127), "Opus PT must be in the dynamic range") + } + } + require.True(t, found, "Opus must be present in the offer when enabled") +} + +// captureWriter is a WriteCloser[opus.Sample] sink used to capture encoded Opus +// packets in tests. +type captureWriter struct { + rate int + onWrite func(opus.Sample) +} + +func (c *captureWriter) String() string { return "capture" } +func (c *captureWriter) SampleRate() int { return c.rate } +func (c *captureWriter) Close() error { return nil } +func (c *captureWriter) WriteSample(s opus.Sample) error { + c.onWrite(s) + return nil +} + +// A timestamp gap with no sequence gap (DTX / silence suppression) must be +// filled with silence, and the decoder must recover and decode the next packet. +// Note: true packet *loss* (a sequence-number gap) is not concealed today — the +// silence filler only fills suppressed-silence timestamp gaps. +func TestOpusPacketLossSilenceFill(t *testing.T) { + enableOpusForTest(t) + + const ( + rate = 48000 + frameLen = rate / 50 // 960 samples = 20ms + ) + opusCodec, ok := sdp.CodecByNameWith(defaultCodecs, OpusSDPName).(msdk.AudioCodec) + require.True(t, ok, "opus must be an AudioCodec when enabled") + + // Produce one real Opus packet by encoding a single frame and capturing it. + var pkt opus.Sample + sink := &captureWriter{rate: rate, onWrite: func(s opus.Sample) { + if pkt == nil { + pkt = append(opus.Sample(nil), s...) 
+ } + }} + enc, err := opus.EncodeWith(sink, opus.EncodeOptions{Channels: 1}, logger.GetLogger()) + require.NoError(t, err) + sig := make(msdk.PCM16Sample, frameLen) + for i := range sig { + sig[i] = int16(8000 * math.Sin(2*math.Pi*10*float64(i)/float64(frameLen))) + } + require.NoError(t, enc.WriteSample(sig)) + require.NoError(t, enc.Close()) + require.NotEmpty(t, pkt, "should have captured one Opus packet") + + // Build the decode chain with the silence filler, as MediaPort does. + var out msdk.PCM16Sample + pcm := msdk.NewPCM16BufferWriter(&out, rate) + dec := rtp.DecodePCM(pcm, opusCodec, 111) + filler := newSilenceFiller(dec, pcm, rate, rate, logger.GetLogger()) + + // Packet 1 decodes to one frame. + require.NoError(t, filler.HandleRTP(&rtp.Header{SequenceNumber: 1, Timestamp: 0}, pkt)) + require.Equal(t, frameLen, len(out), "first packet decodes to one frame") + + // Packet 2: sequential seq, but timestamp jumps by two frames (one frame of + // silence suppressed). The filler inserts one silence frame, then decodes. + require.NoError(t, filler.HandleRTP(&rtp.Header{SequenceNumber: 2, Timestamp: uint32(2 * frameLen)}, pkt)) + require.Equal(t, 3*frameLen, len(out), "one silence frame + one decoded frame appended") + + // The inserted gap is silence; the decoded frames carry signal energy. 
+ var silenceEnergy, signalEnergy int64 + for _, s := range out[frameLen : 2*frameLen] { + silenceEnergy += int64(s) * int64(s) + } + for _, s := range out[2*frameLen:] { + signalEnergy += int64(s) * int64(s) + } + require.Zero(t, silenceEnergy, "gap must be filled with silence") + require.Positive(t, signalEnergy, "decoder must recover after silence fill") +} diff --git a/pkg/sip/media_port_test.go b/pkg/sip/media_port_test.go index d524bd88f..4dde885bf 100644 --- a/pkg/sip/media_port_test.go +++ b/pkg/sip/media_port_test.go @@ -760,3 +760,91 @@ func TestSymmetricRTP(t *testing.T) { require.Equal(t, newAddr.String(), curDstPtr.String()) }) } + +// TestMediaPortOpusRoundTrip drives Opus through the full MediaPort RTP path +// (PCM -> encode -> SeqWriter/RTP -> UDP pipe -> depacketize -> decode -> PCM). +// Opus is lossy, so we validate timestamp cadence, sample count, non-silence, +// and absence of errors rather than exact waveforms. +func TestMediaPortOpusRoundTrip(t *testing.T) { + enableOpusForTest(t) + + codecs := msdk.NewCodecSet() + codecs.SetEnabled(OpusSDPName, true) + + c1, c2 := newUDPPipe() + log := logger.GetLogger() + const ( + ip1 = "1.1.1.1" + ip2 = "2.2.2.2" + port1 = 10000 + port2 = 20000 + rate = RoomSampleRate // 48000, Opus's native rate + ) + + alice, err := NewMediaPortWith(1, log.WithName("Alice"), newTestCallMonitor(t), c1, &MediaOptions{ + IP: newIP(ip1), + Ports: rtcconfig.PortRange{Start: port1}, + }, rate) + require.NoError(t, err) + defer alice.Close() + + bob, err := NewMediaPortWith(2, log.WithName("Bob"), newTestCallMonitor(t), c2, &MediaOptions{ + IP: newIP(ip2), + Ports: rtcconfig.PortRange{Start: port2}, + }, rate) + require.NoError(t, err) + defer bob.Close() + + // Negotiate. 
+ offer, err := alice.NewOffer(codecs, sdp.EncryptionNone) + require.NoError(t, err) + offerData, err := offer.SDP.Marshal() + require.NoError(t, err) + answer, bobConf, err := bob.SetOffer(offerData, codecs, sdp.EncryptionNone) + require.NoError(t, err) + answerData, err := answer.SDP.Marshal() + require.NoError(t, err) + aliceConf, _, err := alice.SetAnswer(offer, answerData, codecs, sdp.EncryptionNone) + require.NoError(t, err) + require.NoError(t, alice.SetConfig(aliceConf)) + require.NoError(t, bob.SetConfig(bobConf)) + + require.Equal(t, OpusSDPName, alice.Config().Audio.Codec.Info().SDPName, "Opus must be negotiated") + require.Equal(t, OpusSDPName, bob.Config().Audio.Codec.Info().SDPName) + + var recv msdk.PCM16Sample + bob.WriteAudioTo(msdk.NewPCM16BufferWriter(&recv, rate)) + + w := alice.GetAudioWriter() + const ( + frames = 20 + frameLen = rate / 50 // 960 samples = 20ms @ 48kHz + ) + frame := make(msdk.PCM16Sample, frameLen) + for i := range frame { + frame[i] = int16(8000 * math.Sin(2*math.Pi*10*float64(i)/float64(frameLen))) + } + + tsBefore := alice.audioOutRTP.GetCurrentTimestamp() + for i := 0; i < frames; i++ { + require.NoError(t, w.WriteSample(frame)) + // Pace writes so the 10-slot UDP pipe never overflows (which would + // surface as ErrShortWrite) and Bob's read loop keeps draining. + time.Sleep(10 * time.Millisecond) + } + tsAfter := alice.audioOutRTP.GetCurrentTimestamp() + + // Timestamp cadence: 48kHz clock advances exactly 960 ticks per 20ms packet. 
+ require.Equal(t, uint32(frames*frameLen), tsAfter-tsBefore, "RTP timestamp cadence must be 960/frame") + + time.Sleep(200 * time.Millisecond) // let Bob drain + bob.Close() // flush decode pipeline + + require.NotEmpty(t, recv, "Opus must decode to PCM on the receiver") + require.InDelta(t, frames*frameLen, len(recv), float64(3*frameLen), "decoded sample count should track frames sent") + var energy int64 + for _, s := range recv { + energy += int64(s) * int64(s) + } + require.Positive(t, energy, "decoded Opus audio must be non-silent") +} From b9db5455a2638bee472e11b21218b1d8c89d1422 Mon Sep 17 00:00:00 2001 From: "kanimuthumaran.t" Date: Mon, 8 Jun 2026 17:39:20 +0530 Subject: [PATCH 3/4] Opus: de-flake RTP round-trip test and clarify offer-preference docs - TestMediaPortOpusRoundTrip now paces writes by polling received samples (require.Eventually) instead of fixed sleeps, and uses a thread-safe sink; deterministic and -race clean. - README: Opus is preferred only when answering; outbound offers list codecs static-first, so peers selecting by order may still pick G722/G711. Co-Authored-By: Claude Opus 4.8 (1M context) --- README.md | 12 +++++--- pkg/sip/media_port_test.go | 60 ++++++++++++++++++++++++++++---------- 2 files changed, 53 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index daf2d258a..27b459ae1 100644 --- a/README.md +++ b/README.md @@ -82,10 +82,14 @@ your SIP infrastructure**: set `enable_opus: true` to offer it. field to be `2`), but SIP media is encoded/decoded internally as **mono** — appropriate for telephony, and the decoder adapts to whatever channel count a peer actually sends. -- When enabled, Opus is **preferred**; SIP peers that don't support it fall back - transparently to G722, then G711 (PCMU/PCMA). When disabled, Opus never - appears in SDP offers and the existing PCMU/PCMA/G722/DTMF behavior is - unchanged. 
+- When enabled, Opus is preferred **when we answer an inbound offer** (codec + selection is priority-based, and Opus has the highest priority). In **outbound + offers** we send, codecs are currently listed static-first, so a peer that + selects by offer order may still pick G722/G711 — Opus is offered but not + guaranteed to be chosen. Either way, peers that don't support Opus fall back + transparently to G722, then G711 (PCMU/PCMA). +- When disabled, Opus never appears in SDP offers and the existing + PCMU/PCMA/G722/DTMF behavior is unchanged. - The LiveKit/WebRTC side always uses Opus and is unaffected by this setting. Current limitations to be aware of before a GA rollout: diff --git a/pkg/sip/media_port_test.go b/pkg/sip/media_port_test.go index 4dde885bf..9b88851eb 100644 --- a/pkg/sip/media_port_test.go +++ b/pkg/sip/media_port_test.go @@ -761,10 +761,40 @@ func TestSymmetricRTP(t *testing.T) { }) } +// countingPCMSink is a thread-safe PCM16Writer that tracks received sample +// count and signal energy, so a test goroutine can poll receive progress +// without racing the media read loop. +type countingPCMSink struct { + rate int + mu sync.Mutex + n int + e int64 +} + +func (s *countingPCMSink) String() string { return "counting" } +func (s *countingPCMSink) SampleRate() int { return s.rate } +func (s *countingPCMSink) Close() error { return nil } +func (s *countingPCMSink) WriteSample(d msdk.PCM16Sample) error { + s.mu.Lock() + s.n += len(d) + for _, v := range d { + s.e += int64(v) * int64(v) + } + s.mu.Unlock() + return nil +} +func (s *countingPCMSink) stats() (samples int, energy int64) { + s.mu.Lock() + defer s.mu.Unlock() + return s.n, s.e +} + // TestMediaPortOpusRoundTrip drives Opus through the full MediaPort RTP path // (PCM -> encode -> SeqWriter/RTP -> UDP pipe -> depacketize -> decode -> PCM). // Opus is lossy, so we validate timestamp cadence, sample count, non-silence, -// and absence of errors rather than exact waveforms. 
+// and absence of errors rather than exact waveforms. Writes are paced by +// polling received progress (no fixed sleeps) so the bounded UDP pipe never +// overflows and the test is deterministic on CI. func TestMediaPortOpusRoundTrip(t *testing.T) { enableOpusForTest(t) @@ -812,8 +842,8 @@ func TestMediaPortOpusRoundTrip(t *testing.T) { require.Equal(t, OpusSDPName, alice.Config().Audio.Codec.Info().SDPName, "Opus must be negotiated") require.Equal(t, OpusSDPName, bob.Config().Audio.Codec.Info().SDPName) - var recv msdk.PCM16Sample - bob.WriteAudioTo(msdk.NewPCM16BufferWriter(&recv, rate)) + sink := &countingPCMSink{rate: rate} + bob.WriteAudioTo(sink) w := alice.GetAudioWriter() const ( @@ -828,23 +858,23 @@ func TestMediaPortOpusRoundTrip(t *testing.T) { tsBefore := alice.audioOutRTP.GetCurrentTimestamp() for i := 0; i < frames; i++ { require.NoError(t, w.WriteSample(frame)) - // Pace writes so the 10-slot UDP pipe never overflows (which would - // surface as ErrShortWrite) and Bob's read loop keeps draining. - time.Sleep(10 * time.Millisecond) + // Pace writes by waiting for this frame to be received/decoded before + // sending the next. This keeps the bounded (10-slot) UDP pipe from + // overflowing without any fixed sleep, and makes the test deterministic. + want := (i + 1) * frameLen + require.Eventually(t, func() bool { + got, _ := sink.stats() + return got >= want + }, 5*time.Second, time.Millisecond, "timed out waiting for frame %d to be decoded", i+1) } tsAfter := alice.audioOutRTP.GetCurrentTimestamp() // Timestamp cadence: 48kHz clock advances exactly 960 ticks per 20ms packet. + // This is synchronous on the send side and independent of receive timing. 
require.Equal(t, uint32(frames*frameLen), tsAfter-tsBefore, "RTP timestamp cadence must be 960/frame") - time.Sleep(200 * time.Millisecond) // let Bob drain - bob.Close() // flush decode pipeline - - require.NotEmpty(t, recv, "Opus must decode to PCM on the receiver") - require.InDelta(t, frames*frameLen, len(recv), float64(3*frameLen), "decoded sample count should track frames sent") - var energy int64 - for _, s := range recv { - energy += int64(s) * int64(s) - } + // All frames decoded (contiguous stream, no loss), and the audio is non-silent. + gotSamples, energy := sink.stats() + require.GreaterOrEqual(t, gotSamples, frames*frameLen, "all sent frames should decode to PCM") require.Positive(t, energy, "decoded Opus audio must be non-silent") } From 156504fdfa4f56af59913d664a9209e468b9e00e Mon Sep 17 00:00:00 2001 From: "kanimuthumaran.t" Date: Tue, 9 Jun 2026 10:58:28 +0530 Subject: [PATCH 4/4] Opus: test flush padding, stereo->mono downmix, corrupt-packet tolerance Targeted unit tests for the new SIP Opus logic that previously had no coverage: the encoder flush zero-pad path, the channel-adaptive stereo->mono decode, and the decoder's corrupt-packet tolerance/recovery. Deterministic, no fixed sleeps. Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/media/opus/opus_test.go | 101 ++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/pkg/media/opus/opus_test.go b/pkg/media/opus/opus_test.go index 01989583a..c07047101 100644 --- a/pkg/media/opus/opus_test.go +++ b/pkg/media/opus/opus_test.go @@ -194,3 +194,104 @@ func TestFECPassthrough(t *testing.T) { require.NoError(t, enc.Close()) require.Equal(t, frames*frame, len(out)) } + +// Closing the encoder with a non-frame-aligned tail must flush a final packet +// (zero-padded up to a full frame) instead of erroring on a partial frame. 
+func TestOpusFlushPadsPartialFrame(t *testing.T) { + const rate = 48000 + + c := &sampleCollector{rate: rate} + enc, err := EncodeWith(c, EncodeOptions{Channels: 1}, logger.GetLogger()) + require.NoError(t, err) + + // 1000 samples is not a multiple of one 20ms frame (960 @ 48kHz): one full + // frame is emitted immediately, leaving 40 samples buffered. + in := make(msdk.PCM16Sample, 1000) + for i := range in { + in[i] = int16(4000 * (i % 50)) + } + require.NoError(t, enc.WriteSample(in)) + require.Equal(t, 1, c.packets, "one full frame should be emitted before close") + + // Close must flush the 40-sample remainder as a zero-padded final frame. + require.NoError(t, enc.Close()) + require.Equal(t, 2, c.packets, "Close must flush the padded trailing partial frame") + require.Positive(t, c.bytes, "flushed frame must carry encoded bytes") +} + +// A stereo Opus packet decoded with a mono target must be downmixed to a single +// mono frame (channel-adaptive decode), and the result must be non-silent. +func TestOpusDecodeStereoToMono(t *testing.T) { + const ( + rate = 48000 + perChannel = rate / 50 // 960 samples/channel = 20ms + ) + + // Build a genuine stereo Opus packet with a raw 2-channel encoder. + enc, err := hopus.NewEncoder(rate, 2, hopus.AppAudio) + require.NoError(t, err) + stereo := make([]int16, perChannel*2) // interleaved L/R + for i := 0; i < perChannel; i++ { + v := int16(5000 * (i % 60)) + stereo[2*i] = v // L + stereo[2*i+1] = v // R + } + pkt := make([]byte, 4000) + n, err := enc.Encode(stereo, pkt) + require.NoError(t, err) + pkt = pkt[:n] + + // Decode with a mono target; the decoder should detect 2 channels and downmix. 
+ var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, rate) + dec, err := DecodeWith(sink, 1, logger.GetLogger()) + require.NoError(t, err) + require.NoError(t, dec.WriteSample(Sample(pkt))) + + require.Equal(t, perChannel, len(out), "stereo packet must downmix to one mono frame") + var energy int64 + for _, s := range out { + energy += int64(s) * int64(s) + } + require.Positive(t, energy, "downmixed audio must be non-silent") +} + +// The decoder must tolerate a small run of corrupt packets without returning an +// error, then recover and decode a subsequent valid packet. +func TestOpusDecodeToleratesCorruptPackets(t *testing.T) { + const ( + rate = 48000 + frameLen = rate / 50 // 960 + ) + + var out msdk.PCM16Sample + sink := msdk.NewPCM16BufferWriter(&out, rate) + dec, err := DecodeWith(sink, 1, logger.GetLogger()) + require.NoError(t, err) + + // Single-byte code-3 packet (TOC lower 2 bits = 3): a code-3 frame requires + // at least 2 bytes, so libopus returns OPUS_INVALID_PACKET. The mono `s` bit + // (bit 2) is 0, so per-packet channel detection still succeeds and we reach + // the decode-tolerance path. Five corrupt packets is exactly the tolerated + // threshold (a 6th would return an error). + corrupt := []Sample{ + {0x03}, {0x03}, {0x03}, {0x03}, {0x03}, + } + for i, pkt := range corrupt { + require.NoError(t, dec.WriteSample(pkt), "corrupt packet %d must be tolerated, not error", i) + } + require.Empty(t, out, "corrupt packets must not produce PCM") + + // A valid packet afterwards must decode and reset the tolerance counter. + venc, err := hopus.NewEncoder(rate, 1, hopus.AppVoIP) + require.NoError(t, err) + sig := make([]int16, frameLen) + for i := range sig { + sig[i] = int16(4000 * (i % 50)) + } + buf := make([]byte, 4000) + vn, err := venc.Encode(sig, buf) + require.NoError(t, err) + require.NoError(t, dec.WriteSample(Sample(buf[:vn]))) + require.Equal(t, frameLen, len(out), "decoder must recover and decode a valid packet") +}