Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/data-track-schemas-ffi.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
livekit: patch
livekit-datatrack: patch
livekit-ffi: patch
---

Add schema metadata support for data tracks.
5 changes: 4 additions & 1 deletion livekit-datatrack/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
/// Common types for local and remote tracks.
mod track;

/// Schema and frame encoding metadata for typed tracks.
mod schema;

/// Local track publication.
mod local;

Expand All @@ -40,7 +43,7 @@ mod error;

/// Public APIs re-exported by client SDKs.
pub mod api {
pub use crate::{error::*, frame::*, local::*, remote::*, track::*};
pub use crate::{error::*, frame::*, local::*, remote::*, schema::*, track::*};
}

/// Internal APIs used within client SDKs to power data tracks functionality.
Expand Down
3 changes: 3 additions & 0 deletions livekit-datatrack/src/local/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::{
api::{DataTrackInfo, DataTrackOptions, LocalDataTrack, PublishError},
packet::Handle,
schema::{DataTrackFrameEncoding, DataTrackSchemaId},
};
use bytes::Bytes;
use from_variants::FromVariants;
Expand Down Expand Up @@ -124,6 +125,8 @@ pub struct SfuPublishRequest {
pub handle: Handle,
pub name: String,
pub uses_e2ee: bool,
pub schema: Option<DataTrackSchemaId>,
pub frame_encoding: Option<DataTrackFrameEncoding>,
}

