Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions lean_client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions lean_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"http_api",
"metrics",
"networking",
"spec_test_fixtures",
"validator",
"xmss",
]
Expand Down Expand Up @@ -227,6 +228,7 @@ fork_choice = { path = "./fork_choice" }
http_api = { path = "./http_api" }
metrics = { path = "./metrics" }
networking = { path = "./networking" }
spec_test_fixtures = { path = "./spec_test_fixtures" }
validator = { path = "./validator" }
xmss = { path = "./xmss" }

Expand All @@ -245,6 +247,7 @@ futures = "0.3"
features = { git = "https://github.com/grandinetech/grandine", rev = "64afdee3c6be79fceffb66933dcb69a943f3f1ae" }
git-version = "0.3"
hex = "0.4.3"
indexmap = "2"
http-body-util = "0.1"
http_api_utils = { git = "https://github.com/grandinetech/grandine", rev = "64afdee3c6be79fceffb66933dcb69a943f3f1ae" }
k256 = "0.13"
Expand Down
21 changes: 21 additions & 0 deletions lean_client/containers/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,27 @@ impl State {
.start_timer()
});

// Each unique AttestationData must appear at most once per block.
// Mirrors leanSpec spec.py:1247-1252. Our own builder collapses
// duplicates upstream via `aggregate_by_data`, so this guards
// externally-built blocks.
ensure!(
!AggregatedAttestation::has_duplicate_data(attestations),
"Block contains duplicate AttestationData entries; \
each AttestationData must appear at most once",
);

// Cap distinct AttestationData entries per block. Mirrors leanSpec
// spec.py:1253-1256. With the duplicate check above, len ==
// distinct-count, so this is equivalent to the spec's
// `len(att_data_set) <= MAX_ATTESTATIONS_DATA`.
ensure!(
(attestations.len_u64() as usize) <= MAX_ATTESTATIONS_DATA,
"Block contains {} distinct AttestationData entries; maximum is {}",
attestations.len_u64(),
MAX_ATTESTATIONS_DATA,
);

ensure!(
self.justifications_roots
.into_iter()
Expand Down
1 change: 1 addition & 0 deletions lean_client/fork_choice/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ anyhow = { workspace = true }
bls = { workspace = true }
containers = { workspace = true }
env-config = { workspace = true }
indexmap = { workspace = true }
metrics = { workspace = true }
ssz = { workspace = true }
tracing = { workspace = true }
Expand Down
32 changes: 25 additions & 7 deletions lean_client/fork_choice/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ pub fn on_block(
store: &mut Store,
cache: &mut BlockCache,
signed_block: SignedBlock,
verify_signatures: bool,
) -> Result<BlockOutcome> {
let block_root = signed_block.block.hash_tree_root();

Expand All @@ -509,23 +510,34 @@ pub fn on_block(
);
}

process_block_internal(store, signed_block, block_root)?;
process_pending_blocks(store, cache, vec![block_root]);
process_block_internal(store, signed_block, block_root, verify_signatures)?;
process_pending_blocks(store, cache, vec![block_root], verify_signatures);

Ok(BlockOutcome::Applied)
}

