Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dataplane/src/packet_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use concurrency::sync::Arc;
use flow_entry::flow_table::{ExpirationsNF, FlowLookup, FlowTable};
use flow_filter::{FlowFilter, FlowFilterTableWriter};

use nat::portfw::PortForwarder;
use nat::stateful::NatAllocatorWriter;
use nat::stateless::NatTablesWriter;
use nat::{StatefulNat, StatelessNat};
Expand Down Expand Up @@ -87,6 +88,7 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
let flow_filter = FlowFilter::new("flow-filter", flowfiltertablesr_factory.handle());
let flow_lookup = FlowLookup::new("flow-lookup", flow_table.clone());
let flow_expirations_nf = ExpirationsNF::new(flow_table.clone());
let port_forwarding = PortForwarder::new("port-forwarder");

// Build the pipeline for a router. The composition of the pipeline (in stages) is currently
// hard-coded. In any pipeline, the Stats and ExpirationsNF stages should go last
Expand All @@ -95,6 +97,7 @@ pub(crate) fn start_router<Buf: PacketBufferMut>(
.add_stage(iprouter1)
.add_stage(flow_lookup)
.add_stage(flow_filter)
.add_stage(port_forwarding)
.add_stage(stateless_nat)
.add_stage(stateful_nat)
.add_stage(iprouter2)
Expand Down
8 changes: 8 additions & 0 deletions flow-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl FlowFilter {

if let Some(dst_vpcd) = self.check_packet_flow_info(packet) {
packet.meta_mut().dst_vpcd = Some(dst_vpcd);

// FIXME: here we should call set_nat_requirements() instead of
// hard-coding stateful nat, as differentiate also port-forwarding.

packet.meta_mut().set_stateful_nat(true);
} else {
packet.done(DoneReason::Filtered);
Expand Down Expand Up @@ -215,6 +219,10 @@ fn set_nat_requirements<Buf: PacketBufferMut>(packet: &mut Packet<Buf>, data: &R
if data.requires_stateless_nat() {
packet.meta_mut().set_stateless_nat(true);
}
if data.requires_port_forwarding() {
packet.meta_mut().set_port_forwarding(true);
}
// FIXME: we should forbid/(warn about) combos that we don't support
}

#[cfg(test)]
Expand Down
6 changes: 6 additions & 0 deletions flow-filter/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ impl ValueWithAssociatedRanges for SrcConnectionData {
pub(crate) enum NatRequirement {
Stateless,
Stateful,
PortForwarding,
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -489,6 +490,11 @@ impl RemoteData {
self.src_nat_req == Some(NatRequirement::Stateless)
|| self.dst_nat_req == Some(NatRequirement::Stateless)
}
pub(crate) fn requires_port_forwarding(&self) -> bool {
// This is temporary: do we want to reuse dst_nat_req
// or have a separate field?
self.dst_nat_req == Some(NatRequirement::PortForwarding)
}
}

#[derive(Debug, Clone)]
Expand Down
1 change: 1 addition & 0 deletions nat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

mod icmp_error_msg;
mod port;
pub mod portfw;
pub mod stateful;
pub mod stateless;

Expand Down
48 changes: 48 additions & 0 deletions nat/src/portfw/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Open Network Fabric Authors

//! Port forwarding stage

use net::buffer::PacketBufferMut;
use net::packet::Packet;
use pipeline::NetworkFunction;

#[allow(unused)]
use tracing::{debug, error, warn};

use tracectl::trace_target;
trace_target!("port-forwarding", LevelFilter::INFO, &["nat", "pipeline"]);

/// A port-forwarding network function
pub struct PortForwarder {
name: String,
}

impl PortForwarder {
/// Creates a new [`PortForwarder`]
#[must_use]
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
}
}
/// Do port forwarding for the given packet.
fn process_packet<Buf: PacketBufferMut>(&self, _packet: &mut Packet<Buf>) {
debug!("{}: processing packet", self.name);
// TODO
}
}

impl<Buf: PacketBufferMut> NetworkFunction<Buf> for PortForwarder {
fn process<'a, Input: Iterator<Item = Packet<Buf>> + 'a>(
&'a mut self,
input: Input,
) -> impl Iterator<Item = Packet<Buf>> + 'a {
input.filter_map(|mut packet| {
if !packet.is_done() && packet.meta().requires_port_forwarding() {
self.process_packet(&mut packet);
}
packet.enforce()
})
}
}
38 changes: 19 additions & 19 deletions nat/src/stateful/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,6 @@ impl StatefulNat {
fn translate_packet<Buf: PacketBufferMut, I: NatIpWithBitmap>(
&self,
packet: &mut Packet<Buf>,
src_vpc_id: VpcDiscriminant,
dst_vpc_id: VpcDiscriminant,
) -> Result<bool, StatefulNatError> {
// Hot path: if we have a session, directly translate the address already
if let Some(translate) = Self::lookup_session::<I, Buf>(packet) {
Expand Down Expand Up @@ -592,6 +590,9 @@ impl StatefulNat {

let translation_data = Self::get_translation_data(&alloc.src, &alloc.dst);

let src_vpc_id = packet.meta().src_vpcd.unwrap_or_else(|| unreachable!());
let dst_vpc_id = packet.meta().dst_vpcd.unwrap_or_else(|| unreachable!());

let reverse_flow_key =
Self::new_reverse_session(&flow_key, &alloc, src_vpc_id, dst_vpc_id)?;
let (forward_state, reverse_state) = Self::new_states_from_alloc(alloc, idle_timeout);
Expand All @@ -610,8 +611,6 @@ impl StatefulNat {
fn nat_packet<Buf: PacketBufferMut>(
&self,
packet: &mut Packet<Buf>,
src_vpc_id: VpcDiscriminant,
dst_vpc_id: VpcDiscriminant,
) -> Result<bool, StatefulNatError> {
let nfi = self.name();

Expand All @@ -620,35 +619,36 @@ impl StatefulNat {
return Err(StatefulNatError::BadIpHeader);
};
match net {
Net::Ipv4(_) => self.translate_packet::<Buf, Ipv4Addr>(packet, src_vpc_id, dst_vpc_id),
Net::Ipv6(_) => self.translate_packet::<Buf, Ipv6Addr>(packet, src_vpc_id, dst_vpc_id),
Net::Ipv4(_) => self.translate_packet::<Buf, Ipv4Addr>(packet),
Net::Ipv6(_) => self.translate_packet::<Buf, Ipv6Addr>(packet),
}
}

/// Processes one packet. This is the main entry point for processing a packet. This is also the
/// function that we pass to [`StatefulNat::process`] to iterate over packets.
fn process_packet<Buf: PacketBufferMut>(&self, packet: &mut Packet<Buf>) {
let Some(src_vpc_id) = Self::get_src_vpc_id(packet) else {
warn!(
"{}: Packet has no source VPC discriminant!. Will drop...",
self.name()
);
// In order to NAT a packet for which a session does not exist, we
// need (and expect) the packet to be annotated with both src & dst discriminants.
// A packet without those should have never made it here.
if Self::get_src_vpc_id(packet).is_none() {
let emsg = "Packet has no source VPC discriminant!. This is a bug. Will drop...";
warn!(emsg);
debug_assert!(false, "{emsg}");
packet.done(DoneReason::Unroutable);
return;
};
let Some(dst_vpc_id) = Self::get_dst_vpc_id(packet) else {
warn!(
"{}: Packet has no destination VPC discriminant!. Will drop...",
self.name()
);
}
if Self::get_dst_vpc_id(packet).is_none() {
let emsg = "Packet has no destination VPC discriminant!. This is a bug. Will drop...";
warn!(emsg);
debug_assert!(false, "{emsg}");
packet.done(DoneReason::Unroutable);
return;
};
}

// TODO: Check whether the packet is fragmented
// TODO: Check whether we need protocol-aware processing

match self.nat_packet(packet, src_vpc_id, dst_vpc_id) {
match self.nat_packet(packet) {
Err(error) => {
packet.done(translate_error(&error));
error!("{}: Error processing packet: {error}", self.name());
Expand Down
3 changes: 3 additions & 0 deletions net/src/packet/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ fn fmt_metadata_flags(meta: &PacketMeta, f: &mut Formatter<'_>) -> std::fmt::Res
if meta.requires_stateless_nat() {
write!(f, " req-stateless-nat")?;
}
if meta.requires_port_forwarding() {
write!(f, " req-port-forwarding")?;
}
if meta.is_natted() {
write!(f, " natted")?;
}
Expand Down
14 changes: 13 additions & 1 deletion net/src/packet/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub enum DoneReason {

bitflags! {
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct MetaFlags: u16 {
struct MetaFlags: u32 {
const INITIALIZED = 0b0000_0001; /* initialized */
const IS_L2_BCAST = 0b0000_0010; /* frame is eth broadcast */
const NATTED = 0b0000_0100; /* set to true if a packet has been NATed */
Expand All @@ -115,6 +115,7 @@ bitflags! {
const IS_OVERLAY = 0b0010_0000; /* Packet was obtained by decapsulation and belongs to a VPC */
const REQ_STATEFUL_NAT = 0b0100_0000; /* Packet requires stateful NAT (source and/or destination) */
const REQ_STATELESS_NAT = 0b1000_0000; /* Packet requires stateless NAT (source and/or destination) */
const REQ_PORT_FORWARDING = 0b0001_0000_0000; /* Packet requires port forwarding */
}
}

Expand Down Expand Up @@ -177,6 +178,17 @@ impl PacketMeta {
self.flags.remove(MetaFlags::REQ_STATELESS_NAT);
}
}
#[must_use]
pub fn requires_port_forwarding(&self) -> bool {
self.flags.contains(MetaFlags::REQ_PORT_FORWARDING)
}
pub fn set_port_forwarding(&mut self, value: bool) {
if value {
self.flags.insert(MetaFlags::REQ_PORT_FORWARDING);
} else {
self.flags.remove(MetaFlags::REQ_PORT_FORWARDING);
}
}

#[must_use]
pub fn is_initialized(&self) -> bool {
Expand Down
Loading