From 9fb3b5c9a146082b89f86997690fcf92ca73156c Mon Sep 17 00:00:00 2001 From: Greg Lamberson Date: Thu, 12 Feb 2026 22:14:31 +0200 Subject: [PATCH 1/3] feat(server): add EGFX server integration with DVC bridge Wire ironrdp-egfx's GraphicsPipelineServer into ironrdp-server, enabling H.264 AVC420/AVC444 video streaming to RDP clients. Changes to ironrdp-egfx: - Make CHANNEL_NAME public for cross-crate DVC registration - Add ZGFX uncompressed segment wrapping for DVC wire format - Auto-send ResetGraphics before first CreateSurface (MS-RDPEGFX) - Track DVC channel_id from start() for proactive frame encoding - Rewrite drain_output() to ZGFX-wrap PDUs for DVC transmission New ironrdp-server gfx module (behind "egfx" feature): - GfxServerFactory trait following CliprdrServerFactory pattern - GfxDvcBridge: Arc wrapper enabling shared access between DVC message processing and proactive frame submission - ServerEvent::Egfx variant for routing EGFX PDUs to the wire - Builder integration with with_gfx_factory() --- Cargo.lock | 1 + crates/ironrdp-egfx/src/lib.rs | 3 +- crates/ironrdp-egfx/src/server.rs | 164 +++++++++++++++++- crates/ironrdp-server/Cargo.toml | 2 + crates/ironrdp-server/src/builder.rs | 17 ++ crates/ironrdp-server/src/gfx.rs | 105 +++++++++++ crates/ironrdp-server/src/lib.rs | 4 + crates/ironrdp-server/src/server.rs | 35 ++++ .../tests/egfx/server.rs | 5 +- 9 files changed, 325 insertions(+), 11 deletions(-) create mode 100644 crates/ironrdp-server/src/gfx.rs diff --git a/Cargo.lock b/Cargo.lock index 451d9bf8c..8741a0d44 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2859,6 +2859,7 @@ dependencies = [ "ironrdp-core", "ironrdp-displaycontrol", "ironrdp-dvc", + "ironrdp-egfx", "ironrdp-graphics", "ironrdp-pdu", "ironrdp-rdpsnd", diff --git a/crates/ironrdp-egfx/src/lib.rs b/crates/ironrdp-egfx/src/lib.rs index 54cea6638..202e68994 100644 --- a/crates/ironrdp-egfx/src/lib.rs +++ b/crates/ironrdp-egfx/src/lib.rs @@ -1,7 +1,8 @@ #![cfg_attr(doc, doc = include_str!("../README.md"))] #![doc(html_logo_url = "https://cdnweb.devolutions.net/images/projects/devolutions/logos/devolutions-icon-shadow.svg")] -pub(crate) const CHANNEL_NAME: &str = "Microsoft::Windows::RDS::Graphics"; +/// EGFX dynamic virtual channel name per MS-RDPEGFX +pub const CHANNEL_NAME: &str = "Microsoft::Windows::RDS::Graphics"; pub mod client; pub mod pdu; diff --git a/crates/ironrdp-egfx/src/server.rs b/crates/ironrdp-egfx/src/server.rs index bf82bf2cb..d584fb9fd 100644 --- a/crates/ironrdp-egfx/src/server.rs +++ b/crates/ironrdp-egfx/src/server.rs @@ -58,8 +58,8 @@ use std::collections::{HashMap, VecDeque}; use std::time::Instant; -use ironrdp_core::{decode, impl_as_any}; -use ironrdp_dvc::{DvcMessage, DvcProcessor, DvcServerProcessor}; +use ironrdp_core::{decode, impl_as_any, Encode, EncodeResult, WriteCursor}; +use ironrdp_dvc::{DvcEncode, DvcMessage, DvcProcessor, DvcServerProcessor}; use ironrdp_pdu::gcc::Monitor; use ironrdp_pdu::geometry::InclusiveRectangle; use ironrdp_pdu::{decode_err, PduResult}; @@ -85,6 +85,81 @@ const DEFAULT_MAX_FRAMES_IN_FLIGHT: u32 = 3; /// Special queue depth value indicating client has disabled acknowledgments const SUSPEND_FRAME_ACK_QUEUE_DEPTH: u32 = 0xFFFFFFFF; +// ============================================================================ +// ZGFX Segment Wrapping +// ============================================================================ + +/// Maximum data per ZGFX segment (MS-RDPEGFX 2.2.5) +const ZGFX_SEGMENT_MAXSIZE: usize = 65535; + +/// Single-segment descriptor (MS-RDPEGFX 2.2.5.1) +const DESCRIPTOR_SINGLE: u8 = 0xE0; + +/// Multi-segment descriptor (MS-RDPEGFX 2.2.5.1) +const DESCRIPTOR_MULTIPART: u8 = 0xE1; + +/// RDP8_BULK_ENCODED_DATA with no compression (CompressionType = RDP8 | no flags) +const BULK_ENCODED_UNCOMPRESSED: u8 = 0x04; + +/// Wrap raw data in ZGFX uncompressed segment format. +/// +/// Produces the wire format that `Decompressor::decompress()` expects: +/// single-segment for small data, multi-segment for data exceeding 65535 bytes. +fn wrap_zgfx_uncompressed(data: &[u8]) -> Vec { + if data.len() <= ZGFX_SEGMENT_MAXSIZE { + let mut out = Vec::with_capacity(2 + data.len()); + out.push(DESCRIPTOR_SINGLE); + out.push(BULK_ENCODED_UNCOMPRESSED); + out.extend_from_slice(data); + out + } else { + let segment_count = data.len().div_ceil(ZGFX_SEGMENT_MAXSIZE); + // Header: descriptor(1) + segment_count(2) + uncompressed_size(4) + per-segment: size(4) + flag(1) + data + let header_size = 1 + 2 + 4 + segment_count * 4; + let mut out = Vec::with_capacity(header_size + segment_count + data.len()); + + out.push(DESCRIPTOR_MULTIPART); + out.extend_from_slice(&u16::try_from(segment_count).unwrap_or(u16::MAX).to_le_bytes()); + out.extend_from_slice(&u32::try_from(data.len()).unwrap_or(u32::MAX).to_le_bytes()); + + for chunk in data.chunks(ZGFX_SEGMENT_MAXSIZE) { + // segment_size includes the 1-byte bulk header + let segment_size = u32::try_from(chunk.len() + 1).unwrap_or(u32::MAX); + out.extend_from_slice(&segment_size.to_le_bytes()); + out.push(BULK_ENCODED_UNCOMPRESSED); + out.extend_from_slice(chunk); + } + + out + } +} + +/// Pre-encoded ZGFX-wrapped bytes for DVC transmission. +/// +/// `Encode::encode()` takes `&self`, but ZGFX wrapping is done in `drain_output()` +/// where `&mut self` is available. This type holds the already-wrapped bytes. +struct ZgfxWrappedBytes { + bytes: Vec, + pdu_name: &'static str, +} + +impl Encode for ZgfxWrappedBytes { + fn encode(&self, dst: &mut WriteCursor<'_>) -> EncodeResult<()> { + dst.write_slice(&self.bytes); + Ok(()) + } + + fn name(&self) -> &'static str { + self.pdu_name + } + + fn size(&self) -> usize { + self.bytes.len() + } +} + +impl DvcEncode for ZgfxWrappedBytes {} + // ============================================================================ // Surface Management // ============================================================================ @@ -607,7 +682,12 @@ pub struct GraphicsPipelineServer { output_width: u16, output_height: u16, + /// MS-RDPEGFX requires ResetGraphics before any CreateSurface + reset_graphics_sent: bool, output_queue: VecDeque, + + /// Stored from DvcProcessor::start() for proactive frame encoding + channel_id: Option, } impl GraphicsPipelineServer { @@ -626,10 +706,29 @@ impl GraphicsPipelineServer { frames, output_width: 0, output_height: 0, + reset_graphics_sent: false, output_queue: VecDeque::new(), + channel_id: None, } } + /// Set desktop output dimensions for ResetGraphics. + /// + /// Call before `create_surface()` when the desktop size differs from + /// the surface size (e.g. 16-pixel alignment padding). + pub fn set_output_dimensions(&mut self, width: u16, height: u16) { + self.output_width = width; + self.output_height = height; + } + + /// DVC channel ID assigned by DRDYNVC. + /// + /// Returns `None` before the channel has been started. + #[must_use] + pub fn channel_id(&self) -> Option { + self.channel_id + } + // ======================================================================== // State Queries // ======================================================================== @@ -688,6 +787,31 @@ impl GraphicsPipelineServer { return None; } + // MS-RDPEGFX: ResetGraphics MUST precede any CreateSurface. + // Auto-send on first surface creation if not explicitly sent via resize(). + if !self.reset_graphics_sent { + let desktop_width = if self.output_width > 0 { + self.output_width + } else { + width + }; + let desktop_height = if self.output_height > 0 { + self.output_height + } else { + height + }; + + self.output_queue.push_back(GfxPdu::ResetGraphics(ResetGraphicsPdu { + width: u32::from(desktop_width), + height: u32::from(desktop_height), + monitors: Vec::new(), + })); + + self.output_width = desktop_width; + self.output_height = desktop_height; + self.reset_graphics_sent = true; + } + let surface_id = self.surfaces.allocate_id(); let surface = Surface::new(surface_id, width, height, pixel_format); @@ -804,6 +928,7 @@ impl GraphicsPipelineServer { monitors, })); + self.reset_graphics_sent = true; self.state = ServerState::Ready; } @@ -1016,14 +1141,34 @@ impl GraphicsPipelineServer { // Output Management // ======================================================================== - /// Drain the output queue and return PDUs to send + /// Drain the output queue, ZGFX-wrapping each PDU for DVC transmission. + /// + /// Each `GfxPdu` is encoded to bytes then wrapped in uncompressed ZGFX + /// segment format. Windows clients expect this wrapping on the EGFX DVC. /// - /// Call this method to get pending PDUs that need to be sent to the client. + /// # Panics + /// + /// Panics if a `GfxPdu` fails to encode. This indicates a bug in the PDU + /// encoding logic, not a runtime condition. #[expect(clippy::as_conversions, reason = "Box to Box coercion")] pub fn drain_output(&mut self) -> Vec { self.output_queue .drain(..) - .map(|pdu| Box::new(pdu) as DvcMessage) + .map(|pdu| { + let pdu_name = pdu.name(); + let pdu_size = pdu.size(); + let mut pdu_bytes = vec![0u8; pdu_size]; + let mut cursor = WriteCursor::new(&mut pdu_bytes); + pdu.encode(&mut cursor).expect("GfxPdu encoding should not fail"); + + let wrapped = wrap_zgfx_uncompressed(&pdu_bytes); + trace!(pdu_name, pdu_size, wrapped = wrapped.len(), "ZGFX wrapped"); + + Box::new(ZgfxWrappedBytes { + bytes: wrapped, + pdu_name, + }) as DvcMessage + }) .collect() } @@ -1099,13 +1244,16 @@ impl DvcProcessor for GraphicsPipelineServer { CHANNEL_NAME } - fn start(&mut self, _channel_id: u32) -> PduResult> { - // Server waits for client CapabilitiesAdvertise before sending anything + fn start(&mut self, channel_id: u32) -> PduResult> { + self.channel_id = Some(channel_id); + debug!(channel_id, "EGFX channel started"); Ok(vec![]) } fn close(&mut self, _channel_id: u32) { + debug!("EGFX channel closed"); self.state = ServerState::Closed; + self.reset_graphics_sent = false; self.handler.on_close(); } @@ -1142,7 +1290,7 @@ impl DvcServerProcessor for GraphicsPipelineServer {} /// Encode an AVC444 bitmap stream to bytes fn encode_avc444_bitmap_stream(stream: &Avc444BitmapStream<'_>) -> Vec { - use ironrdp_pdu::{Encode as _, WriteCursor}; + use ironrdp_pdu::Encode as _; let size = stream.size(); let mut buf = vec![0u8; size]; diff --git a/crates/ironrdp-server/Cargo.toml b/crates/ironrdp-server/Cargo.toml index 684f34c60..ab658a864 100644 --- a/crates/ironrdp-server/Cargo.toml +++ b/crates/ironrdp-server/Cargo.toml @@ -21,6 +21,7 @@ helper = ["dep:x509-cert", "dep:rustls-pemfile"] rayon = ["dep:rayon"] qoi = ["dep:qoicoubeh", "ironrdp-pdu/qoi"] qoiz = ["dep:zstd-safe", "qoi", "ironrdp-pdu/qoiz"] +egfx = ["dep:ironrdp-egfx"] # Internal (PRIVATE!) features used to aid testing. # Don't rely on these whatsoever. They may disappear at any time. @@ -51,6 +52,7 @@ bytes = "1" visibility = { version = "0.1", optional = true } qoicoubeh = { version = "0.5", optional = true } zstd-safe = { version = "7.2", optional = true } +ironrdp-egfx = { path = "../ironrdp-egfx", version = "0.1", optional = true } [dev-dependencies] tokio = { version = "1", features = ["sync"] } diff --git a/crates/ironrdp-server/src/builder.rs b/crates/ironrdp-server/src/builder.rs index 499e00b35..4c914ab88 100644 --- a/crates/ironrdp-server/src/builder.rs +++ b/crates/ironrdp-server/src/builder.rs @@ -6,6 +6,8 @@ use tokio_rustls::TlsAcceptor; use super::clipboard::CliprdrServerFactory; use super::display::{DesktopSize, RdpServerDisplay}; +#[cfg(feature = "egfx")] +use super::gfx::GfxServerFactory; use super::handler::{KeyboardEvent, MouseEvent, RdpServerInputHandler}; use super::server::{RdpServer, RdpServerOptions, RdpServerSecurity}; use crate::{DisplayUpdate, RdpServerDisplayUpdates, SoundServerFactory}; @@ -31,6 +33,8 @@ pub struct BuilderDone { display: Box, cliprdr_factory: Option>, sound_factory: Option>, + #[cfg(feature = "egfx")] + gfx_factory: Option>, } pub struct RdpServerBuilder { @@ -124,6 +128,8 @@ impl RdpServerBuilder { sound_factory: None, cliprdr_factory: None, codecs: server_codecs_capabilities(&[]).expect("can't panic for &[]"), + #[cfg(feature = "egfx")] + gfx_factory: None, }, } } @@ -138,6 +144,8 @@ impl RdpServerBuilder { sound_factory: None, cliprdr_factory: None, codecs: server_codecs_capabilities(&[]).expect("can't panic for &[]"), + #[cfg(feature = "egfx")] + gfx_factory: None, }, } } @@ -154,6 +162,13 @@ impl RdpServerBuilder { self } + /// Configure EGFX (Graphics Pipeline Extension) for H.264 video streaming. + #[cfg(feature = "egfx")] + pub fn with_gfx_factory(mut self, gfx_factory: Option>) -> Self { + self.state.gfx_factory = gfx_factory; + self + } + pub fn with_bitmap_codecs(mut self, codecs: BitmapCodecs) -> Self { self.state.codecs = codecs; self @@ -170,6 +185,8 @@ impl RdpServerBuilder { self.state.display, self.state.sound_factory, self.state.cliprdr_factory, + #[cfg(feature = "egfx")] + self.state.gfx_factory, ) } } diff --git a/crates/ironrdp-server/src/gfx.rs b/crates/ironrdp-server/src/gfx.rs new file mode 100644 index 000000000..27b320dc4 --- /dev/null +++ b/crates/ironrdp-server/src/gfx.rs @@ -0,0 +1,105 @@ +//! EGFX (Graphics Pipeline Extension) server integration. +//! +//! Provides the bridge between `ironrdp-egfx`'s `GraphicsPipelineServer` and +//! `ironrdp-server`'s `RdpServer`, enabling H.264 video streaming via DVC. +//! +//! The bridge pattern (`GfxDvcBridge`) wraps an `Arc>` +//! so the display handler can call `send_avc420_frame()` proactively while the +//! DVC infrastructure handles client messages (capability negotiation, frame acks). + +use std::sync::{Arc, Mutex}; + +use ironrdp_core::impl_as_any; +use ironrdp_dvc::{DvcMessage, DvcProcessor, DvcServerProcessor}; +use ironrdp_egfx::server::{GraphicsPipelineHandler, GraphicsPipelineServer}; +use ironrdp_pdu::PduResult; +use ironrdp_svc::SvcMessage; + +/// Shared handle to a `GraphicsPipelineServer`. +/// +/// Uses `std::sync::Mutex` (not tokio) because `DvcProcessor` trait methods +/// are synchronous and cannot hold async locks. +pub type GfxServerHandle = Arc>; + +/// Factory for creating EGFX graphics pipeline handlers. +/// +/// Follows the same pattern as `CliprdrServerFactory` and `SoundServerFactory`. +pub trait GfxServerFactory: Send { + /// Create a handler for EGFX callbacks (caps negotiation, frame acks). + fn build_gfx_handler(&self) -> Box; + + /// Create a bridge and shared server handle for proactive frame sending. + /// + /// When returning `Some`, the bridge is registered with DrdynvcServer for + /// client messages, and the handle is available for direct frame submission. + /// Returns `None` by default, falling back to `build_gfx_handler()`. + fn build_server_with_handle(&self) -> Option<(GfxDvcBridge, GfxServerHandle)> { + None + } +} + +/// DVC bridge wrapping a shared `GraphicsPipelineServer`. +/// +/// Delegates all `DvcProcessor` methods to the inner server through a mutex, +/// enabling shared access from both the DVC layer and the display handler. +pub struct GfxDvcBridge { + inner: GfxServerHandle, +} + +impl GfxDvcBridge { + pub fn new(server: GfxServerHandle) -> Self { + Self { inner: server } + } + + pub fn server(&self) -> &GfxServerHandle { + &self.inner + } +} + +impl_as_any!(GfxDvcBridge); + +impl DvcProcessor for GfxDvcBridge { + fn channel_name(&self) -> &str { + ironrdp_egfx::CHANNEL_NAME + } + + fn start(&mut self, channel_id: u32) -> PduResult> { + self.inner + .lock() + .expect("GfxServerHandle mutex poisoned") + .start(channel_id) + } + + fn process(&mut self, channel_id: u32, payload: &[u8]) -> PduResult> { + self.inner + .lock() + .expect("GfxServerHandle mutex poisoned") + .process(channel_id, payload) + } + + fn close(&mut self, channel_id: u32) { + self.inner + .lock() + .expect("GfxServerHandle mutex poisoned") + .close(channel_id) + } +} + +impl DvcServerProcessor for GfxDvcBridge {} + +/// Message for routing EGFX PDUs to the wire via `ServerEvent`. +#[derive(Debug)] +pub enum EgfxServerMessage { + /// Pre-encoded DVC messages from `GraphicsPipelineServer::drain_output()`. + SendMessages { channel_id: u32, messages: Vec }, +} + +impl core::fmt::Display for EgfxServerMessage { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::SendMessages { channel_id, messages } => { + write!(f, "SendMessages(channel={channel_id}, count={})", messages.len()) + } + } + } +} diff --git a/crates/ironrdp-server/src/lib.rs b/crates/ironrdp-server/src/lib.rs index bddb9a1a5..3d736da83 100644 --- a/crates/ironrdp-server/src/lib.rs +++ b/crates/ironrdp-server/src/lib.rs @@ -11,6 +11,8 @@ mod capabilities; mod clipboard; mod display; mod encoder; +#[cfg(feature = "egfx")] +mod gfx; mod handler; #[cfg(feature = "helper")] mod helper; @@ -19,6 +21,8 @@ mod sound; pub use clipboard::*; pub use display::*; +#[cfg(feature = "egfx")] +pub use gfx::*; pub use handler::*; #[cfg(feature = "helper")] pub use helper::*; diff --git a/crates/ironrdp-server/src/server.rs b/crates/ironrdp-server/src/server.rs index 5b35b54c3..f3e5179e8 100644 --- a/crates/ironrdp-server/src/server.rs +++ b/crates/ironrdp-server/src/server.rs @@ -32,6 +32,8 @@ use {ironrdp_dvc as dvc, ironrdp_rdpsnd as rdpsnd}; use crate::clipboard::CliprdrServerFactory; use crate::display::{DisplayUpdate, RdpServerDisplay}; use crate::encoder::{UpdateEncoder, UpdateEncoderCodecs}; +#[cfg(feature = "egfx")] +use crate::gfx::{EgfxServerMessage, GfxServerFactory}; use crate::handler::RdpServerInputHandler; use crate::{builder, capabilities, SoundServerFactory}; @@ -217,6 +219,8 @@ pub struct RdpServer { static_channels: StaticChannelSet, sound_factory: Option>, cliprdr_factory: Option>, + #[cfg(feature = "egfx")] + gfx_factory: Option>, ev_sender: mpsc::UnboundedSender, ev_receiver: Arc>>, creds: Option, @@ -230,6 +234,8 @@ pub enum ServerEvent { Rdpsnd(RdpsndServerMessage), SetCredentials(Credentials), GetLocalAddr(oneshot::Sender>), + #[cfg(feature = "egfx")] + Egfx(EgfxServerMessage), } pub trait ServerEventSender { @@ -256,6 +262,7 @@ impl RdpServer { display: Box, mut sound_factory: Option>, mut cliprdr_factory: Option>, + #[cfg(feature = "egfx")] gfx_factory: Option>, ) -> Self { let (ev_sender, ev_receiver) = ServerEvent::create_channel(); if let Some(cliprdr) = cliprdr_factory.as_mut() { @@ -271,6 +278,8 @@ impl RdpServer { static_channels: StaticChannelSet::new(), sound_factory, cliprdr_factory, + #[cfg(feature = "egfx")] + gfx_factory, ev_sender, ev_receiver: Arc::new(Mutex::new(ev_receiver)), creds: None, @@ -307,6 +316,22 @@ impl RdpServer { handler: Arc::clone(&self.handler), }) .with_dynamic_channel(DisplayControlServer::new(Box::new(dcs_backend))); + + #[cfg(feature = "egfx")] + let dvc = { + let mut dvc = dvc; + if let Some(gfx_factory) = self.gfx_factory.as_deref() { + if let Some((bridge, _handle)) = gfx_factory.build_server_with_handle() { + dvc = dvc.with_dynamic_channel(bridge); + } else { + let handler = gfx_factory.build_gfx_handler(); + let gfx_server = ironrdp_egfx::server::GraphicsPipelineServer::new(handler); + dvc = dvc.with_dynamic_channel(gfx_server); + } + } + dvc + }; + acceptor.attach_static_channel(dvc); } @@ -570,6 +595,16 @@ impl RdpServer { let data = server_encode_svc_messages(msgs.into(), channel_id, user_channel_id)?; writer.write_all(&data).await?; } + #[cfg(feature = "egfx")] + ServerEvent::Egfx(msg) => match msg { + EgfxServerMessage::SendMessages { messages, .. } => { + let drdynvc_channel_id = self + .get_channel_id_by_type::() + .ok_or_else(|| anyhow!("DRDYNVC channel not found"))?; + let data = server_encode_svc_messages(messages, drdynvc_channel_id, user_channel_id)?; + writer.write_all(&data).await?; + } + }, } } diff --git a/crates/ironrdp-testsuite-core/tests/egfx/server.rs b/crates/ironrdp-testsuite-core/tests/egfx/server.rs index dcc192d65..ed8ab50d1 100644 --- a/crates/ironrdp-testsuite-core/tests/egfx/server.rs +++ b/crates/ironrdp-testsuite-core/tests/egfx/server.rs @@ -179,9 +179,10 @@ fn test_surface_lifecycle() { assert!(server.delete_surface(sid)); assert!(server.get_surface(sid).is_none()); - // Drain output (should have CreateSurface, MapSurfaceToOutput, DeleteSurface PDUs) + // Drain output: ResetGraphics (auto-sent before first surface), CreateSurface, + // MapSurfaceToOutput, DeleteSurface let output = server.drain_output(); - assert_eq!(output.len(), 3); + assert_eq!(output.len(), 4); } #[test] From 96a8c7d92f23dba42dcfe929d628528dac0809e6 Mon Sep 17 00:00:00 2001 From: Greg Lamberson Date: Fri, 13 Feb 2026 15:49:37 +0200 Subject: [PATCH 2/3] fix(server): address review feedback on EGFX integration - Remove unused channel_id from EgfxServerMessage::SendMessages - Make GfxServerFactory extend ServerEventSender for event loop signaling - Store GfxServerHandle in RdpServer instead of discarding it - Replace local wrap_zgfx_uncompressed with ironrdp_graphics::zgfx::wrap_uncompressed --- crates/ironrdp-egfx/src/server.rs | 52 ++--------------------------- crates/ironrdp-server/src/gfx.rs | 13 +++++--- crates/ironrdp-server/src/server.rs | 26 +++++++++++++-- 3 files changed, 33 insertions(+), 58 deletions(-) diff --git a/crates/ironrdp-egfx/src/server.rs b/crates/ironrdp-egfx/src/server.rs index d584fb9fd..b39d0b98d 100644 --- a/crates/ironrdp-egfx/src/server.rs +++ b/crates/ironrdp-egfx/src/server.rs @@ -60,6 +60,7 @@ use std::time::Instant; use ironrdp_core::{decode, impl_as_any, Encode, EncodeResult, WriteCursor}; use ironrdp_dvc::{DvcEncode, DvcMessage, DvcProcessor, DvcServerProcessor}; +use ironrdp_graphics::zgfx::wrap_uncompressed; use ironrdp_pdu::gcc::Monitor; use ironrdp_pdu::geometry::InclusiveRectangle; use ironrdp_pdu::{decode_err, PduResult}; @@ -85,55 +86,6 @@ const DEFAULT_MAX_FRAMES_IN_FLIGHT: u32 = 3; /// Special queue depth value indicating client has disabled acknowledgments const SUSPEND_FRAME_ACK_QUEUE_DEPTH: u32 = 0xFFFFFFFF; -// ============================================================================ -// ZGFX Segment Wrapping -// ============================================================================ - -/// Maximum data per ZGFX segment (MS-RDPEGFX 2.2.5) -const ZGFX_SEGMENT_MAXSIZE: usize = 65535; - -/// Single-segment descriptor (MS-RDPEGFX 2.2.5.1) -const DESCRIPTOR_SINGLE: u8 = 0xE0; - -/// Multi-segment descriptor (MS-RDPEGFX 2.2.5.1) -const DESCRIPTOR_MULTIPART: u8 = 0xE1; - -/// RDP8_BULK_ENCODED_DATA with no compression (CompressionType = RDP8 | no flags) -const BULK_ENCODED_UNCOMPRESSED: u8 = 0x04; - -/// Wrap raw data in ZGFX uncompressed segment format. -/// -/// Produces the wire format that `Decompressor::decompress()` expects: -/// single-segment for small data, multi-segment for data exceeding 65535 bytes. -fn wrap_zgfx_uncompressed(data: &[u8]) -> Vec { - if data.len() <= ZGFX_SEGMENT_MAXSIZE { - let mut out = Vec::with_capacity(2 + data.len()); - out.push(DESCRIPTOR_SINGLE); - out.push(BULK_ENCODED_UNCOMPRESSED); - out.extend_from_slice(data); - out - } else { - let segment_count = data.len().div_ceil(ZGFX_SEGMENT_MAXSIZE); - // Header: descriptor(1) + segment_count(2) + uncompressed_size(4) + per-segment: size(4) + flag(1) + data - let header_size = 1 + 2 + 4 + segment_count * 4; - let mut out = Vec::with_capacity(header_size + segment_count + data.len()); - - out.push(DESCRIPTOR_MULTIPART); - out.extend_from_slice(&u16::try_from(segment_count).unwrap_or(u16::MAX).to_le_bytes()); - out.extend_from_slice(&u32::try_from(data.len()).unwrap_or(u32::MAX).to_le_bytes()); - - for chunk in data.chunks(ZGFX_SEGMENT_MAXSIZE) { - // segment_size includes the 1-byte bulk header - let segment_size = u32::try_from(chunk.len() + 1).unwrap_or(u32::MAX); - out.extend_from_slice(&segment_size.to_le_bytes()); - out.push(BULK_ENCODED_UNCOMPRESSED); - out.extend_from_slice(chunk); - } - - out - } -} - /// Pre-encoded ZGFX-wrapped bytes for DVC transmission. /// /// `Encode::encode()` takes `&self`, but ZGFX wrapping is done in `drain_output()` @@ -1161,7 +1113,7 @@ impl GraphicsPipelineServer { let mut cursor = WriteCursor::new(&mut pdu_bytes); pdu.encode(&mut cursor).expect("GfxPdu encoding should not fail"); - let wrapped = wrap_zgfx_uncompressed(&pdu_bytes); + let wrapped = wrap_uncompressed(&pdu_bytes); trace!(pdu_name, pdu_size, wrapped = wrapped.len(), "ZGFX wrapped"); Box::new(ZgfxWrappedBytes { diff --git a/crates/ironrdp-server/src/gfx.rs b/crates/ironrdp-server/src/gfx.rs index 27b320dc4..5663768d4 100644 --- a/crates/ironrdp-server/src/gfx.rs +++ b/crates/ironrdp-server/src/gfx.rs @@ -15,6 +15,8 @@ use ironrdp_egfx::server::{GraphicsPipelineHandler, GraphicsPipelineServer}; use ironrdp_pdu::PduResult; use ironrdp_svc::SvcMessage; +use crate::server::ServerEventSender; + /// Shared handle to a `GraphicsPipelineServer`. /// /// Uses `std::sync::Mutex` (not tokio) because `DvcProcessor` trait methods @@ -23,8 +25,9 @@ pub type GfxServerHandle = Arc>; /// Factory for creating EGFX graphics pipeline handlers. /// -/// Follows the same pattern as `CliprdrServerFactory` and `SoundServerFactory`. -pub trait GfxServerFactory: Send { +/// Implements `ServerEventSender` so the factory can signal the server event loop +/// when EGFX frames are ready to be drained and sent. +pub trait GfxServerFactory: ServerEventSender + Send { /// Create a handler for EGFX callbacks (caps negotiation, frame acks). fn build_gfx_handler(&self) -> Box; @@ -91,14 +94,14 @@ impl DvcServerProcessor for GfxDvcBridge {} #[derive(Debug)] pub enum EgfxServerMessage { /// Pre-encoded DVC messages from `GraphicsPipelineServer::drain_output()`. - SendMessages { channel_id: u32, messages: Vec }, + SendMessages { messages: Vec }, } impl core::fmt::Display for EgfxServerMessage { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { match self { - Self::SendMessages { channel_id, messages } => { - write!(f, "SendMessages(channel={channel_id}, count={})", messages.len()) + Self::SendMessages { messages } => { + write!(f, "SendMessages(count={})", messages.len()) } } } diff --git a/crates/ironrdp-server/src/server.rs b/crates/ironrdp-server/src/server.rs index f3e5179e8..88aec97d9 100644 --- a/crates/ironrdp-server/src/server.rs +++ b/crates/ironrdp-server/src/server.rs @@ -221,6 +221,8 @@ pub struct RdpServer { cliprdr_factory: Option>, #[cfg(feature = "egfx")] gfx_factory: Option>, + #[cfg(feature = "egfx")] + gfx_handle: Option, ev_sender: mpsc::UnboundedSender, ev_receiver: Arc>>, creds: Option, @@ -262,7 +264,7 @@ impl RdpServer { display: Box, mut sound_factory: Option>, mut cliprdr_factory: Option>, - #[cfg(feature = "egfx")] gfx_factory: Option>, + #[cfg(feature = "egfx")] mut gfx_factory: Option>, ) -> Self { let (ev_sender, ev_receiver) = ServerEvent::create_channel(); if let Some(cliprdr) = cliprdr_factory.as_mut() { @@ -271,6 +273,10 @@ impl RdpServer { if let Some(snd) = sound_factory.as_mut() { snd.set_sender(ev_sender.clone()); } + #[cfg(feature = "egfx")] + if let Some(gfx) = gfx_factory.as_mut() { + gfx.set_sender(ev_sender.clone()); + } Self { opts, handler: Arc::new(Mutex::new(handler)), @@ -280,6 +286,8 @@ impl RdpServer { cliprdr_factory, #[cfg(feature = "egfx")] gfx_factory, + #[cfg(feature = "egfx")] + gfx_handle: None, ev_sender, ev_receiver: Arc::new(Mutex::new(ev_receiver)), creds: None, @@ -295,6 +303,17 @@ impl RdpServer { &self.ev_sender } + /// Returns the shared EGFX server handle for proactive frame submission. + /// + /// Available after `build_server_with_handle()` returns `Some` during + /// channel setup. Display handlers use this to call + /// `send_avc420_frame()` / `send_avc444_frame()` and then signal the + /// event loop via `ServerEvent::Egfx`. + #[cfg(feature = "egfx")] + pub fn gfx_handle(&self) -> Option<&crate::gfx::GfxServerHandle> { + self.gfx_handle.as_ref() + } + fn attach_channels(&mut self, acceptor: &mut Acceptor) { if let Some(cliprdr_factory) = self.cliprdr_factory.as_deref() { let backend = cliprdr_factory.build_cliprdr_backend(); @@ -321,7 +340,8 @@ impl RdpServer { let dvc = { let mut dvc = dvc; if let Some(gfx_factory) = self.gfx_factory.as_deref() { - if let Some((bridge, _handle)) = gfx_factory.build_server_with_handle() { + if let Some((bridge, handle)) = gfx_factory.build_server_with_handle() { + self.gfx_handle = Some(handle); dvc = dvc.with_dynamic_channel(bridge); } else { let handler = gfx_factory.build_gfx_handler(); @@ -597,7 +617,7 @@ impl RdpServer { } #[cfg(feature = "egfx")] ServerEvent::Egfx(msg) => match msg { - EgfxServerMessage::SendMessages { messages, .. } => { + EgfxServerMessage::SendMessages { messages } => { let drdynvc_channel_id = self .get_channel_id_by_type::() .ok_or_else(|| anyhow!("DRDYNVC channel not found"))?; From a177edaacc196d2877760bdf4d120253d4a75ef4 Mon Sep 17 00:00:00 2001 From: Greg Lamberson Date: Mon, 16 Feb 2026 19:09:47 +0200 Subject: [PATCH 3/3] refactor(server): use .context() for channel lookup errors Replace .ok_or_else(|| anyhow!(...)) with .context() for rdpsnd, clipboard, and EGFX channel lookups. All three use static strings so .context() is cleaner and consistent with the rest of the file. --- crates/ironrdp-server/src/server.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/ironrdp-server/src/server.rs b/crates/ironrdp-server/src/server.rs index 88aec97d9..eb609a8f4 100644 --- a/crates/ironrdp-server/src/server.rs +++ b/crates/ironrdp-server/src/server.rs @@ -2,7 +2,7 @@ use core::net::SocketAddr; use std::rc::Rc; use std::sync::Arc; -use anyhow::{anyhow, bail, Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use ironrdp_acceptor::{Acceptor, AcceptorResult, BeginResult, DesktopSize}; use ironrdp_async::Framed; use ironrdp_cliprdr::backend::ClipboardMessage; @@ -584,7 +584,7 @@ impl RdpServer { .context("failed to send rdpsnd event")?; let channel_id = self .get_channel_id_by_type::() - .ok_or_else(|| anyhow!("SVC channel not found"))?; + .context("SVC channel not found")?; let data = server_encode_svc_messages(msgs.into(), channel_id, user_channel_id)?; writer.write_all(&data).await?; } @@ -611,7 +611,7 @@ impl RdpServer { .context("failed to send clipboard event")?; let channel_id = self .get_channel_id_by_type::() - .ok_or_else(|| anyhow!("SVC channel not found"))?; + .context("SVC channel not found")?; let data = server_encode_svc_messages(msgs.into(), channel_id, user_channel_id)?; writer.write_all(&data).await?; } @@ -620,7 +620,7 @@ impl RdpServer { EgfxServerMessage::SendMessages { messages } => { let drdynvc_channel_id = self .get_channel_id_by_type::() - .ok_or_else(|| anyhow!("DRDYNVC channel not found"))?; + .context("DRDYNVC channel not found")?; let data = server_encode_svc_messages(messages, drdynvc_channel_id, user_channel_id)?; writer.write_all(&data).await?; }