add support for bypassing fragmented mp4 generation for live TS streams#150
add support for bypassing fragmented mp4 generation for live TS streams#150
Conversation
* Add timecode parameter. Add elv_channel stats * Fix parsing for exc timecode, add timecode to mux text, and correct sample cmd --------- Co-authored-by: Peter Tseng <42591958+elv-peter@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Adds a new “raw-only” pathway for live MPEG-TS inputs that bypasses ffmpeg/libavpipe entirely, enabling verbatim TS part generation without producing fragmented MP4.
Changes:
- Introduces
CopyModeRawOnlyand a new MPEGTS “start” stat (AV_IN_STAT_MPEGTS_START). - Adds a
BypassProcessorabstraction + global registry to run non-ffmpeg pipelines. - Implements an MPEGTS bypass processor with first-packet start reporting and wires it into
XcInit/XcRun/XcCancel.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| goavpipe/structs.go | Adds new copy mode + MPEGTS start stat; adds JSON tags to input processor config fields. |
| goavpipe/handlers.go | Adds BypassProcessor interface and global handle registry for bypass processors. |
| broadcastproto/mpegts/mpegts.go | Extends SequentialOpener to support reporting stream start; adds ReportStart() on processor. |
| broadcastproto/mpegts/custom.go | Adds a first-packet hook to trigger “start” reporting. |
| broadcastproto/mpegts/bypass.go | New bypass processor implementation that reads TS and segments/writes parts without ffmpeg. |
| avpipe_seq_writer.go | Implements ReportStart() and emits the new MPEGTS start stat. |
| avpipe.go | Adds bypass-mode execution/cancel support and refactors AVPipeOpenInput into a Go helper. |
Comments suppressed due to low confidence (1)
avpipe.go:1186
- Bypass-handle execution falls through into the normal ffmpeg path. For handles < -1 this will hit
if handle < 0 { return EAV_BAD_HANDLE }after the bypass processor completes, so bypass runs will always be reported asEAV_BAD_HANDLE. Add an explicitreturn nil(and any needed cleanup) afterprocessor.Wait()in the bypass branch.
defer goavpipe.XCEnded()
if handle < 0 {
return EAV_BAD_HANDLE
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if params.InputCfg.CopyMode == goavpipe.CopyModeRawOnly { | ||
| goavpipe.Log.Info("initializing bypass processor", "copy_mode", params.InputCfg.CopyMode) | ||
| // Bypass ffmpeg completely and copy the stream verbatim to parts | ||
| bypassProcessor, err := mpegts.NewBypassProcessor(params, seqOpenerF) | ||
| if err != nil { | ||
| return -1, errors.E("XcInit", errors.K.Invalid.Default(), err) | ||
| } | ||
| handle := goavpipe.Globals.InitBypassProcessor(bypassProcessor) | ||
| return handle, nil | ||
| } |
There was a problem hiding this comment.
New bypass mode (CopyModeRawOnly / negative handles) is not covered by existing tests in avpipe_test.go (no references to raw_only). Add tests that exercise XcInit/XcRun/XcCancel for bypass handles, including the early-cancel-before-start case and ensuring resources (CIO handler/output opener tables) are cleaned up.
| fd, _ := AVPipeOpenInputGo(processor.XcParams().Url) | ||
| err := processor.Start(fd) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| processor.Wait() |
There was a problem hiding this comment.
The bypass path opens an input fd via AVPipeOpenInputGo(...) but never closes it, which will leak the input handler and associated global maps (gHandlers, gURLOutputOpenersByHandler). Ensure the fd is closed (e.g., call AVPipeCloseInput/equivalent) once the bypass processor finishes (including on error paths). Also check fd < 0 before calling processor.Start(fd) and return a meaningful error if opening fails.
| func XcCancel(handle int32) error { | ||
| if handle < -1 { | ||
| processor, ok := goavpipe.Globals.GetBypassProcessor(handle) | ||
| if !ok { | ||
| return EAV_BAD_HANDLE | ||
| } |
There was a problem hiding this comment.
XcCancel for bypass handles calls processor.Cancel() unconditionally. If cancel can be invoked before XcRun has called Start(), Cancel() may dereference a nil netReader (panic). Make bypass cancellation safe before start (either guard here or make BypassProcessor.Cancel() a no-op until started).
| func XcCancel(handle int32) error { | |
| if handle < -1 { | |
| processor, ok := goavpipe.Globals.GetBypassProcessor(handle) | |
| if !ok { | |
| return EAV_BAD_HANDLE | |
| } | |
| func XcCancel(handle int32) (err error) { | |
| if handle < -1 { | |
| processor, ok := goavpipe.Globals.GetBypassProcessor(handle) | |
| if !ok { | |
| return EAV_BAD_HANDLE | |
| } | |
| // Make bypass cancellation safe even if invoked before the processor has started. | |
| // If Cancel() panics (for example, due to a nil internal reader), convert that | |
| // into a regular error instead of crashing the process. | |
| defer func() { | |
| if r := recover(); r != nil { | |
| err = EAV_CANCEL_FAILED | |
| } | |
| }() |
| // CopyModeRawOnly is like CopyModeRaw, but also disables transcoding and production of fragmented mp4 parts | ||
| CopyModeRawOnly CopyMode = "raw_only" |
There was a problem hiding this comment.
CopyModeRawOnly is introduced here, but InputConfig.Validate() does not currently accept/handle the raw_only value, so XcParams.Validate() will reject configs that try to use this new mode. Update the validation switch statements to include CopyModeRawOnly (and apply the intended constraints, likely similar to CopyModeRaw).
* use bypass processor if `raw_only` is specified on the stream independent of the `bypass_libav_reader` setting * ensure BypassProcessor.Status/Cancel/Wait don't panic if called before Start() * ensure net reader is canceled even if blocked on reading from network (by closing the socket) * comment typo
# Conflicts: # goavpipe/structs.go
* Fix RTP streamids - use udp:// * Fix atof NULL seg_duration --------- Co-authored-by: Serban Simu <serban@Serbans-MacBook-Pro.local>
…processor # Conflicts: # go.mod # go.sum
No description provided.