Skip to content

Commit eda31bb

Browse files
sanityclaude
andauthored
fix(put): use hop-by-hop routing instead of embedded origin field (#2243)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 1277374 commit eda31bb

File tree

19 files changed

+1558
-3497
lines changed

19 files changed

+1558
-3497
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,8 @@ jobs:
126126
- name: Test
127127
# Limit test threads to reduce resource contention on high-core-count runners
128128
# Without this, 64+ parallel tests cause timing-sensitive network tests to fail
129-
run: cargo test --workspace --no-default-features --features trace,websocket,redb -- --test-threads=8
129+
# --nocapture shows logs in real-time instead of buffering until test completion
130+
run: cargo test --workspace --no-default-features --features trace,websocket,redb -- --test-threads=8 --nocapture
130131

131132
six_peer_regression:
132133
name: six-peer-regression

crates/core/src/contract/executor.rs

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -462,35 +462,6 @@ impl ComposeNetworkMessage<operations::subscribe::SubscribeOp> for SubscribeCont
462462
}
463463
}
464464

465-
#[allow(unused)]
466-
struct PutContract {
467-
contract: ContractContainer,
468-
state: WrappedState,
469-
related_contracts: RelatedContracts<'static>,
470-
}
471-
472-
impl ComposeNetworkMessage<operations::put::PutOp> for PutContract {
473-
fn initiate_op(self, op_manager: &OpManager) -> operations::put::PutOp {
474-
let PutContract {
475-
contract,
476-
state,
477-
related_contracts,
478-
} = self;
479-
operations::put::start_op(
480-
contract,
481-
related_contracts,
482-
state,
483-
op_manager.ring.max_hops_to_live,
484-
false,
485-
)
486-
}
487-
488-
async fn resume_op(op: operations::put::PutOp, op_manager: &OpManager) -> Result<(), OpError> {
489-
operations::put::request_put(op_manager, op).await
490-
}
491-
}
492-
493-
#[allow(unused)]
494465
struct UpdateContract {
495466
key: ContractKey,
496467
new_state: WrappedState,

crates/core/src/message.rs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! See `architecture.md`.
44
55
use std::{
6-
borrow::{Borrow, Cow},
6+
borrow::Cow,
77
fmt::Display,
88
net::SocketAddr,
99
time::{Duration, SystemTime},
@@ -98,7 +98,8 @@ impl Transaction {
9898
self.id.0.to_le_bytes()
9999
}
100100

101-
fn elapsed(&self) -> Duration {
101+
/// Returns the elapsed time since this transaction was created.
102+
pub fn elapsed(&self) -> Duration {
102103
let current_unix_epoch_ts = SystemTime::now()
103104
.duration_since(SystemTime::UNIX_EPOCH)
104105
.expect("now should be always be later than unix epoch")
@@ -264,8 +265,6 @@ mod sealed_msg_type {
264265
pub(crate) trait MessageStats {
265266
fn id(&self) -> &Transaction;
266267

267-
fn target(&self) -> Option<PeerKeyLocation>;
268-
269268
fn requested_location(&self) -> Option<Location>;
270269
}
271270

@@ -397,8 +396,6 @@ impl From<NetMessage> for semver::Version {
397396
pub trait InnerMessage: Into<NetMessage> {
398397
fn id(&self) -> &Transaction;
399398

400-
fn target(&self) -> Option<impl Borrow<PeerKeyLocation>>;
401-
402399
fn requested_location(&self) -> Option<Location>;
403400
}
404401

@@ -551,12 +548,6 @@ impl MessageStats for NetMessage {
551548
}
552549
}
553550

554-
fn target(&self) -> Option<PeerKeyLocation> {
555-
match self {
556-
NetMessage::V1(msg) => msg.target(),
557-
}
558-
}
559-
560551
fn requested_location(&self) -> Option<Location> {
561552
match self {
562553
NetMessage::V1(msg) => msg.requested_location(),
@@ -578,19 +569,6 @@ impl MessageStats for NetMessageV1 {
578569
}
579570
}
580571

581-
fn target(&self) -> Option<PeerKeyLocation> {
582-
match self {
583-
NetMessageV1::Connect(op) => op.target().cloned(),
584-
NetMessageV1::Put(op) => op.target().as_ref().map(|b| b.borrow().clone()),
585-
NetMessageV1::Get(op) => op.target().as_ref().map(|b| b.borrow().clone()),
586-
NetMessageV1::Subscribe(op) => op.target().as_ref().map(|b| b.borrow().clone()),
587-
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
588-
NetMessageV1::Aborted(_) => None,
589-
NetMessageV1::Unsubscribed { .. } => None,
590-
NetMessageV1::ProximityCache { .. } => None,
591-
}
592-
}
593-
594572
fn requested_location(&self) -> Option<Location> {
595573
match self {
596574
NetMessageV1::Connect(op) => op.requested_location(),

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 162 additions & 207 deletions
Large diffs are not rendered by default.

crates/core/src/node/op_state_manager.rs

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,49 @@ impl OpManager {
477477
Ok(())
478478
}
479479

480+
/// Peek at the next hop address for an operation without removing it.
481+
/// Used by hop-by-hop routing to determine where to send initial outbound messages.
482+
/// Returns None if the operation doesn't exist or doesn't have a next hop address.
483+
pub fn peek_next_hop_addr(&self, id: &Transaction) -> Option<std::net::SocketAddr> {
484+
if self.ops.completed.contains(id) || self.ops.under_progress.contains(id) {
485+
return None;
486+
}
487+
match id.transaction_type() {
488+
TransactionType::Connect => self
489+
.ops
490+
.connect
491+
.get(id)
492+
.and_then(|op| op.get_next_hop_addr()),
493+
TransactionType::Put => self.ops.put.get(id).and_then(|op| op.get_next_hop_addr()),
494+
TransactionType::Get => self.ops.get.get(id).and_then(|op| op.get_next_hop_addr()),
495+
TransactionType::Subscribe => self
496+
.ops
497+
.subscribe
498+
.get(id)
499+
.and_then(|op| op.get_next_hop_addr()),
500+
TransactionType::Update => self
501+
.ops
502+
.update
503+
.get(id)
504+
.and_then(|op| op.get_next_hop_addr()),
505+
}
506+
}
507+
508+
/// Peek at the full target peer (including public key) without removing the operation.
509+
/// Used when establishing new connections where we need the public key for handshake.
510+
pub fn peek_target_peer(&self, id: &Transaction) -> Option<PeerKeyLocation> {
511+
if self.ops.completed.contains(id) || self.ops.under_progress.contains(id) {
512+
return None;
513+
}
514+
match id.transaction_type() {
515+
TransactionType::Connect => {
516+
self.ops.connect.get(id).and_then(|op| op.get_target_peer())
517+
}
518+
// Other operations only store addresses, not full peer info
519+
_ => None,
520+
}
521+
}
522+
480523
pub fn pop(&self, id: &Transaction) -> Result<Option<OpEnum>, OpNotAvailable> {
481524
if self.ops.completed.contains(id) {
482525
return Err(OpNotAvailable::Completed);
@@ -653,9 +696,11 @@ impl OpManager {
653696
/// Notify the operation manager that a transaction is being transacted over the network.
654697
pub fn sending_transaction(&self, peer: &PeerKeyLocation, msg: &NetMessage) {
655698
let transaction = msg.id();
656-
if let (Some(recipient), Some(target)) = (msg.target(), msg.requested_location()) {
699+
// With hop-by-hop routing, record the request using the peer we're sending to
700+
// and the message's requested location (contract location)
701+
if let Some(target_loc) = msg.requested_location() {
657702
self.ring
658-
.record_request(recipient.clone(), target, transaction.transaction_type());
703+
.record_request(peer.clone(), target_loc, transaction.transaction_type());
659704
}
660705
if let Some(peer_addr) = peer.socket_addr() {
661706
self.ring
@@ -767,7 +812,13 @@ async fn garbage_cleanup_task<ER: NetEventRegister>(
767812
} else {
768813
ops.under_progress.remove(&tx);
769814
ops.completed.remove(&tx);
770-
tracing::debug!("Transaction timed out: {tx}");
815+
tracing::info!(
816+
tx = %tx,
817+
tx_type = ?tx.transaction_type(),
818+
elapsed_ms = tx.elapsed().as_millis(),
819+
ttl_ms = crate::config::OPERATION_TTL.as_millis(),
820+
"Transaction timed out"
821+
);
771822

772823
// Check if this is a child operation and propagate timeout to parent
773824
if let Some(parent_tx) = sub_op_tracker.get_parent(tx) {
@@ -812,7 +863,13 @@ async fn garbage_cleanup_task<ER: NetEventRegister>(
812863
TransactionType::Update => ops.update.remove(&tx).is_some(),
813864
};
814865
if removed {
815-
tracing::debug!("Transaction timed out: {tx}");
866+
tracing::info!(
867+
tx = %tx,
868+
tx_type = ?tx.transaction_type(),
869+
elapsed_ms = tx.elapsed().as_millis(),
870+
ttl_ms = crate::config::OPERATION_TTL.as_millis(),
871+
"Transaction timed out"
872+
);
816873

817874
// Check if this is a child operation and propagate timeout to parent
818875
if let Some(parent_tx) = sub_op_tracker.get_parent(tx) {

0 commit comments

Comments
 (0)