Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
719 changes: 330 additions & 389 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
tokio-stream = "0.1.17"
url = "2.5.4"
thiserror = "2.0.17"
futures-util = "0.3.31"

[dev-dependencies]
alloy-hardforks = "0.4.0"
Expand Down
3 changes: 2 additions & 1 deletion bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use tokio::select;
#[tokio::main(flavor = "multi_thread")]
async fn main() -> eyre::Result<()> {
let _guard = init4_bin_base::init4();
let init_span_guard = info_span!("builder initialization");
let init_span_guard = info_span!("builder initialization").entered();

builder::config_from_env();

// Set up env and metrics tasks
Expand Down
30 changes: 13 additions & 17 deletions src/quincey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,23 @@ pub enum QuinceyError {

/// Error contacting the remote quincey API.
#[error("Error contacting quincey API: {0}")]
Remote(#[from] reqwest::Error),
Remote(reqwest::Error),

/// Error with the owned signet.
#[error("Error with owned signet: {0}")]
Owned(#[from] eyre::Report),
}

impl From<reqwest::Error> for QuinceyError {
fn from(err: reqwest::Error) -> Self {
if err.status() == Some(reqwest::StatusCode::FORBIDDEN) {
QuinceyError::NotOurSlot
} else {
QuinceyError::Remote(err)
}
}
}

/// A quincey client for making requests to the Quincey API.
#[derive(Debug, Clone)]
pub enum Quincey {
Expand Down Expand Up @@ -89,23 +99,9 @@ impl Quincey {

let token = token.secret().await?;

let resp = client
.post(url.clone())
.json(sig_request)
.bearer_auth(token)
.send()
.await
.map_err(QuinceyError::Remote)?;

if resp.status() == reqwest::StatusCode::FORBIDDEN {
return Err(QuinceyError::NotOurSlot);
}
let resp = client.post(url.clone()).json(sig_request).bearer_auth(token).send().await?;

resp.error_for_status()
.map_err(QuinceyError::Remote)?
.json::<SignResponse>()
.await
.map_err(QuinceyError::Remote)
resp.error_for_status()?.json::<SignResponse>().await.map_err(QuinceyError::Remote)
}

/// Get a signature for the provided request, by either using the owned
Expand Down
71 changes: 33 additions & 38 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use init4_bin_base::perms::SharedToken;
use reqwest::{Client, Url};
use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse};
use init4_bin_base::perms::tx_cache::BuilderTxCache;
use signet_tx_cache::{TxCacheError, types::TxCacheBundle};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, debug, error, trace, trace_span};
use tracing::{Instrument, error, trace, trace_span};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand All @@ -18,10 +17,10 @@ const POLL_INTERVAL_MS: u64 = 1000;
pub struct BundlePoller {
/// The builder configuration values.
config: &'static BuilderConfig,
/// Authentication module that periodically fetches and stores auth tokens.
token: SharedToken,
/// Holds a Reqwest client
client: Client,

/// Client for the tx cache.
tx_cache: BuilderTxCache,

/// Defines the interval at which the bundler polls the tx-pool for bundles.
poll_interval_ms: u64,
}
Expand All @@ -42,34 +41,37 @@ impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
let config = crate::config();
let token = config.oauth_token();
Self { config, token, client: Client::new(), poll_interval_ms }
}

/// Fetches bundles from the transaction cache and returns them.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<TxCacheBundle>> {
let bundle_url: Url = self.config.tx_pool_url.join("bundles")?;
let token =
self.token.secret().await.map_err(|e| eyre::eyre!("Failed to read token: {e}"))?;

self.client
.get(bundle_url)
.bearer_auth(token)
.send()
.await?
.error_for_status()?
.json()
.await
.map(|resp: TxCacheBundlesResponse| resp.bundles)
.map_err(Into::into)
let cache = signet_tx_cache::TxCache::new(config.tx_pool_url.clone());
let tx_cache = BuilderTxCache::new(cache, config.oauth_token());
Self { config, tx_cache, poll_interval_ms }
}

/// Returns the poll duration as a [`Duration`].
const fn poll_duration(&self) -> Duration {
Duration::from_millis(self.poll_interval_ms)
}