/// Request sent to the SFU to unpublish a track.
Expand Down
18 changes: 18 additions & 0 deletions livekit-datatrack/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ impl Manager {
handle,
name: event.options.name,
uses_e2ee: self.encryption_provider.is_some(),
schema: event.options.schema,
frame_encoding: event.options.frame_encoding,
};
_ = self.event_out_tx.send(event.into()).await;
}
Expand Down Expand Up @@ -280,6 +282,8 @@ impl Manager {
handle: info.pub_handle,
name: info.name.clone(),
uses_e2ee: info.uses_e2ee,
schema: info.schema.clone(),
frame_encoding: info.frame_encoding,
};
_ = state_tx.send(PublishState::Republishing);
_ = self.event_out_tx.send(event.into()).await;
Expand Down Expand Up @@ -525,6 +529,8 @@ mod tests {
pub_handle,
name: event.name,
uses_e2ee: event.uses_e2ee,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
_ = input.send(event.into());
Expand Down Expand Up @@ -604,6 +610,8 @@ mod tests {
pub_handle: handle,
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -634,6 +642,8 @@ mod tests {
pub_handle: event.handle,
name: "secure".into(),
uses_e2ee: true,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -674,6 +684,8 @@ mod tests {
pub_handle: handle,
name: track_name.clone(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand All @@ -699,6 +711,8 @@ mod tests {
pub_handle: handle,
name: track_name.clone(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -728,6 +742,8 @@ mod tests {
pub_handle: event.handle,
name: name.into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down Expand Up @@ -767,6 +783,8 @@ mod tests {
pub_handle: event.handle,
name: "active".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};
let event = SfuPublishResponse { handle: event.handle, result: Ok(info) };
input.send(event.into()).unwrap();
Expand Down
13 changes: 12 additions & 1 deletion livekit-datatrack/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use crate::{
api::{DataTrack, DataTrackFrame, DataTrackInfo, InternalError},
schema::{DataTrackFrameEncoding, DataTrackSchemaId},
track::DataTrackInner,
};
use std::{fmt, marker::PhantomData, sync::Arc};
Expand Down Expand Up @@ -153,6 +154,8 @@ impl Drop for LocalTrackInner {
#[derive(Clone, Debug)]
pub struct DataTrackOptions {
pub(crate) name: String,
pub(crate) schema: Option<DataTrackSchemaId>,
pub(crate) frame_encoding: Option<DataTrackFrameEncoding>,
}

impl DataTrackOptions {
Expand All @@ -165,7 +168,15 @@ impl DataTrackOptions {
/// - Must be unique per publisher
///
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
Self { name: name.into(), schema: None, frame_encoding: None }
}

pub fn with_schema(self, schema: DataTrackSchemaId) -> Self {
Self { schema: Some(schema), ..self }
}

pub fn with_frame_encoding(self, encoding: DataTrackFrameEncoding) -> Self {
Self { frame_encoding: Some(encoding), ..self }
}
}

Expand Down
74 changes: 69 additions & 5 deletions livekit-datatrack/src/local/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,17 @@ impl From<SfuPublishRequest> for proto::PublishDataTrackRequest {
fn from(event: SfuPublishRequest) -> Self {
use proto::encryption::Type;
let encryption = if event.uses_e2ee { Type::Gcm } else { Type::None }.into();
Self { pub_handle: event.handle.into(), name: event.name, encryption }
let schema = event.schema.map(|schema| schema.into());
let frame_encoding = event
.frame_encoding
.map(|encoding| proto::DataTrackFrameEncoding::from(encoding) as i32);
Self {
pub_handle: event.handle.into(),
name: event.name,
encryption,
schema,
frame_encoding,
}
}
}

Expand Down Expand Up @@ -74,8 +84,18 @@ impl TryFrom<proto::DataTrackInfo> for DataTrackInfo {
proto::encryption::Type::Gcm => true,
other => Err(anyhow!("Unsupported E2EE type: {:?}", other))?,
};
let frame_encoding = msg.frame_encoding.map(|_| msg.frame_encoding().into());
let sid: DataTrackSid = msg.sid.try_into().map_err(anyhow::Error::from)?;
Ok(Self { pub_handle: handle, sid: RwLock::new(sid).into(), name: msg.name, uses_e2ee })
let schema = msg.schema.map(|schema| schema.into());

Ok(Self {
pub_handle: handle,
sid: RwLock::new(sid).into(),
name: msg.name,
uses_e2ee,
schema,
frame_encoding,
})
}
}

Expand Down Expand Up @@ -106,12 +126,19 @@ impl From<DataTrackInfo> for proto::DataTrackInfo {
proto::encryption::Type::Gcm
} else {
proto::encryption::Type::None
};
} as i32;
let sid = info.sid().to_string();
let schema = info.schema.map(|schema| schema.into());
let frame_encoding = info
.frame_encoding
.map(|encoding| proto::DataTrackFrameEncoding::from(encoding) as i32);
Self {
pub_handle: info.pub_handle.into(),
sid: info.sid().to_string(),
sid,
name: info.name,
encryption: encryption as i32,
encryption,
schema,
frame_encoding,
}
}
}
Expand All @@ -128,6 +155,8 @@ pub fn publish_responses_for_sync_state(

#[cfg(test)]
mod tests {
use crate::schema::{DataTrackFrameEncoding, DataTrackSchemaEncoding, DataTrackSchemaId};

use super::*;
use fake::{Fake, Faker};

Expand All @@ -137,6 +166,8 @@ mod tests {
handle: 1u32.try_into().unwrap(),
name: "track".into(),
uses_e2ee: true,
schema: None,
frame_encoding: None,
};
let request: proto::PublishDataTrackRequest = event.into();
assert_eq!(request.pub_handle, 1);
Expand All @@ -159,6 +190,12 @@ mod tests {
sid: "DTR_1234".into(),
name: "track".into(),
encryption: proto::encryption::Type::Gcm.into(),
schema: proto::DataTrackSchemaId {
name: "schema".into(),
encoding: proto::DataTrackSchemaEncoding::JsonSchema.into(),
}
.into(),
frame_encoding: Some(proto::DataTrackFrameEncoding::Json.into()),
}
.into(),
};
Expand All @@ -169,9 +206,36 @@ mod tests {
assert_eq!(info.pub_handle, 1u32.try_into().unwrap());
assert_eq!(*info.sid.read().unwrap(), "DTR_1234".to_string().try_into().unwrap());
assert_eq!(info.name, "track");
assert_eq!(
info.schema,
Some(DataTrackSchemaId::new("schema", DataTrackSchemaEncoding::JsonSchema))
);
assert_eq!(info.frame_encoding, Some(DataTrackFrameEncoding::Json));
assert!(info.uses_e2ee);
}

#[test]
fn test_frame_encoding_mapping() {
let base = proto::DataTrackInfo {
pub_handle: 1,
sid: "DTR_1234".into(),
name: "track".into(),
encryption: proto::encryption::Type::None.into(),
schema: None,
frame_encoding: None,
};

let info: DataTrackInfo = base.clone().try_into().unwrap();
assert_eq!(info.frame_encoding, None);

let unspecified = proto::DataTrackInfo {
frame_encoding: Some(proto::DataTrackFrameEncoding::Unspecified.into()),
..base
};
let info: DataTrackInfo = unspecified.try_into().unwrap();
assert_eq!(info.frame_encoding, Some(DataTrackFrameEncoding::Other));
}

#[test]
fn test_from_request_response() {
use proto::request_response::{Reason, Request};
Expand Down
18 changes: 18 additions & 0 deletions livekit-datatrack/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,8 @@ mod tests {
pub_handle: Faker.fake(), // Pub handle
name: track_name.clone(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
}],
)]),
};
Expand Down Expand Up @@ -658,6 +660,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -694,6 +698,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate three identical publication updates
Expand Down Expand Up @@ -726,6 +732,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -785,6 +793,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: true,
schema: None,
frame_encoding: None,
};

// Simulate track published (with e2ee)
Expand Down Expand Up @@ -846,6 +856,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -944,6 +956,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -987,6 +1001,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down Expand Up @@ -1038,6 +1054,8 @@ mod tests {
pub_handle: Faker.fake(),
name: "test".into(),
uses_e2ee: false,
schema: None,
frame_encoding: None,
};

// Simulate track published
Expand Down
2 changes: 2 additions & 0 deletions livekit-datatrack/src/remote/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ mod tests {
sid: "DTR_1234".into(),
name: "track1".into(),
encryption: proto::encryption::Type::Gcm.into(),
schema: None,
frame_encoding: None,
}];
let mut participant_info = proto::ParticipantInfo { data_tracks, ..Default::default() };

Expand Down
Loading