Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/build-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ jobs:
if: steps.tauri-cli-cache.outputs.cache-hit != 'true'
shell: bash
run: cargo install --locked --path app/src-tauri/vendor/tauri-cef/crates/tauri-cli
- name: Enable Corepack
run: corepack enable
- name: Install dependencies
run: yarn install --frozen-lockfile
run: pnpm install --frozen-lockfile

# vite build runs via tauri.conf.json's beforeBuildCommand during the
# "Build Tauri app" step below — no separate frontend build needed.
Expand Down
3 changes: 3 additions & 0 deletions app/src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ windows-sys = { version = "0.59", features = [
# parameter is gated behind it in windows-sys 0.59.
"Win32_System_Threading",
"Win32_Security",
"Win32_Storage_FileSystem",
"Win32_System_IO",
"Win32_System_Pipes",
] }

[features]
Expand Down
373 changes: 373 additions & 0 deletions app/src-tauri/src/deep_link_ipc_windows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,373 @@
//! Pre-CEF deep-link forwarding for Windows.
//!
//! `openhuman://` OAuth callbacks launch a second `OpenHuman.exe` with the
//! URL in argv. The Windows pre-CEF mutex guard exits secondaries before Tauri's
//! single-instance/deep-link plugins can run, so the URL must be forwarded here.

#![cfg(target_os = "windows")]

use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, OnceLock,
},
time::Duration,
};

use windows_sys::Win32::{
Foundation::{
CloseHandle, GetLastError, ERROR_BROKEN_PIPE, ERROR_PIPE_CONNECTED, HANDLE,
INVALID_HANDLE_VALUE,
},
Storage::FileSystem::{
CreateFileW, ReadFile, WriteFile, FILE_GENERIC_WRITE, OPEN_EXISTING, PIPE_ACCESS_INBOUND,
},
System::Pipes::{
ConnectNamedPipe, CreateNamedPipeW, PIPE_READMODE_BYTE, PIPE_TYPE_BYTE,
PIPE_UNLIMITED_INSTANCES, PIPE_WAIT,
},
};

const PIPE_NAME: &str = r"\\.\pipe\com.openhuman.app-deeplink";
const FORWARD_RETRY_ATTEMPTS: usize = 40;
const FORWARD_RETRY_DELAY: Duration = Duration::from_millis(50);

pub(crate) enum ForwardResult {
Forwarded,
NoPrimary,
NoUrls,
}

pub(crate) fn collect_deep_link_urls_from_args<I, S>(args: I) -> Vec<String>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
args.into_iter()
.skip(1)
.filter_map(|arg| {
let arg = arg.as_ref();
arg.starts_with("openhuman://").then(|| arg.to_string())
})
.collect()
}

pub(crate) fn extract_deep_link_urls() -> Vec<String> {
collect_deep_link_urls_from_args(std::env::args())
}

pub(crate) fn try_forward_deep_links() -> ForwardResult {
let urls = extract_deep_link_urls();
if urls.is_empty() {
return ForwardResult::NoUrls;
}

log::info!(
"[deep-link-ipc] secondary: found {} deep-link URL(s), forwarding to primary",
urls.len()
);

for attempt in 1..=FORWARD_RETRY_ATTEMPTS {
match open_pipe_for_write() {
Some(handle) => {
let result = write_urls(handle, &urls);
unsafe {
CloseHandle(handle);
}
if result {
log::info!(
"[deep-link-ipc] secondary: forwarded {} deep-link URL(s)",
urls.len()
);
return ForwardResult::Forwarded;
}
}
None if attempt < FORWARD_RETRY_ATTEMPTS => {
std::thread::sleep(FORWARD_RETRY_DELAY);
}
None => {}
}
}

log::warn!(
"[deep-link-ipc] secondary: primary pipe was unavailable; deep-link URL was not forwarded"
);
ForwardResult::NoPrimary
}

static PENDING_URLS: OnceLock<Arc<Mutex<Vec<String>>>> = OnceLock::new();
static LIVE_HANDLER: OnceLock<Mutex<Option<Box<dyn Fn(String) + Send + Sync>>>> = OnceLock::new();

fn pending_queue() -> &'static Arc<Mutex<Vec<String>>> {
PENDING_URLS.get_or_init(|| Arc::new(Mutex::new(Vec::new())))
}

