Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4ef2f37
fix(put): use hop-by-hop routing instead of embedded origin field
sanity Dec 8, 2025
4b2ec5a
refactor(put): simplify to Request/Response pattern (~60% code reduct…
sanity Dec 8, 2025
d3b154b
fix(put): restore subscribe parameter functionality
sanity Dec 8, 2025
68e2b4f
fix(put): restore subscribe parameter functionality
sanity Dec 8, 2025
dc12f33
fix(put): trigger Update when PUT changes state on subscribed contract
sanity Dec 8, 2025
0c52318
refactor: complete hop-by-hop routing for all operations
sanity Dec 8, 2025
67f7e40
fix: address race condition and block_on issues in hop-by-hop routing
sanity Dec 8, 2025
5add854
fix(get): extract target_addr from current_target for forward Requests
sanity Dec 8, 2025
0895dc5
refactor: simplify Update messages and rename target to next_hop
sanity Dec 8, 2025
f053f16
fix: enable UPDATE hop-by-hop routing via notify_op_change
sanity Dec 8, 2025
53b05be
fix: UPDATE hop-by-hop routing - fix timing issue where operation was…
sanity Dec 8, 2025
c73e058
fix: use unique temp directories in store tests to avoid permission c…
sanity Dec 8, 2025
5529c99
fix: race condition in hop-by-hop routing where state saved after send
sanity Dec 8, 2025
a729a03
ci: enable transport debug logging to diagnose packet delivery issues
sanity Dec 8, 2025
5fcc35c
fix(ci): allow info level for test_utils in RUST_LOG
sanity Dec 8, 2025
f88d3e9
ci: re-trigger CI to check for flaky test
sanity Dec 8, 2025
bc7ba23
ci: re-run tests (attempt 2)
sanity Dec 8, 2025
b6b3cc4
refactor: rename target_addr to next_hop_addr for clarity
sanity Dec 10, 2025
5893baf
ci: enable info logging for PUT operations to diagnose test failure
sanity Dec 10, 2025
e62c9bd
ci: trigger re-run to verify test stability
sanity Dec 10, 2025
6254e43
ci: add diagnostic println for connectivity test mesh check
sanity Dec 11, 2025
df59bd9
ci: add more diagnostic logging for pub_key update and QueryConnections
sanity Dec 11, 2025
b837f90
fix: resolve address mismatch in handle_connect_peer for transient pr…
sanity Dec 11, 2025
2e32f13
feat: improve operation lifecycle logging for CI debugging
sanity Dec 11, 2025
6a3b438
ci: use JSON log format for structured output
sanity Dec 11, 2025
a67df53
ci: add op_state_manager and connect to RUST_LOG for timeout visibility
sanity Dec 11, 2025
18029d9
ci: add target_connections and ttl to connect start log
sanity Dec 11, 2025
1842f2f
test: add target_connections logging to debug flaky connect tests
sanity Dec 11, 2025
4b246ed
test: force connection teardown to catch NAT routing bugs determinist…
sanity Dec 11, 2025
1cf3229
Revert "test: force connection teardown to catch NAT routing bugs det…
sanity Dec 11, 2025
118bfcb
fix(put): store next_hop in AwaitingResponse state for routing
sanity Dec 11, 2025
ed60048
ci: remove debug logging from CI workflow
sanity Dec 11, 2025
a9ef9a3
build: remove broken wiki submodule reference
sanity Dec 12, 2025
4890a1f
refactor(test): improve structured logging in test_put_with_subscribe…
sanity Dec 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ jobs:
- name: Test
# Limit test threads to reduce resource contention on high-core-count runners
# Without this, 64+ parallel tests cause timing-sensitive network tests to fail
run: cargo test --workspace --no-default-features --features trace,websocket,redb -- --test-threads=8
# --nocapture shows logs in real-time instead of buffering until test completion
run: cargo test --workspace --no-default-features --features trace,websocket,redb -- --test-threads=8 --nocapture

six_peer_regression:
name: six-peer-regression
Expand Down
29 changes: 0 additions & 29 deletions crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,35 +462,6 @@ impl ComposeNetworkMessage<operations::subscribe::SubscribeOp> for SubscribeCont
}
}

