diff --git a/dataplane/src/packet_processor/mod.rs b/dataplane/src/packet_processor/mod.rs index 6fea14132..bb9dc89c7 100644 --- a/dataplane/src/packet_processor/mod.rs +++ b/dataplane/src/packet_processor/mod.rs @@ -15,6 +15,7 @@ use concurrency::sync::Arc; use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable}; use flow_filter::{FlowFilter, FlowFilterTableWriter}; +use nat::portfw::PortForwarder; use nat::stateful::NatAllocatorWriter; use nat::stateless::NatTablesWriter; use nat::{StatefulNat, StatelessNat}; @@ -87,6 +88,7 @@ pub(crate) fn start_router( let flow_filter = FlowFilter::new("flow-filter", flowfiltertablesr_factory.handle()); let flow_lookup = FlowLookup::new("flow-lookup", flow_table.clone()); let flow_expirations_nf = ExpirationsNF::new(flow_table.clone()); + let port_forwarding = PortForwarder::new("port-forwarder"); // Build the pipeline for a router. The composition of the pipeline (in stages) is currently // hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last @@ -95,6 +97,7 @@ pub(crate) fn start_router( .add_stage(iprouter1) .add_stage(flow_lookup) .add_stage(flow_filter) + .add_stage(port_forwarding) .add_stage(stateless_nat) .add_stage(stateful_nat) .add_stage(iprouter2) diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index 823713534..968b68be3 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -133,6 +133,10 @@ impl FlowFilter { if let Some(dst_vpcd) = self.check_packet_flow_info(packet) { packet.meta_mut().dst_vpcd = Some(dst_vpcd); + + // FIXME: here we should call set_nat_requirements() instead of + // hard-coding stateful nat, as differentiate also port-forwarding. + packet.meta_mut().set_stateful_nat(true); } else { packet.done(DoneReason::Filtered); @@ -215,6 +219,10 @@ fn set_nat_requirements(packet: &mut Packet, data: &R if data.requires_stateless_nat() { packet.meta_mut().set_stateless_nat(true); } + if data.requires_port_forwarding() { + packet.meta_mut().set_port_forwarding(true); + } + // FIXME: we should forbid/(warn about) combos that we don't support } #[cfg(test)] diff --git a/flow-filter/src/tables.rs b/flow-filter/src/tables.rs index a45f28395..0aaf06654 100644 --- a/flow-filter/src/tables.rs +++ b/flow-filter/src/tables.rs @@ -458,6 +458,7 @@ impl ValueWithAssociatedRanges for SrcConnectionData { pub(crate) enum NatRequirement { Stateless, Stateful, + PortForwarding, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -489,6 +490,11 @@ impl RemoteData { self.src_nat_req == Some(NatRequirement::Stateless) || self.dst_nat_req == Some(NatRequirement::Stateless) } + pub(crate) fn requires_port_forwarding(&self) -> bool { + // This is temporary: do we want to reuse dst_nat_req + // or have a separate field? + self.dst_nat_req == Some(NatRequirement::PortForwarding) + } } #[derive(Debug, Clone)] diff --git a/nat/src/lib.rs b/nat/src/lib.rs index b28d4b0d6..85ccbebf6 100644 --- a/nat/src/lib.rs +++ b/nat/src/lib.rs @@ -23,6 +23,7 @@ mod icmp_error_msg; mod port; +pub mod portfw; pub mod stateful; pub mod stateless; diff --git a/nat/src/portfw/mod.rs b/nat/src/portfw/mod.rs new file mode 100644 index 000000000..a4a1550e2 --- /dev/null +++ b/nat/src/portfw/mod.rs @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Port forwarding stage + +use net::buffer::PacketBufferMut; +use net::packet::Packet; +use pipeline::NetworkFunction; + +#[allow(unused)] +use tracing::{debug, error, warn}; + +use tracectl::trace_target; +trace_target!("port-forwarding", LevelFilter::INFO, &["nat", "pipeline"]); + +/// A port-forwarding network function +pub struct PortForwarder { + name: String, +} + +impl PortForwarder { + /// Creates a new [`PortForwarder`] + #[must_use] + pub fn new(name: &str) -> Self { + Self { + name: name.to_string(), + } + } + /// Do port forwarding for the given packet. + fn process_packet(&self, _packet: &mut Packet) { + debug!("{}: processing packet", self.name); + // TODO + } +} + +impl NetworkFunction for PortForwarder { + fn process<'a, Input: Iterator> + 'a>( + &'a mut self, + input: Input, + ) -> impl Iterator> + 'a { + input.filter_map(|mut packet| { + if !packet.is_done() && packet.meta().requires_port_forwarding() { + self.process_packet(&mut packet); + } + packet.enforce() + }) + } +} diff --git a/nat/src/stateful/mod.rs b/nat/src/stateful/mod.rs index 43c049ca9..7ae6c4417 100644 --- a/nat/src/stateful/mod.rs +++ b/nat/src/stateful/mod.rs @@ -552,8 +552,6 @@ impl StatefulNat { fn translate_packet( &self, packet: &mut Packet, - src_vpc_id: VpcDiscriminant, - dst_vpc_id: VpcDiscriminant, ) -> Result { // Hot path: if we have a session, directly translate the address already if let Some(translate) = Self::lookup_session::(packet) { @@ -592,6 +590,9 @@ impl StatefulNat { let translation_data = Self::get_translation_data(&alloc.src, &alloc.dst); + let src_vpc_id = packet.meta().src_vpcd.unwrap_or_else(|| unreachable!()); + let dst_vpc_id = packet.meta().dst_vpcd.unwrap_or_else(|| unreachable!()); + let reverse_flow_key = Self::new_reverse_session(&flow_key, &alloc, src_vpc_id, dst_vpc_id)?; let (forward_state, reverse_state) = Self::new_states_from_alloc(alloc, idle_timeout); @@ -610,8 +611,6 @@ impl StatefulNat { fn nat_packet( &self, packet: &mut Packet, - src_vpc_id: VpcDiscriminant, - dst_vpc_id: VpcDiscriminant, ) -> Result { let nfi = self.name(); @@ -620,35 +619,36 @@ impl StatefulNat { return Err(StatefulNatError::BadIpHeader); }; match net { - Net::Ipv4(_) => self.translate_packet::(packet, src_vpc_id, dst_vpc_id), - Net::Ipv6(_) => self.translate_packet::(packet, src_vpc_id, dst_vpc_id), + Net::Ipv4(_) => self.translate_packet::(packet), + Net::Ipv6(_) => self.translate_packet::(packet), } } /// Processes one packet. This is the main entry point for processing a packet. This is also the /// function that we pass to [`StatefulNat::process`] to iterate over packets. fn process_packet(&self, packet: &mut Packet) { - let Some(src_vpc_id) = Self::get_src_vpc_id(packet) else { - warn!( - "{}: Packet has no source VPC discriminant!. Will drop...", - self.name() - ); + // In order to NAT a packet for which a session does not exist, we + // need (and expect) the packet to be annotated with both src & dst discriminants. + // A packet without those should have never made it here. + if Self::get_src_vpc_id(packet).is_none() { + let emsg = "Packet has no source VPC discriminant!. This is a bug. Will drop..."; + warn!(emsg); + debug_assert!(false, "{emsg}"); packet.done(DoneReason::Unroutable); return; - }; - let Some(dst_vpc_id) = Self::get_dst_vpc_id(packet) else { - warn!( - "{}: Packet has no destination VPC discriminant!. Will drop...", - self.name() - ); + } + if Self::get_dst_vpc_id(packet).is_none() { + let emsg = "Packet has no destination VPC discriminant!. This is a bug. Will drop..."; + warn!(emsg); + debug_assert!(false, "{emsg}"); packet.done(DoneReason::Unroutable); return; - }; + } // TODO: Check whether the packet is fragmented // TODO: Check whether we need protocol-aware processing - match self.nat_packet(packet, src_vpc_id, dst_vpc_id) { + match self.nat_packet(packet) { Err(error) => { packet.done(translate_error(&error)); error!("{}: Error processing packet: {error}", self.name()); diff --git a/net/src/packet/display.rs b/net/src/packet/display.rs index b2f7072ff..224172457 100644 --- a/net/src/packet/display.rs +++ b/net/src/packet/display.rs @@ -257,6 +257,9 @@ fn fmt_metadata_flags(meta: &PacketMeta, f: &mut Formatter<'_>) -> std::fmt::Res if meta.requires_stateless_nat() { write!(f, " req-stateless-nat")?; } + if meta.requires_port_forwarding() { + write!(f, " req-port-forwarding")?; + } if meta.is_natted() { write!(f, " natted")?; } diff --git a/net/src/packet/meta.rs b/net/src/packet/meta.rs index 05e52ce78..4e14e4895 100644 --- a/net/src/packet/meta.rs +++ b/net/src/packet/meta.rs @@ -106,7 +106,7 @@ pub enum DoneReason { bitflags! { #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] - struct MetaFlags: u16 { + struct MetaFlags: u32 { const INITIALIZED = 0b0000_0001; /* initialized */ const IS_L2_BCAST = 0b0000_0010; /* frame is eth broadcast */ const NATTED = 0b0000_0100; /* set to true if a packet has been NATed */ @@ -115,6 +115,7 @@ bitflags! { const IS_OVERLAY = 0b0010_0000; /* Packet was obtained by decapsulation and belongs to a VPC */ const REQ_STATEFUL_NAT = 0b0100_0000; /* Packet requires stateful NAT (source and/or destination) */ const REQ_STATELESS_NAT = 0b1000_0000; /* Packet requires stateless NAT (source and/or destination) */ + const REQ_PORT_FORWARDING = 0b0001_0000_0000; /* Packet requires port forwarding */ } } @@ -177,6 +178,17 @@ impl PacketMeta { self.flags.remove(MetaFlags::REQ_STATELESS_NAT); } } + #[must_use] + pub fn requires_port_forwarding(&self) -> bool { + self.flags.contains(MetaFlags::REQ_PORT_FORWARDING) + } + pub fn set_port_forwarding(&mut self, value: bool) { + if value { + self.flags.insert(MetaFlags::REQ_PORT_FORWARDING); + } else { + self.flags.remove(MetaFlags::REQ_PORT_FORWARDING); + } + } #[must_use] pub fn is_initialized(&self) -> bool {