fn live_handler() -> &'static Mutex<Option<Box<dyn Fn(String) + Send + Sync>>> {
LIVE_HANDLER.get_or_init(|| Mutex::new(None))
}

pub(crate) fn redact_url_for_log(url: &str) -> String {
url.parse::<url::Url>()
.map(|mut parsed| {
parsed.set_query(None);
parsed.set_fragment(None);
parsed.to_string()
})
.unwrap_or_else(|_| "<invalid deep link>".to_string())
}

fn dispatch_url(url: String) {
if let Ok(guard) = live_handler().lock() {
if let Some(ref handler) = *guard {
handler(url);
return;
}
}

if let Ok(mut queue) = pending_queue().lock() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] Two separate lock acquisitions create a narrow TOCTOU window.

If a URL arrives from the pipe thread after the live_handler lock is released (handler seen as None) but before pending_queue is locked here, drain_pending_urls can fire on the setup thread between those two points — it sets the handler and drains the queue — leaving the URL pushed into an already-drained queue with no subsequent drain scheduled.

The window is startup-only (only open until setup() runs) and realistically only reachable if a second OAuth callback fires while the app is still initialising, so the blast radius is negligible in practice. A straightforward fix is to hold a single combined lock through both operations, or to check the handler again after acquiring the queue lock:

if let Ok(mut queue) = pending_queue().lock() {
    // Re-check handler under the queue lock to close the TOCTOU window
    if let Ok(guard) = live_handler().lock() {
        if let Some(ref handler) = *guard {
            handler(url);
            return;
        }
    }
    queue.push(url);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix (applied locally, needs push to PR branch): Addressing the TOCTOU window flagged by @M3gA-Mind.

Acquire pending_queue first, then re-check live_handler under that lock. This closes the window where drain_pending_urls could fire between the two separate lock acquisitions.

fn dispatch_url(url: String) {
    if let Ok(mut queue) = pending_queue().lock() {
        // Re-check handler under the queue lock to close the TOCTOU window:
        // if drain_pending_urls fires between releasing the handler lock and
        // acquiring the queue lock, the URL would be pushed into an already-
        // drained queue with no subsequent drain scheduled.
        if let Ok(guard) = live_handler().lock() {
            if let Some(ref handler) = *guard {
                handler(url);
                return;
            }
        }
        log::debug!(
            \"[deep-link-ipc] queued URL before setup: {}\",
            redact_url_for_log(&url)
        );
        queue.push(url);
    }
}

Please apply this patch to your branch and push.

log::debug!(
"[deep-link-ipc] queued URL before setup: {}",
redact_url_for_log(&url)
);
queue.push(url);
}
}

pub(crate) struct DeepLinkPipeGuard {
stop: Arc<AtomicBool>,
}

impl Drop for DeepLinkPipeGuard {
fn drop(&mut self) {
self.stop.store(true, Ordering::SeqCst);
if let Some(handle) = open_pipe_for_write() {
unsafe {
CloseHandle(handle);
}
}
}
}

pub(crate) fn bind_and_listen() -> Option<DeepLinkPipeGuard> {
let stop = Arc::new(AtomicBool::new(false));
let thread_stop = Arc::clone(&stop);

match std::thread::Builder::new()
.name("deep-link-ipc-windows".into())
.spawn(move || listener_loop(thread_stop))
{
Ok(_) => {
log::info!("[deep-link-ipc] primary: named pipe listener started");
Some(DeepLinkPipeGuard { stop })
}
Err(err) => {
log::warn!("[deep-link-ipc] failed to spawn listener thread: {err}");
None
}
}
}

fn listener_loop(stop: Arc<AtomicBool>) {
while !stop.load(Ordering::SeqCst) {
let pipe = match create_pipe_for_read() {
Some(pipe) => pipe,
None => {
std::thread::sleep(Duration::from_millis(250));
continue;
}
};

let connected = unsafe { ConnectNamedPipe(pipe, std::ptr::null_mut()) != 0 }
|| unsafe { GetLastError() } == ERROR_PIPE_CONNECTED;

if connected {
for url in read_urls(pipe) {
log::info!(
"[deep-link-ipc] primary: received deep-link URL: {}",
redact_url_for_log(&url)
);
dispatch_url(url);
}
}

unsafe {
CloseHandle(pipe);
}
}
}

