Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## v0.7.1

- Fix WebSocket / protocol-upgrade connections hanging
- Upgrade requests (`wss://`, and proprietary HTTP-upgrade transports like MMTLS long-link) could spin forever with `upgrade expected but not completed`. The proxy fabricated a `101 Switching Protocols` to the client regardless of what the upstream actually returned, so a failed upgrade left the client waiting on a tunnel that was never bridged. It now forwards the upstream's real response when it isn't a genuine `101`, and the dedicated upgrade HTTP client gained the same connection hardening (`connect_timeout`, `tcp_keepalive`, no idle pooling) as the main client.
- Excluded hosts performing a plain-HTTP protocol upgrade are now blind-tunneled at the TCP level instead of being run through the (HTTP-only) upgrade bridge, so MITM-excluded apps using non-HTTP upgrade protocols work. Previously the exclusion list was only consulted on the `CONNECT` path, so excluding such a host had no effect on its plain-HTTP upgrade traffic.
- When an opaque (non-WebSocket) upgrade is seen for a host that is *not* excluded, a warning is logged naming the host and suggesting it be added to the exclusions, instead of failing cryptically.
- Validate filter lists when added
- Adding a filter now rejects URLs that do not serve a `text/plain` filter list (e.g. an HTML error/landing page returned with a `200`) with a `422`, instead of silently saving a broken filter. The error is surfaced in the web UI, and filters whose URL stops serving a list are dropped from the engine with a warning on the next refresh.
- Fix proxied requests randomly hanging/timing out
Expand Down
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
&& cargo build --release; rm src/main.rs

COPY . .
# Build profile for the final binary: "release" (default) or "debug".
# The value doubles as the `target/<profile>` directory name. Only "release"
# maps to a build flag, because neither `cargo build` nor `trunk build`
# accepts a `--debug` flag; the debug profile is simply their default.
ARG COMPILE_MODE="release"
RUN --mount=type=cache,target=/usr/local/cargo/registry \
    --mount=type=cache,target=/app/target \
    --mount=type=cache,target=/root/.npm \
    if [ "${COMPILE_MODE}" = "release" ]; then PROFILE_FLAG="--release"; else PROFILE_FLAG=""; fi \
    && cd web_frontend && trunk build ${PROFILE_FLAG} \
    && cd .. && cargo build ${PROFILE_FLAG} \
    && cp target/${COMPILE_MODE}/privaxy /privaxy-out \
    && chmod +x /privaxy-out