async fn task_future(mut self, outbound: UnboundedSender<TxCacheBundle>) {
/// Checks the bundle cache for new bundles.
pub async fn check_bundle_cache(&self) -> Result<Vec<TxCacheBundle>, TxCacheError> {
let res = self.tx_cache.get_bundles().await;

match res {
Ok(bundles) => {
trace!(count = ?bundles.len(), "found bundles");
Ok(bundles)
}
Err(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
Err(TxCacheError::NotOurSlot)
}
Err(err) => {
error!(?err, "Failed to fetch bundles from tx-cache");
Err(err)
}
}
}

async fn task_future(self, outbound: UnboundedSender<TxCacheBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

Expand All @@ -85,17 +87,10 @@ impl BundlePoller {
// exit the span after the check.
drop(_guard);

if let Ok(bundles) = self
.check_bundle_cache()
.instrument(span.clone())
.await
.inspect_err(|err| debug!(%err, "Error fetching bundles"))
{
let _guard = span.entered();
trace!(count = ?bundles.len(), "found bundles");
if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
error!(err = ?err, "Failed to send bundle - channel is dropped");
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
break;
}
}
Expand Down
74 changes: 24 additions & 50 deletions src/tasks/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,26 +276,7 @@ impl EnvTask {

drop(span);

// This span will be updated at the end of each loop iteration.
let mut span = info_span!(
parent: None,
"SimEnv",
confirmed.host.number = "initial",
confirmed.ru.number = "initial",
confirmed.ru.hash = "initial",
confirmed.timestamp = 0,
confirmed.slot = 0,
sim.host.number = "initial",
sim.ru.number = "initial",
sim.slot = 0,
trace_id = tracing::field::Empty,
);

while let Some(rollup_header) = rollup_headers
.next()
.instrument(info_span!(parent: &span, "waiting_for_notification"))
.await
{
while let Some(rollup_header) = rollup_headers.next().await {
let host_block_number =
self.config.constants.rollup_block_to_host_block_num(rollup_header.number);
let rollup_block_number = rollup_header.number;
Expand All @@ -306,17 +287,23 @@ impl EnvTask {
.expect("valid timestamp");
let sim_slot = self.config.slot_calculator.current_slot().expect("chain has started");

// Populate span fields.
// Create a `BlockConstruction` span
let span = info_span!(
parent: None,
"BlockConstruction",
confirmed.host.number = host_block_number,
confirmed.host.hash = tracing::field::Empty,
confirmed.ru.number = rollup_block_number,
confirmed.ru.hash = %rollup_header.hash,
confirmed.timestamp = rollup_header.timestamp,
confirmed.slot = confirmed_slot,
sim.host.number = host_block_number + 1,
sim.ru.number = rollup_block_number + 1,
sim.slot = sim_slot,
trace_id = tracing::field::Empty,
);
// Ensure that we record the OpenTelemetry trace ID in the span.
span.record("trace_id", span.context().span().span_context().trace_id().to_string());
span.record("confirmed.host.number", host_block_number);
span.record("confirmed.ru.number", rollup_block_number);
span.record("confirmed.ru.hash", rollup_header.hash.to_string());
span.record("confirmed.timestamp", rollup_header.timestamp);
span.record("confirmed.slot", confirmed_slot);
span.record("sim.slot", sim_slot);
span.record("sim.host.number", host_block_number + 1);
span.record("sim.ru.number", rollup_block_number + 1);

let (host_block_res, quincey_res) = tokio::join!(
self.host_provider
Expand All @@ -334,15 +321,15 @@ impl EnvTask {
Err(QuinceyError::NotOurSlot) => {
span_debug!(
span,
"not our slot according to quincey - skipping block submission"
"not our slot according to quincey - skipping block construction"
);
continue;
}
Err(err) => {
span_error!(
span,
%err,
"error during quincey preflight check - skipping block submission"
"error during quincey preflight check - skipping block construction"
);
continue;
}
Expand All @@ -352,16 +339,17 @@ impl EnvTask {
let host_block_opt = res_unwrap_or_continue!(
host_block_res,
span,
error!("error fetching previous host block - skipping block submission")
error!("error fetching previous host block - skipping block construction")
);

let host_header = opt_unwrap_or_continue!(
host_block_opt,
span,
warn!("previous host block not found - skipping block submission")
warn!("previous host block not found - skipping block construction")
)
.header
.inner;
.header;

span.record("confirmed.host.hash", host_header.hash.to_string());

if rollup_header.timestamp != host_header.timestamp {
span_warn!(
Expand All @@ -375,7 +363,7 @@ impl EnvTask {

// Construct the block env using the previous block header
let rollup_env = self.construct_rollup_env(rollup_header.into());
let host_env = self.construct_host_env(host_header);
let host_env = self.construct_host_env(host_header.inner);

span_info!(
span,
Expand All @@ -389,20 +377,6 @@ impl EnvTask {
tracing::debug!("receiver dropped, stopping task");
break;
}

// Create a new span for the next iteration.
span = info_span!(
"SimEnv",
confirmed.host.number = host_block_number + 1,
confirmed.ru.number = rollup_block_number + 1,
confirmed.ru.hash = tracing::field::Empty,
confirmed.timestamp = tracing::field::Empty,
confirmed.slot = tracing::field::Empty,
sim.host.number = host_block_number + 2,
sim.ru.number = rollup_block_number + 2,
sim.slot = tracing::field::Empty,
trace_id = tracing::field::Empty,
);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/tasks/submit/flashbots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl FlashbotsTask {
);

let tx = prep.prep_transaction(sim_result.prev_host()).await?;

let sendable = self
.host_provider()
.fill(tx.into_request())
Expand Down
25 changes: 13 additions & 12 deletions src/tasks/submit/prep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use alloy::{
rpc::types::TransactionRequest,
sol_types::SolCall,
};
use futures_util::FutureExt;
use init4_bin_base::deps::metrics::counter;
use signet_sim::BuiltBlock;
use signet_types::{SignRequest, SignResponse};
Expand Down Expand Up @@ -103,14 +104,13 @@ impl<'a> SubmitPrep<'a> {
}

/// Encodes the rollup block into a sidecar.
#[instrument(skip(self), level = "debug")]
async fn build_sidecar(&self) -> eyre::Result<BlobTransactionSidecar> {
let sidecar = self.block.encode_blob::<SimpleCoder>().build()?;

Ok(sidecar)
self.block.encode_blob::<SimpleCoder>().build().map_err(Into::into)
}

/// Build a signature and header input for the host chain transaction.
async fn build_input(&self) -> eyre::Result<Vec<u8>> {
async fn build_input(&self) -> eyre::Result<Bytes> {
let (v, r, s) = self.quincey_signature().await?;

let header = Zenith::BlockHeader {
Expand All @@ -120,19 +120,21 @@ impl<'a> SubmitPrep<'a> {
rewardAddress: self.sig_request().ru_reward_address,
blockDataHash: *self.block.contents_hash(),
};
debug!(?header.hostBlockNumber, "built zenith block header");

let data = Zenith::submitBlockCall { header, v, r, s, _4: Bytes::new() }.abi_encode();
let call = Zenith::submitBlockCall { header, v, r, s, _4: Bytes::new() };

Ok(data)
Ok(call.abi_encode().into())
}

/// Create a new transaction request for the host chain.
async fn new_tx_request(&self) -> eyre::Result<TransactionRequest> {
let nonce =
self.provider.get_transaction_count(self.provider.default_signer_address()).await?;
let nonce_fut = self
.provider
.get_transaction_count(self.provider.default_signer_address())
.into_future()
.map(|res| res.map_err(Into::into));

let (sidecar, input) = try_join!(self.build_sidecar(), self.build_input())?;
let (nonce, sidecar, input) =
try_join!(nonce_fut, self.build_sidecar(), self.build_input())?;

let tx = TransactionRequest::default()
.with_blob_sidecar(sidecar)
Expand All @@ -144,7 +146,6 @@ impl<'a> SubmitPrep<'a> {
}

/// Prepares a transaction for submission to the host chain.
#[instrument(skip_all, level = "debug")]
pub async fn prep_transaction(self, prev_host: &Header) -> eyre::Result<Bumpable> {
let req = self.new_tx_request().in_current_span().await?;
Ok(Bumpable::new(req, prev_host))
Expand Down
2 changes: 1 addition & 1 deletion tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() -> Result<()> {
setup_logging();
setup_test_config();

let mut bundle_poller = builder::tasks::cache::BundlePoller::new();
let bundle_poller = builder::tasks::cache::BundlePoller::new();

let _ = bundle_poller.check_bundle_cache().await?;

Expand Down