Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/openshell-sandbox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
tempfile = "3"
temp-env = "0.3"
tokio-tungstenite = { workspace = true }
futures = { workspace = true }

[lints]
workspace = true
23 changes: 20 additions & 3 deletions crates/openshell-sandbox/src/l7/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ use std::collections::HashMap;
use std::future::Future;
use tokio::io::{AsyncRead, AsyncWrite};

/// Outcome of relaying a single HTTP request/response pair.
#[derive(Debug)]
pub enum RelayOutcome {
/// Connection is reusable for further HTTP requests (keep-alive).
Reusable,
/// Connection was consumed (e.g. read-until-EOF or `Connection: close`).
Consumed,
/// Server responded with 101 Switching Protocols.
/// The connection has been upgraded (e.g. to WebSocket) and must be
/// relayed as raw bidirectional TCP from this point forward.
/// Contains any overflow bytes read from upstream past the 101 response
/// headers that belong to the upgraded protocol. The 101 headers
/// themselves have already been forwarded to the client.
Upgraded { overflow: Vec<u8> },
}

/// Body framing for HTTP requests/responses.
#[derive(Debug, Clone, Copy)]
pub enum BodyLength {
Expand Down Expand Up @@ -57,14 +73,15 @@ pub trait L7Provider: Send + Sync {

/// Forward an allowed request to upstream and relay the response back.
///
/// Returns `true` if the upstream connection is reusable (keep-alive),
/// `false` if it was consumed (e.g. read-until-EOF or `Connection: close`).
/// Returns a [`RelayOutcome`] indicating whether the connection is
/// reusable (keep-alive), consumed, or has been upgraded (101 Switching
/// Protocols) and must be relayed as raw bidirectional TCP.
fn relay<C, U>(
&self,
req: &L7Request,
client: &mut C,
upstream: &mut U,
) -> impl Future<Output = Result<bool>> + Send
) -> impl Future<Output = Result<RelayOutcome>> + Send
where
C: AsyncRead + AsyncWrite + Unpin + Send,
U: AsyncRead + AsyncWrite + Unpin + Send;
Expand Down
90 changes: 74 additions & 16 deletions crates/openshell-sandbox/src/l7/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! Parses each request within the tunnel, evaluates it against OPA policy,
//! and either forwards or denies the request.

use crate::l7::provider::L7Provider;
use crate::l7::provider::{L7Provider, RelayOutcome};
use crate::l7::{EnforcementMode, L7EndpointConfig, L7Protocol, L7RequestInfo};
use crate::secrets::{self, SecretResolver};
use miette::{IntoDiagnostic, Result, miette};
Expand Down Expand Up @@ -68,6 +68,40 @@ where
}
}

/// Handle an upgraded connection (101 Switching Protocols).
///
/// Forwards any overflow bytes from the upgrade response to the client, then
/// switches to raw bidirectional TCP copy for the upgraded protocol (WebSocket,
/// HTTP/2, etc.). L7 policy enforcement does not apply after the upgrade —
/// the initial HTTP request was already evaluated.
async fn handle_upgrade<C, U>(
client: &mut C,
upstream: &mut U,
overflow: Vec<u8>,
host: &str,
port: u16,
) -> Result<()>
where
C: AsyncRead + AsyncWrite + Unpin + Send,
U: AsyncRead + AsyncWrite + Unpin + Send,
{
info!(
host = %host,
port = port,
overflow_bytes = overflow.len(),
"101 Switching Protocols — switching to raw bidirectional relay \
(L7 enforcement no longer active)"
);
if !overflow.is_empty() {
client.write_all(&overflow).await.into_diagnostic()?;
client.flush().await.into_diagnostic()?;
}
tokio::io::copy_bidirectional(client, upstream)
.await
.into_diagnostic()?;
Ok(())
}

/// REST relay loop: parse request -> evaluate -> allow/deny -> relay response -> repeat.
async fn relay_rest<C, U>(
config: &L7EndpointConfig,
Expand Down Expand Up @@ -137,10 +171,24 @@ where
// Evaluate L7 policy via Rego (using redacted target)
let (allowed, reason) = evaluate_l7_request(engine, ctx, &request_info)?;

let decision_str = match (allowed, config.enforcement) {
(true, _) => "allow",
(false, EnforcementMode::Audit) => "audit",
(false, EnforcementMode::Enforce) => "deny",
// Check if this is an upgrade request for logging purposes.
let header_end = req
.raw_header
.windows(4)
.position(|w| w == b"\r\n\r\n")
.map_or(req.raw_header.len(), |p| p + 4);
let is_upgrade_request = {
let h = String::from_utf8_lossy(&req.raw_header[..header_end]);
h.lines()
.skip(1)
.any(|l| l.to_ascii_lowercase().starts_with("upgrade:"))
};

let decision_str = match (allowed, config.enforcement, is_upgrade_request) {
(true, _, true) => "allow_upgrade",
(true, _, false) => "allow",
(false, EnforcementMode::Audit, _) => "audit",
(false, EnforcementMode::Enforce, _) => "deny",
};

// Log every L7 decision (using redacted target — never log real secrets)
Expand All @@ -162,20 +210,26 @@ where

if allowed || config.enforcement == EnforcementMode::Audit {
// Forward request to upstream and relay response
let reusable = crate::l7::rest::relay_http_request_with_resolver(
let outcome = crate::l7::rest::relay_http_request_with_resolver(
&req,
client,
upstream,
ctx.secret_resolver.as_deref(),
)
.await?;
if !reusable {
debug!(
host = %ctx.host,
port = ctx.port,
"Upstream connection not reusable, closing L7 relay"
);
return Ok(());
match outcome {
RelayOutcome::Reusable => {} // continue loop
RelayOutcome::Consumed => {
debug!(
host = %ctx.host,
port = ctx.port,
"Upstream connection not reusable, closing L7 relay"
);
return Ok(());
}
RelayOutcome::Upgraded { overflow } => {
return handle_upgrade(client, upstream, overflow, &ctx.host, ctx.port).await;
}
}
} else {
// Enforce mode: deny with 403 and close connection (use redacted target)
Expand Down Expand Up @@ -334,12 +388,16 @@ where
// Forward request with credential rewriting and relay the response.
// relay_http_request_with_resolver handles both directions: it sends
// the request upstream and reads the response back to the client.
let reusable =
let outcome =
crate::l7::rest::relay_http_request_with_resolver(&req, client, upstream, resolver)
.await?;

if !reusable {
break;
match outcome {
RelayOutcome::Reusable => {} // continue loop
RelayOutcome::Consumed => break,
RelayOutcome::Upgraded { overflow } => {
return handle_upgrade(client, upstream, overflow, &ctx.host, ctx.port).await;
}
}
}

Expand Down
Loading
Loading