# Prebuilt path: expect $PREBUILT_BINARY to exist in the build context.
Expand Down
2 changes: 1 addition & 1 deletion filterlists-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn get_filters() -> Result<Vec<Filter>, FilterListError> {
/// Requests the details of a single filter list.
///
/// `filter` identifies the list either directly by id (`FilterArgs::U32`) or
/// through a `Filter` value, whose `id` field is used. The id is interpolated
/// into `{FILTERLISTS_API_URL}/lists/{id}` and the response is decoded as
/// `FilterDetails` by `_get`.
///
/// # Errors
///
/// Returns whatever `FilterListError` `_get` reports for the request.
pub async fn get_filter_information(filter: FilterArgs) -> Result<FilterDetails, FilterListError> {
    // `filter` is owned here, so the id can be taken out directly; no clone
    // is needed.
    let id = match filter {
        FilterArgs::U32(id) => id,
        FilterArgs::Filter(filter) => filter.id,
    };
    _get::<FilterDetails>(&format!("{FILTERLISTS_API_URL}/lists/{id}")).await
}
Expand Down
23 changes: 14 additions & 9 deletions privaxy/src/resources/proxy.pac.tera
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,27 @@ function FindProxyForURL(url, host) {
if (myIp === "{{ ip }}") return "DIRECT";
{%- endfor %}
{%- endif %}
{%- if cidrs | length > 0 or fqdns | length > 0 %}
var hostIsIp = /^\d+\.\d+\.\d+\.\d+$/.test(host) || host.indexOf(":") !== -1;
{%- endif %}
{%- if cidrs | length > 0 %}
if (/^\d+\.\d+\.\d+\.\d+$/.test(host)) {
if (hostIsIp) {
{%- for ip, netmask in cidrs %}
if (isInNet(host, "{{ ip }}", "{{ netmask }}")) return "DIRECT";
{%- endfor %}
}
{%- endif %}
{%- if fqdns | length > 0 %}
var directDomains = [
{%- for fqdn in fqdns %}
"{{ fqdn }}",
{%- endfor %}
];
for (var i = 0; i < directDomains.length; i++) {
var d = directDomains[i];
if (host === d || dnsDomainIs(host, "." + d)) return "DIRECT";
if (!hostIsIp) {
var directDomains = [
{%- for fqdn in fqdns %}
"{{ fqdn }}",
{%- endfor %}
];
for (var i = 0; i < directDomains.length; i++) {
var d = directDomains[i];
if (host === d || dnsDomainIs(host, "." + d)) return "DIRECT";
}
}
{%- endif %}
return "PROXY {{ proxy_host }}";
Expand Down
18 changes: 16 additions & 2 deletions privaxy/src/server/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,11 +364,20 @@ async fn privaxy_backend(
configuration_save_lock: Arc<tokio::sync::Mutex<()>>,
notify_reload: Arc<tokio::sync::Notify>,
) {
// Mirror the reqwest client's connection hardening (see above): without a
// connect timeout and OS-level keepalive, an upgrade can hang on a pooled
// keep-alive connection the remote has silently dropped, surfacing as
// "upgrade expected but not completed".
let mut http_connector = hyper::client::HttpConnector::new();
http_connector.enforce_http(false);
http_connector.set_connect_timeout(Some(Duration::from_secs(10)));
http_connector.set_keepalive(Some(Duration::from_secs(60)));

let https_connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.build();
.wrap_connector(http_connector);
let config = read_configuration(&configuration_save_lock).await;
let network_config = &config.network;
let doh_config = network_config.doh.clone();
Expand All @@ -380,7 +389,12 @@ async fn privaxy_backend(
// handle compression.
// Hyper's client doesn't follow redirects, which is what we want, so there is
// nothing to disable here.
let hyper_client = Client::builder().build(https_connector);
// An upgraded connection is consumed by the tunnel anyway, so idle pooling
// buys nothing and only risks reusing a stale connection under a long-lived
// WebSocket — disable it.
let hyper_client = Client::builder()
.pool_max_idle_per_host(0)
.build(https_connector);

let make_service = make_service_fn(move |conn: &AddrStream| {
let client_ip_address = conn.remote_addr().ip();
Expand Down
172 changes: 168 additions & 4 deletions privaxy/src/server/proxy/mitm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use hyper::{
};
use hyper_rustls::HttpsConnector;
use std::{net::IpAddr, sync::Arc};
use tokio::{net::TcpStream, sync::broadcast};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::TcpStream,
sync::broadcast,
};
use tokio_rustls::TlsAcceptor;

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -108,9 +112,35 @@ pub(crate) async fn serve_mitm_session(
});

Ok(Response::new(Body::empty()))
} else if local_exclusion_store.contains(authority.host())
&& req.headers().contains_key(http::header::UPGRADE)
{
// An excluded host performing a protocol upgrade over plain HTTP — e.g.
// WeChat's MMTLS long-link (`http://dns.weixin.qq.com/mmtls/...`), which
// speaks a proprietary, non-HTTP protocol once upgraded. The hyper-based
// bridge in `serve` can't carry that (the upstream never returns a clean
// `101`, so the upgrade "expected but not completed"). Blind-tunnel the
// bytes at the TCP level instead, the same way excluded CONNECT hosts
// are tunneled.
tunnel_http_upgrade(req, authority).await
} else {
// The request is not of method `CONNECT`. Therefore,
// this request is for an HTTP resource.
//
// An opaque (non-WebSocket) protocol upgrade to a host that is *not*
// excluded will be routed through `serve`, whose hyper bridge cannot
// carry a non-HTTP protocol — it will hang or fail. We can't safely
// tunnel it (the user hasn't opted the host out of filtering), so warn
// and let it proceed, pointing the user at the exclusion list.
if is_opaque_upgrade(req.headers()) {
log::warn!(
"Proxying opaque protocol-upgrade traffic (MMTLS?) for {}; \
this is unlikely to work through the MITM proxy. Consider adding the host \
to your exclusions.",
authority
);
}

serve(
adblock_requester,
req,
Expand All @@ -128,12 +158,146 @@ pub(crate) async fn serve_mitm_session(
}
}

async fn tunnel(mut upgraded: &mut Upgraded, authority: &Authority) -> std::io::Result<()> {
let mut server = TcpStream::connect(authority.to_string()).await?;
/// Returns `true` for an HTTP `Upgrade` request whose target protocol is
/// something other than WebSocket (or h2c) — e.g. WeChat's MMTLS long-link.
/// The proxy can't do anything useful with such a protocol, and its hyper-based
/// upgrade bridge can't carry it; these are only handled correctly by
/// blind-tunneling, which requires the host to be excluded.
///
/// Returns `false` when there is no `Upgrade` header, when any `Upgrade` value
/// is not valid visible ASCII (it cannot be classified), or when at least one
/// offered protocol is bridgeable.
fn is_opaque_upgrade(headers: &http::HeaderMap) -> bool {
    let mut saw_upgrade = false;

    // `Upgrade` is a list-valued header, so it may legally be split across
    // several field lines. Inspect every one of them, not just the first.
    for value in headers.get_all(http::header::UPGRADE).iter() {
        let value = match value.to_str() {
            Ok(value) => value,
            // Unreadable value: don't claim the upgrade is opaque.
            Err(_) => return false,
        };
        saw_upgrade = true;

        // Each value may list multiple comma-separated tokens, each optionally
        // `name/version`. A single bridgeable token is enough to rule out
        // "opaque".
        let offers_bridgeable = value.split(',').any(|token| {
            let name = token.trim().split('/').next().unwrap_or("").trim();
            name.eq_ignore_ascii_case("websocket") || name.eq_ignore_ascii_case("h2c")
        });
        if offers_bridgeable {
            return false;
        }
    }

    saw_upgrade
}

/// Blind-tunnel a plain-HTTP protocol upgrade to an excluded host. The proxied
/// request carries an absolute-form URI; we replay it to the upstream in
/// origin-form over a raw socket, return our own `101` to the client, and pipe
/// the (opaque) post-upgrade bytes both ways. The upstream's own `101` header
/// block is discarded so the client sees exactly one status line.
///
/// The replayed head contains the client's headers except the proxy-only ones
/// (`Proxy-Authorization`, `Proxy-Connection`), which are addressed to this
/// proxy and must not reach the origin. A `Host` header is synthesized from
/// `authority` if the client did not send one, since origin-form requests
/// need it.
///
/// Always returns `Ok` with a `101 Switching Protocols` response carrying
/// `Connection: upgrade` and the client's own `Upgrade` value (when present).
/// Failures of the detached bridge are only logged.
///
/// thank you, wechat, for making this necessary
async fn tunnel_http_upgrade(
    req: Request<Body>,
    authority: Authority,
) -> Result<Response<Body>, hyper::Error> {
    // Build the origin-form request head before `req` is moved into the task.
    let path = req
        .uri()
        .path_and_query()
        .map(|pq| pq.as_str())
        .unwrap_or("/");
    let mut head = format!("{} {} HTTP/1.1\r\n", req.method(), path);

    // The absolute-form URI named the target; once rewritten to origin-form the
    // upstream can only learn it from `Host`.
    if !req.headers().contains_key(http::header::HOST) {
        head.push_str("host: ");
        head.push_str(authority.as_str());
        head.push_str("\r\n");
    }

    for (name, value) in req.headers() {
        // Headers meant for the proxy hop itself are not replayed upstream.
        // `Connection` and `Upgrade` are deliberately kept: the upstream needs
        // them to perform the upgrade.
        if matches!(name.as_str(), "proxy-authorization" | "proxy-connection") {
            continue;
        }
        head.push_str(name.as_str());
        head.push_str(": ");
        // Header values are effectively always ASCII here; lossy conversion just
        // avoids failing the replay on a pathological non-UTF8 value.
        head.push_str(&String::from_utf8_lossy(value.as_bytes()));
        head.push_str("\r\n");
    }
    head.push_str("\r\n");

    let upgrade_value = req.headers().get(http::header::UPGRADE).cloned();

    // The bridge runs detached: `hyper::upgrade::on` only resolves once we have
    // returned the `101` below, so awaiting it here would deadlock.
    tokio::spawn(async move {
        match bridge_http_upgrade(req, head, &authority).await {
            Ok(()) => log::debug!("HTTP-upgrade tunnel closed for {}", authority),
            Err(e) => log::warn!("HTTP-upgrade tunnel for {} failed: {}", authority, e),
        }
    });

    let mut response = Response::new(Body::empty());
    *response.status_mut() = http::StatusCode::SWITCHING_PROTOCOLS;
    response.headers_mut().insert(
        http::header::CONNECTION,
        http::HeaderValue::from_static("upgrade"),
    );
    if let Some(upgrade) = upgrade_value {
        response
            .headers_mut()
            .insert(http::header::UPGRADE, upgrade);
    }
    Ok(response)
}

/// Upstream half of `tunnel_http_upgrade`: wait for the client upgrade, connect
/// to the origin, replay the request head, strip the origin's `101`, then pipe.
///
/// `head` is the complete origin-form request head (request line, headers and
/// the terminating blank line) and is written to the upstream verbatim. Any
/// bytes the upstream sent right after its response header block are forwarded
/// to the client before the bidirectional pipe starts.
///
/// # Errors
///
/// Returns an `std::io::Error` if the client upgrade fails (wrapped as
/// `ErrorKind::Other`), if the upstream cannot be reached, or if any read or
/// write on either side fails.
async fn bridge_http_upgrade(
    req: Request<Body>,
    head: String,
    authority: &Authority,
) -> std::io::Result<()> {
    // `Authority::host` keeps the square brackets around an IPv6 literal
    // (`[::1]`), but the socket-address resolver expects the bare address.
    let raw_host = authority.host();
    let host = raw_host
        .strip_prefix('[')
        .and_then(|inner| inner.strip_suffix(']'))
        .unwrap_or(raw_host);
    // Proxied `http://` authorities carry no port; default to 80.
    let port = authority.port_u16().unwrap_or(80);

    let mut client = hyper::upgrade::on(req)
        .await
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
    let mut upstream = TcpStream::connect((host, port)).await?;
    upstream.write_all(head.as_bytes()).await?;

    // The client was already handed a `101`, so the upstream's header block is
    // dropped and only what follows it is passed through.
    let leftover = read_past_response_headers(&mut upstream).await?;
    if !leftover.is_empty() {
        client.write_all(&leftover).await?;
    }

    pipe(&mut client, &mut upstream).await
}

/// Read from `stream` until the end of the HTTP response header block
/// (`\r\n\r\n`) and return any bytes that followed it (the start of the tunneled
/// payload). If the upstream closes or never sends a recognizable header block,
/// whatever was read is returned so it can still be forwarded.
///
/// Reading stops once more than `HEADER_CAP` (64 KiB) has been buffered without
/// finding the terminator; everything read so far is then treated as payload.
///
/// # Errors
///
/// Propagates any I/O error from reading `stream`.
async fn read_past_response_headers(stream: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    const HEADER_CAP: usize = 64 * 1024;
    const TERMINATOR: &[u8] = b"\r\n\r\n";

    let mut buf = Vec::new();
    let mut chunk = [0u8; 1024];
    // Index of the first byte that could still begin an unseen terminator.
    // Everything before it has already been scanned on an earlier iteration.
    let mut scan_from = 0usize;

    loop {
        let n = stream.read(&mut chunk).await?;
        if n == 0 {
            return Ok(buf);
        }
        buf.extend_from_slice(&chunk[..n]);

        if let Some(offset) = buf[scan_from..]
            .windows(TERMINATOR.len())
            .position(|window| window == TERMINATOR)
        {
            return Ok(buf.split_off(scan_from + offset + TERMINATOR.len()));
        }
        // A terminator may straddle two reads, so keep the last three bytes in
        // the next scan window instead of rescanning the whole buffer.
        scan_from = buf.len().saturating_sub(TERMINATOR.len() - 1);

        if buf.len() > HEADER_CAP {
            // No header terminator in a sane amount of data; treat everything as
            // payload rather than stalling.
            return Ok(buf);
        }
    }
}

/// Opens a TCP connection to `authority` and relays bytes between it and the
/// already-upgraded client connection until the relay ends.
///
/// # Errors
///
/// Returns the I/O error from connecting to `authority` or from the relay.
async fn tunnel(upgraded: &mut Upgraded, authority: &Authority) -> std::io::Result<()> {
    let target = authority.to_string();
    let mut upstream = TcpStream::connect(target).await?;

    log::debug!("Started tunneling host: {}", authority);

    pipe(upgraded, &mut upstream).await
}

/// Relays bytes between two duplex streams in both directions via
/// `tokio::io::copy_bidirectional`, discarding the transferred byte counts.
///
/// # Errors
///
/// Returns the I/O error reported by the copy, if any.
async fn pipe<A, B>(a: &mut A, b: &mut B) -> std::io::Result<()>
where
    A: AsyncRead + AsyncWrite + Unpin + ?Sized,
    B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
    tokio::io::copy_bidirectional(a, b)
        .await
        .map(|_bytes_copied| ())
}
39 changes: 36 additions & 3 deletions privaxy/src/server/proxy/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,27 +554,56 @@ async fn perform_two_ends_upgrade(
) -> Response<Body> {
let (mut duplex_client, mut duplex_server) = tokio::io::duplex(32);

// Captured for log context; `uri` is moved into `new_request` below.
let request_uri = uri.to_string();

let mut new_request = Request::new(Body::empty());
*new_request.headers_mut() = request.headers().clone();
*new_request.uri_mut() = uri;

let client_uri = request_uri.clone();
tokio::spawn(async move {
match hyper::upgrade::on(request).await {
Ok(mut upgraded_client) => {
let _result =
tokio::io::copy_bidirectional(&mut upgraded_client, &mut duplex_client).await;
}
Err(e) => {
log::debug!("Unable to upgrade: {}", e)
log::warn!(
"Unable to upgrade client connection for {}: {}",
client_uri,
e
)
}
}
});

let response = match hyper_client.request(new_request).await {
Ok(response) => response,
Err(_err) => return get_empty_response(http::StatusCode::BAD_REQUEST),
Err(err) => {
log::warn!(
"Upstream upgrade request failed for {}: {}",
request_uri,
err
);
return get_empty_response(http::StatusCode::BAD_GATEWAY);
}
};

// Only bridge a genuine protocol switch. If the upstream did not return
// `101 Switching Protocols`, forwarding a fabricated 101 leaves the client
// believing the upgrade succeeded while no bytes are ever bridged from the
// server half — the connection then hangs forever. Forward the upstream's
// actual response instead so the client can fail (or follow it) cleanly.
if response.status() != StatusCode::SWITCHING_PROTOCOLS {
log::warn!(
"Upstream did not upgrade {} (status {}); forwarding response as-is",
request_uri,
response.status()
);
return response;
}

let mut new_response = get_empty_response(StatusCode::SWITCHING_PROTOCOLS);
*new_response.headers_mut() = response.headers().clone();

Expand All @@ -586,7 +615,11 @@ async fn perform_two_ends_upgrade(
});
}
Err(e) => {
log::debug!("Unable to upgrade: {}", e)
log::warn!(
"Unable to upgrade upstream connection for {}: {}",
request_uri,
e
)
}
}

Expand Down
Loading