#[allow(unused)]
struct PutContract {
contract: ContractContainer,
state: WrappedState,
related_contracts: RelatedContracts<'static>,
}

impl ComposeNetworkMessage<operations::put::PutOp> for PutContract {
fn initiate_op(self, op_manager: &OpManager) -> operations::put::PutOp {
let PutContract {
contract,
state,
related_contracts,
} = self;
operations::put::start_op(
contract,
related_contracts,
state,
op_manager.ring.max_hops_to_live,
false,
)
}

async fn resume_op(op: operations::put::PutOp, op_manager: &OpManager) -> Result<(), OpError> {
operations::put::request_put(op_manager, op).await
}
}

#[allow(unused)]
struct UpdateContract {
Copy link
Collaborator

@iduartgomez iduartgomez Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this removed ? we no longer initiate puts from the executor?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PutContract struct was already marked #[allow(unused)] - it was dead code that was never called. The executor initiates PUTs through a different path (via ClientRequest::ContractOphandle_client_request), not through the ComposeNetworkMessage trait.

I removed it as part of the code cleanup since it added ~30 lines of unused code that would need to be maintained. The SubscribeContract and UpdateContract structs are kept because they're actually used.

[AI-assisted - Claude]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the git history more closely:

PutContract was originally added and used by you in commit c0d8230 ("Revamp upsert contract logic at wasm executor", Dec 2023). It was used to initiate PUT operations from the executor via self.op_request(request).

However, the usage was removed in commit 1e9ee5a ("Break executor source and some other changes", also Dec 2023) - that commit removed the op_request(PutContract) calls but left the struct definition behind with #[allow(unused)].

So the struct has been dead code since that refactor ~2 years ago. The executor currently initiates PUTs through a different path (via ClientRequest::ContractOp handling in the node event loop).

Should we restore the executor's ability to initiate PUTs via ComposeNetworkMessage? If so, I can add that back. Otherwise, the dead code removal seems appropriate.

[AI-assisted - Claude]

key: ContractKey,
new_state: WrappedState,
Expand Down
28 changes: 3 additions & 25 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! See `architecture.md`.

use std::{
borrow::{Borrow, Cow},
borrow::Cow,
fmt::Display,
net::SocketAddr,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -98,7 +98,8 @@ impl Transaction {
self.id.0.to_le_bytes()
}

fn elapsed(&self) -> Duration {
/// Returns the elapsed time since this transaction was created.
pub fn elapsed(&self) -> Duration {
let current_unix_epoch_ts = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("now should be always be later than unix epoch")
Expand Down Expand Up @@ -264,8 +265,6 @@ mod sealed_msg_type {
pub(crate) trait MessageStats {
fn id(&self) -> &Transaction;

fn target(&self) -> Option<PeerKeyLocation>;

fn requested_location(&self) -> Option<Location>;
}

Expand Down Expand Up @@ -397,8 +396,6 @@ impl From<NetMessage> for semver::Version {
pub trait InnerMessage: Into<NetMessage> {
fn id(&self) -> &Transaction;

fn target(&self) -> Option<impl Borrow<PeerKeyLocation>>;

fn requested_location(&self) -> Option<Location>;
}

Expand Down Expand Up @@ -551,12 +548,6 @@ impl MessageStats for NetMessage {
}
}

fn target(&self) -> Option<PeerKeyLocation> {
match self {
NetMessage::V1(msg) => msg.target(),
}
}

fn requested_location(&self) -> Option<Location> {
match self {
NetMessage::V1(msg) => msg.requested_location(),
Expand All @@ -578,19 +569,6 @@ impl MessageStats for NetMessageV1 {
}
}

fn target(&self) -> Option<PeerKeyLocation> {
match self {
NetMessageV1::Connect(op) => op.target().cloned(),
NetMessageV1::Put(op) => op.target().as_ref().map(|b| b.borrow().clone()),
NetMessageV1::Get(op) => op.target().as_ref().map(|b| b.borrow().clone()),
NetMessageV1::Subscribe(op) => op.target().as_ref().map(|b| b.borrow().clone()),
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
NetMessageV1::Aborted(_) => None,
NetMessageV1::Unsubscribed { .. } => None,
NetMessageV1::ProximityCache { .. } => None,
}
}

fn requested_location(&self) -> Option<Location> {
match self {
NetMessageV1::Connect(op) => op.requested_location(),
Expand Down
369 changes: 162 additions & 207 deletions crates/core/src/node/network_bridge/p2p_protoc.rs

Large diffs are not rendered by default.

65 changes: 61 additions & 4 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,49 @@ impl OpManager {
Ok(())
}

/// Peek at the next hop address for an operation without removing it.
/// Used by hop-by-hop routing to determine where to send initial outbound messages.
/// Returns None if the operation doesn't exist or doesn't have a next hop address.
pub fn peek_next_hop_addr(&self, id: &Transaction) -> Option<std::net::SocketAddr> {
if self.ops.completed.contains(id) || self.ops.under_progress.contains(id) {
return None;
}
match id.transaction_type() {
TransactionType::Connect => self
.ops
.connect
.get(id)
.and_then(|op| op.get_next_hop_addr()),
TransactionType::Put => self.ops.put.get(id).and_then(|op| op.get_next_hop_addr()),
TransactionType::Get => self.ops.get.get(id).and_then(|op| op.get_next_hop_addr()),
TransactionType::Subscribe => self
.ops
.subscribe
.get(id)
.and_then(|op| op.get_next_hop_addr()),
TransactionType::Update => self
.ops
.update
.get(id)
.and_then(|op| op.get_next_hop_addr()),
}
}

/// Peek at the full target peer (including public key) without removing the operation.
/// Used when establishing new connections where we need the public key for handshake.
pub fn peek_target_peer(&self, id: &Transaction) -> Option<PeerKeyLocation> {
if self.ops.completed.contains(id) || self.ops.under_progress.contains(id) {
return None;
}
match id.transaction_type() {
TransactionType::Connect => {
self.ops.connect.get(id).and_then(|op| op.get_target_peer())
}
// Other operations only store addresses, not full peer info
_ => None,
}
}

pub fn pop(&self, id: &Transaction) -> Result<Option<OpEnum>, OpNotAvailable> {
if self.ops.completed.contains(id) {
return Err(OpNotAvailable::Completed);
Expand Down Expand Up @@ -653,9 +696,11 @@ impl OpManager {
/// Notify the operation manager that a transaction is being transacted over the network.
pub fn sending_transaction(&self, peer: &PeerKeyLocation, msg: &NetMessage) {
let transaction = msg.id();
if let (Some(recipient), Some(target)) = (msg.target(), msg.requested_location()) {
// With hop-by-hop routing, record the request using the peer we're sending to
// and the message's requested location (contract location)
if let Some(target_loc) = msg.requested_location() {
self.ring
.record_request(recipient.clone(), target, transaction.transaction_type());
.record_request(peer.clone(), target_loc, transaction.transaction_type());
}
if let Some(peer_addr) = peer.socket_addr() {
self.ring
Expand Down Expand Up @@ -767,7 +812,13 @@ async fn garbage_cleanup_task<ER: NetEventRegister>(
} else {
ops.under_progress.remove(&tx);
ops.completed.remove(&tx);
tracing::debug!("Transaction timed out: {tx}");
tracing::info!(
tx = %tx,
tx_type = ?tx.transaction_type(),
elapsed_ms = tx.elapsed().as_millis(),
ttl_ms = crate::config::OPERATION_TTL.as_millis(),
"Transaction timed out"
);

// Check if this is a child operation and propagate timeout to parent
if let Some(parent_tx) = sub_op_tracker.get_parent(tx) {
Expand Down Expand Up @@ -812,7 +863,13 @@ async fn garbage_cleanup_task<ER: NetEventRegister>(
TransactionType::Update => ops.update.remove(&tx).is_some(),
};
if removed {
tracing::debug!("Transaction timed out: {tx}");
tracing::info!(
tx = %tx,
tx_type = ?tx.transaction_type(),
elapsed_ms = tx.elapsed().as_millis(),
ttl_ms = crate::config::OPERATION_TTL.as_millis(),
"Transaction timed out"
);

// Check if this is a child operation and propagate timeout to parent
if let Some(parent_tx) = sub_op_tracker.get_parent(tx) {
Expand Down
Loading
Loading