fn pipe_name_wide() -> Vec<u16> {
PIPE_NAME.encode_utf16().chain(std::iter::once(0)).collect()
}

fn create_pipe_for_read() -> Option<HANDLE> {
let name = pipe_name_wide();
let handle = unsafe {
CreateNamedPipeW(
name.as_ptr(),
PIPE_ACCESS_INBOUND,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
4096,
4096,
0,
std::ptr::null(),
)
};

if handle == INVALID_HANDLE_VALUE {
log::warn!(
"[deep-link-ipc] CreateNamedPipeW failed with os error {}",
unsafe { GetLastError() }
);
None
} else {
Some(handle)
}
}

fn open_pipe_for_write() -> Option<HANDLE> {
let name = pipe_name_wide();
let handle = unsafe {
CreateFileW(
name.as_ptr(),
FILE_GENERIC_WRITE,
0,
std::ptr::null(),
OPEN_EXISTING,
0,
std::ptr::null_mut(),
)
};

(handle != INVALID_HANDLE_VALUE).then_some(handle)
}

fn write_urls(handle: HANDLE, urls: &[String]) -> bool {
let payload = urls.join("\n") + "\n";
let bytes = payload.as_bytes();
let mut written = 0u32;
let ok = unsafe {
WriteFile(
handle,
bytes.as_ptr(),
bytes.len() as u32,
&mut written,
std::ptr::null_mut(),
)
} != 0;

ok && written == bytes.len() as u32
}

fn read_urls(handle: HANDLE) -> Vec<String> {
let mut all = Vec::new();
let mut buf = [0u8; 1024];

loop {
let mut read = 0u32;
let ok = unsafe {
ReadFile(
handle,
buf.as_mut_ptr(),
buf.len() as u32,
&mut read,
std::ptr::null_mut(),
)
} != 0;

if !ok {
let err = unsafe { GetLastError() };
if err != ERROR_BROKEN_PIPE {
log::debug!("[deep-link-ipc] ReadFile stopped with os error {err}");
}
break;
}

if read == 0 {
break;
}

all.extend_from_slice(&buf[..read as usize]);
}

String::from_utf8_lossy(&all)
.lines()
.filter(|line| line.starts_with("openhuman://"))
.map(ToOwned::to_owned)
.collect()
}

pub(crate) fn drain_pending_urls<R: tauri::Runtime>(app: &tauri::AppHandle<R>) {
use tauri::Emitter;

let app_clone = app.clone();
if let Ok(mut guard) = live_handler().lock() {
*guard = Some(Box::new(move |url: String| {
if let Ok(parsed) = url.parse::<url::Url>() {
if let Err(err) = app_clone.emit("deep-link://new-url", &vec![parsed]) {
log::warn!("[deep-link-ipc] failed to emit deep-link event: {err}");
}
} else {
log::warn!("[deep-link-ipc] received malformed deep-link URL");
}
}));
}

let pending = pending_queue()
.lock()
.map(|mut queue| std::mem::take(&mut *queue))
.unwrap_or_default();

if !pending.is_empty() {
log::info!(
"[deep-link-ipc] draining {} queued deep-link URL(s)",
pending.len()
);
}

for url in pending {
if let Ok(parsed) = url.parse::<url::Url>() {
if let Err(err) = app.emit("deep-link://new-url", &vec![parsed]) {
log::warn!("[deep-link-ipc] failed to emit queued deep-link URL: {err}");
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn collect_deep_link_urls_filters_args() {
let urls = collect_deep_link_urls_from_args([
"OpenHuman.exe",
"openhuman://auth?token=secret&key=auth",
"--flag",
"https://example.test",
"openhuman://oauth/success?integrationId=abc",
]);

assert_eq!(
urls,
vec![
"openhuman://auth?token=secret&key=auth",
"openhuman://oauth/success?integrationId=abc"
]
);
}

#[test]
fn redact_url_removes_query_and_fragment() {
assert_eq!(
redact_url_for_log("openhuman://auth?token=secret&key=auth#frag"),
"openhuman://auth"
);
}

#[test]
fn pipe_name_is_stable_and_app_scoped() {
assert_eq!(PIPE_NAME, r"\\.\pipe\com.openhuman.app-deeplink");
}
}
Loading
Loading