/// CPU-bound portion of block processing: verify XMSS signatures against the parent state
/// and run the state transition. Safe to run on a `DedicatedExecutor` thread because it
/// touches no `Store` state.
pub fn verify_and_transition(parent_state: State, signed_block: SignedBlock) -> Result<State> {
///
/// Pass `verify_signatures = false` to skip the cryptographic signature check — only
/// safe when signatures have already been validated upstream or when the caller is
/// driving the function with synthetic signature placeholders (e.g. spec-test fixtures
/// that ship unsigned blocks).
pub fn verify_and_transition(
parent_state: State,
signed_block: SignedBlock,
verify_signatures: bool,
) -> Result<State> {
let _timer = METRICS.get().map(|metrics| {
metrics
.lean_fork_choice_block_processing_time_seconds
.start_timer()
});

signed_block.verify_signatures(parent_state.clone())?;
if verify_signatures {
signed_block.verify_signatures(parent_state.clone())?;
}
parent_state.state_transition(signed_block, true)
}

Expand Down Expand Up @@ -822,6 +834,7 @@ fn process_block_internal(
store: &mut Store,
signed_block: SignedBlock,
block_root: H256,
verify_signatures: bool,
) -> Result<()> {
let block = signed_block.block.clone();
let attestations_count = block.body.attestations.len_u64();
Expand All @@ -841,11 +854,16 @@ fn process_block_internal(
"Processing block - parent state info"
);

let new_state = verify_and_transition(parent_state, signed_block.clone())?;
let new_state = verify_and_transition(parent_state, signed_block.clone(), verify_signatures)?;
apply_verified_block(store, signed_block, new_state, block_root)
}

pub fn process_pending_blocks(store: &mut Store, cache: &mut BlockCache, mut roots: Vec<H256>) {
pub fn process_pending_blocks(
store: &mut Store,
cache: &mut BlockCache,
mut roots: Vec<H256>,
verify_signatures: bool,
) {
while let Some(parent_root) = roots.pop() {
let children: Vec<(H256, SignedBlock)> = cache
.get_children(&parent_root)
Expand All @@ -855,7 +873,7 @@ pub fn process_pending_blocks(store: &mut Store, cache: &mut BlockCache, mut roo

for (child_root, child_block) in children {
cache.remove(&child_root);
if process_block_internal(store, child_block, child_root).is_ok() {
if process_block_internal(store, child_block, child_root, verify_signatures).is_ok() {
roots.push(child_root);
}
}
Expand Down
20 changes: 12 additions & 8 deletions lean_client/fork_choice/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use containers::{
AggregatedSignatureProof, AttestationData, Block, BlockHeader, Checkpoint, Config,
SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlock, Slot, State,
};
use indexmap::IndexMap;
use metrics::{METRICS, set_gauge_u64};
use ssz::{H256, SszHash};
use tracing::{info, warn};
Expand Down Expand Up @@ -74,14 +75,17 @@ pub struct Store {
/// Aggregated signature proofs from block bodies (on-chain).
/// These are attestations that have been included in blocks and are part of
/// the "known" pool for safe target computation.
/// Keyed by attestation data root (H256).
pub latest_known_aggregated_payloads: HashMap<H256, Vec<AggregatedSignatureProof>>,
/// Keyed by attestation data root (H256). `IndexMap` preserves insertion
/// order so same-slot equivocation tie-breaks are deterministic and match
/// leanSpec's first-vote-wins semantics (Python dict insertion order).
pub latest_known_aggregated_payloads: IndexMap<H256, Vec<AggregatedSignatureProof>>,

/// Aggregated signature proofs from gossip aggregation topic.
/// These are newly received aggregations that haven't been migrated to "known" yet.
/// At interval 3, we merge this with latest_known_aggregated_payloads for safe target.
/// Keyed by attestation data root (H256).
pub latest_new_aggregated_payloads: HashMap<H256, Vec<AggregatedSignatureProof>>,
/// Keyed by attestation data root (H256). See note on the `known` pool above
/// for why this is `IndexMap`.
pub latest_new_aggregated_payloads: IndexMap<H256, Vec<AggregatedSignatureProof>>,

/// Attestation data indexed by hash (data_root).
/// Used to look up the exact attestation data that was signed when
Expand Down Expand Up @@ -264,8 +268,8 @@ pub fn get_forkchoice_store(
latest_known_attestations: HashMap::new(),
latest_new_attestations: HashMap::new(),
gossip_signatures: HashMap::new(),
latest_known_aggregated_payloads: HashMap::new(),
latest_new_aggregated_payloads: HashMap::new(),
latest_known_aggregated_payloads: IndexMap::new(),
latest_new_aggregated_payloads: IndexMap::new(),
attestation_data_by_root: HashMap::new(),
pending_attestations: HashMap::new(),
pending_aggregated_attestations: HashMap::new(),
Expand Down Expand Up @@ -438,7 +442,7 @@ pub fn update_head(store: &mut Store) {
/// Walks through all aggregated proofs and extracts the latest attestation
/// data for each validator based on their participation bits.
fn extract_attestations_from_aggregated_payloads(
payloads: &HashMap<H256, Vec<AggregatedSignatureProof>>,
payloads: &IndexMap<H256, Vec<AggregatedSignatureProof>>,
attestation_data_by_root: &HashMap<H256, AttestationData>,
) -> HashMap<u64, AttestationData> {
let mut attestations: HashMap<u64, AttestationData> = HashMap::new();
Expand Down Expand Up @@ -525,7 +529,7 @@ pub fn accept_new_attestations(store: &mut Store) {
.extend(store.latest_new_attestations.drain());
// Promote gossip-received aggregated proofs to the known pool so they
// are available for block production at the next interval 0.
for (data_root, proofs) in store.latest_new_aggregated_payloads.drain() {
for (data_root, proofs) in store.latest_new_aggregated_payloads.drain(..) {
store
.latest_known_aggregated_payloads
.entry(data_root)
Expand Down
2 changes: 1 addition & 1 deletion lean_client/fork_choice/tests/fork_choice_test_vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ fn forkchoice(spec_file: &str) {
(store.config.genesis_time + (signed_block.block.slot.0 * 4)) * 1000;
on_tick(&mut store, block_time_millis, false);

on_block(&mut store, &mut cache, signed_block).unwrap();
on_block(&mut store, &mut cache, signed_block, true).unwrap();
Ok(block_root)
}));

Expand Down
3 changes: 3 additions & 0 deletions lean_client/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = { workspace = true }
anyhow = { workspace = true }
axum = { workspace = true }
clap = { workspace = true }
containers = { workspace = true }
fork_choice = { workspace = true }
validator = { workspace = true }
futures = { workspace = true }
Expand All @@ -15,10 +16,12 @@ metrics = { workspace = true }
parking_lot = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
spec_test_fixtures = { workspace = true }
ssz = { workspace = true }
tokio = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
xmss = { workspace = true }

[dev-dependencies]
fork_choice = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion lean_client/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ mod config;
mod handlers;
mod routing;
mod server;
mod test_driver;

pub use aggregator_controller::AggregatorController;
pub use config::HttpServerConfig;
pub use handlers::SharedStore;
pub use routing::normal_routes;
pub use server::run_server;
pub use server::{run_server, run_test_driver_server};
pub use test_driver::{TestDriverState, test_driver_routes};
27 changes: 26 additions & 1 deletion lean_client/http_api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use futures::{TryFutureExt as _, future::FutureExt as _};
use tracing::info;

use crate::{
aggregator_controller::SharedController, config::HttpServerConfig, handlers::SharedStore,
aggregator_controller::SharedController,
config::HttpServerConfig,
handlers::SharedStore,
routing::normal_routes,
test_driver::{TestDriverState, test_driver_routes},
};

pub async fn run_server(
Expand All @@ -15,7 +18,29 @@ pub async fn run_server(
aggregator_controller: SharedController,
) -> Result<()> {
let router = normal_routes(&config, store, aggregator_controller);
serve(config, router).await
}

/// Variant of [`run_server`] that additionally mounts the
/// `/lean/v0/test_driver/*` endpoints.
///
/// The test-driver routes are needed for the hive `spec-assets-*` test
/// suites; they are gated behind a separate startup path so they cannot
/// accidentally be served in production. The caller is responsible for
/// deciding (e.g. via the `HIVE_LEAN_TEST_DRIVER` environment variable)
/// whether to invoke this variant or [`run_server`].
pub async fn run_test_driver_server(
config: HttpServerConfig,
store: SharedStore,
aggregator_controller: SharedController,
) -> Result<()> {
let driver_state = TestDriverState::new(store.clone());
let router = normal_routes(&config, store, aggregator_controller)
.merge(test_driver_routes(driver_state));
serve(config, router).await
}

async fn serve(config: HttpServerConfig, router: axum::Router) -> Result<()> {
let listener = config
.listener()
.await
Expand Down
Loading
Loading