From 6a4df07aaf8f02571004cca3455a87265f1a42ca Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Tue, 10 Feb 2026 15:43:30 +0800 Subject: [PATCH 01/10] feat: add flashblock reorg test scripts --- devnet/scripts/test_flashblock_reorg.py | 664 ++++++++++++++++++++++++ devnet/scripts/test_transfer_leader.sh | 33 ++ 2 files changed, 697 insertions(+) create mode 100755 devnet/scripts/test_flashblock_reorg.py create mode 100644 devnet/scripts/test_transfer_leader.sh diff --git a/devnet/scripts/test_flashblock_reorg.py b/devnet/scripts/test_flashblock_reorg.py new file mode 100755 index 00000000..3d764cd5 --- /dev/null +++ b/devnet/scripts/test_flashblock_reorg.py @@ -0,0 +1,664 @@ +#!/usr/bin/env python3 +""" +Test script to verify flashblock transactions are preserved across sequencer/builder changes. + +This script monitors flashblocks via WebSocket and verifies that all flashblock transactions +(index > 0) eventually appear in canonical blocks, even after a sequencer/builder switch. + +Usage: + python test_flashblock_reorg_mitigation.py [--ws-url URL] [--rpc-url URL] [--duration SECONDS] [--verbose] + +Example: + python test_flashblock_reorg_mitigation.py --ws-url ws://localhost:11111 --rpc-url http://localhost:8124 +""" + +import argparse +import asyncio +import json +import signal +import sys +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, List, Optional, Set +from urllib.error import URLError +from urllib.request import Request, urlopen + +try: + from websockets import connect + from websockets.exceptions import ConnectionClosed, WebSocketException +except ImportError: + print("Error: websockets library not installed. Run: pip install websockets") + sys.exit(1) + +try: + import rlp + HAS_RLP = True +except ImportError: + HAS_RLP = False + + + +def _get_keccak256_impl(): + """Find and return a keccak256 implementation.""" + # Try pycryptodome + try: + from Crypto.Hash import keccak + def _keccak256(data: bytes) -> bytes: + k = keccak.new(digest_bits=256) + k.update(data) + return k.digest() + return _keccak256 + except ImportError: + pass + + # Try eth-hash + try: + from eth_hash.auto import keccak as eth_keccak + def _keccak256(data: bytes) -> bytes: + return eth_keccak(data) + return _keccak256 + except ImportError: + pass + + # Try pysha3 + try: + import sha3 + def _keccak256(data: bytes) -> bytes: + k = sha3.keccak_256() + k.update(data) + return k.digest() + return _keccak256 + except ImportError: + pass + + # No implementation found + return None + + +# Initialize keccak256 at module load time +_keccak256_impl = _get_keccak256_impl() + +if _keccak256_impl is None: + print("=" * 60) + print("ERROR: No keccak256 implementation found!") + print("=" * 60) + print("Ethereum uses keccak256 (NOT sha3-256) to hash transactions.") + print("Please install one of the following:") + print() + print(" pip install pycryptodome") + print(" pip install eth-hash[pycryptodome]") + print(" pip install pysha3") + print() + print("=" * 60) + sys.exit(1) + + +def keccak256(data: bytes) -> bytes: + """Compute keccak256 hash.""" + return _keccak256_impl(data) + + +_decode_error_logged = False + +def decode_tx_hash(raw_tx_hex: str) -> Optional[str]: + """Decode transaction hash from RLP-encoded hex string.""" + global _decode_error_logged + try: + # Remove 0x prefix if present + if raw_tx_hex.startswith("0x"): + raw_tx_hex = raw_tx_hex[2:] + + raw_bytes = bytes.fromhex(raw_tx_hex) + + # Transaction hash is keccak256 of the RLP-encoded transaction + tx_hash = keccak256(raw_bytes) + return "0x" + tx_hash.hex() + except Exception as e: + if not _decode_error_logged: + print(f"\nWARNING: Transaction decode failed: {type(e).__name__}: {e}") + print(f" Raw tx (first 100 chars): {raw_tx_hex[:100]}...") + print(" Make sure pycryptodome is installed: pip install pycryptodome\n") + _decode_error_logged = True + return None + + +class TxStatus(Enum): + PENDING = "pending" + CONFIRMED = "confirmed" + MISSING = "missing" + + +@dataclass +class TrackedTransaction: + tx_hash: str + parent_hash: str + flashblock_index: int + block_number: int # Expected block number + first_seen_at: float + status: TxStatus = TxStatus.PENDING + + def __hash__(self): + return hash(self.tx_hash) + + +@dataclass +class BlockTracker: + """Tracks flashblock transactions and canonical block confirmations.""" + + # All tracked transactions: tx_hash -> TrackedTransaction + transactions: Dict[str, TrackedTransaction] = field(default_factory=dict) + + # Transactions grouped by expected block number: block_number -> set of tx_hashes + txs_by_block: Dict[int, Set[str]] = field(default_factory=dict) + + # Canonical blocks we've seen: block_number -> set of tx_hashes in that block + canonical_blocks: Dict[int, Set[str]] = field(default_factory=dict) + + # Latest canonical block number + latest_canonical_block: int = 0 + + # Statistics + total_flashblocks_received: int = 0 + total_txs_tracked: int = 0 + total_confirmed: int = 0 + total_missing: int = 0 + total_reorg_count: int = 0 + + # Track reconnections + reconnection_count: int = 0 + + # Blocks to finalize after N confirmations + blocks_to_confirm_after: int = 2 + + +class ReorgDetectedException(Exception): + """Raised when flashblock transactions are missing from canonical chain.""" + pass + +class FlashblockReorgTester: + def __init__( + self, + ws_url: str, + rpc_url: str, + duration: Optional[int] = None, + verbose: bool = False, + ): + self.ws_url = ws_url + self.rpc_url = rpc_url + self.duration = duration + self.verbose = verbose + self.tracker = BlockTracker() + self.running = True + self.start_time = None + + # Current pending block info from flashblocks + self.current_parent_hash: Optional[str] = None + self.current_block_number: Optional[int] = None + self.current_payload_id: Optional[str] = None + + def log(self, message: str, force: bool = False): + """Log message if verbose mode or forced.""" + if self.verbose or force: + timestamp = time.strftime("%H:%M:%S") + print(f"[{timestamp}] {message}") + + def log_always(self, message: str): + """Always log this message.""" + self.log(message, force=True) + + def get_block_from_rpc(self, block_id: str = "latest") -> Optional[dict]: + """Fetch block from RPC endpoint.""" + try: + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_getBlockByNumber", + "params": [block_id, True], # True = include full tx objects + } + req = Request( + self.rpc_url, + data=json.dumps(payload).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return data.get("result") + except (URLError, Exception) as e: + self.log(f"RPC Error: {e}") + return None + + def process_flashblock_payload(self, payload: dict): + """Process a flashblock payload and track transactions.""" + try: + payload_id = payload.get("payload_id") + index = payload.get("index", 0) + base = payload.get("base", {}) + diff = payload.get("diff", {}) + metadata = payload.get("metadata", {}) + + # Get transactions from diff (RLP-encoded hex strings) + raw_transactions = diff.get("transactions", []) + + self.tracker.total_flashblocks_received += 1 + + # For index 0, extract parent_hash and block_number from base + if index == 0 and base: + self.current_parent_hash = base.get("parent_hash") + block_number_hex = base.get("block_number", "0x0") + self.current_block_number = int(block_number_hex, 16) + self.current_payload_id = payload_id + self.log( + f"New pending block #{self.current_block_number} " + f"(parent: {self.current_parent_hash[:16]}..., payload_id: {payload_id})" + ) + + # Also check metadata for block_number (more reliable for index > 0) + if metadata.get("block_number"): + self.current_block_number = metadata["block_number"] + + # If we don't have block context yet, skip + if self.current_block_number is None or self.current_parent_hash is None: + self.log(f"Flashblock index={index} received but no block context yet") + return + + block_number = self.current_block_number + parent_hash = self.current_parent_hash + + # Skip index 0 (sequencer transactions - these are deterministic) + if index == 0: + self.log( + f"Flashblock idx=0 for block #{block_number}: " + f"{len(raw_transactions)} sequencer txs (not tracking)" + ) + return + + # Decode transaction hashes from RLP + now = time.time() + new_txs = 0 + decode_failures = 0 + + for raw_tx in raw_transactions: + tx_hash = decode_tx_hash(raw_tx) + if not tx_hash: + decode_failures += 1 + continue + + if tx_hash not in self.tracker.transactions: + tracked_tx = TrackedTransaction( + tx_hash=tx_hash, + parent_hash=parent_hash, + flashblock_index=index, + block_number=block_number, + first_seen_at=now, + ) + self.tracker.transactions[tx_hash] = tracked_tx + + if block_number not in self.tracker.txs_by_block: + self.tracker.txs_by_block[block_number] = set() + self.tracker.txs_by_block[block_number].add(tx_hash) + + self.tracker.total_txs_tracked += 1 + new_txs += 1 + + msg = ( + f"Flashblock idx={index} for block #{block_number}: " + f"{len(raw_transactions)} txs ({new_txs} new tracked)" + ) + if decode_failures: + msg += f", {decode_failures} decode failures" + self.log(msg) + + if new_txs > 0: + self.log( + f"Block #{block_number} idx={index}: +{new_txs} txs tracked " + f"(total: {len(self.tracker.txs_by_block.get(block_number, set()))})" + ) + + except Exception as e: + self.log(f"Error processing flashblock: {e}") + import traceback + self.log(traceback.format_exc()) + + def check_canonical_block(self, block: dict): + """Check a canonical block and update transaction statuses.""" + try: + block_number = int(block.get("number", "0x0"), 16) + block_hash = block.get("hash", "unknown") + transactions = block.get("transactions", []) + + # Get tx hashes from canonical block + canonical_tx_hashes = set() + for tx in transactions: + if isinstance(tx, str): + canonical_tx_hashes.add(tx) + elif isinstance(tx, dict): + canonical_tx_hashes.add(tx.get("hash", "")) + + self.tracker.canonical_blocks[block_number] = canonical_tx_hashes + + if block_number > self.tracker.latest_canonical_block: + self.tracker.latest_canonical_block = block_number + tracked_for_block = len(self.tracker.txs_by_block.get(block_number, set())) + + # Check if we can finalize any older blocks + self._finalize_old_blocks() + + except Exception as e: + self.log(f"Error checking canonical block: {e}") + + def _finalize_old_blocks(self): + """Mark transactions as confirmed or missing for blocks that are finalized.""" + finalization_threshold = ( + self.tracker.latest_canonical_block - self.tracker.blocks_to_confirm_after + ) + + blocks_to_check = [ + bn for bn in list(self.tracker.txs_by_block.keys()) + if bn <= finalization_threshold + ] + + for block_number in blocks_to_check: + if block_number not in self.tracker.canonical_blocks: + # Canonical block not yet fetched, try to get it + block = self.get_block_from_rpc(hex(block_number)) + if block: + self.check_canonical_block(block) + else: + continue + + canonical_txs = self.tracker.canonical_blocks.get(block_number, set()) + flashblock_txs = self.tracker.txs_by_block.get(block_number, set()) + + confirmed = 0 + missing = 0 + missing_hashes = [] + + for tx_hash in flashblock_txs: + tracked_tx = self.tracker.transactions.get(tx_hash) + if not tracked_tx or tracked_tx.status != TxStatus.PENDING: + continue + + if tx_hash in canonical_txs: + tracked_tx.status = TxStatus.CONFIRMED + self.tracker.total_confirmed += 1 + confirmed += 1 + else: + tracked_tx.status = TxStatus.MISSING + self.tracker.total_missing += 1 + missing += 1 + missing_hashes.append(tx_hash) + + if confirmed > 0 or missing > 0: + self.log_always( + f"Block #{block_number}:" + f"{confirmed} CONFIRMED, {missing} MISSING" + ) + + if missing > 0: + self.log_always(f"\n{'='*60}") + self.log_always(f"!!! REORG DETECTED - Block #{block_number} !!!") + self.log_always(f"{'='*60}") + self.tracker.total_reorg_count += 1 + self.log_always(f"Reorg Count: {self.tracker.total_reorg_count}") + + # Print one confirmed transaction as example + self.log_always(f"\nCONFIRMED TRANSACTION (example):") + self.log_always("-" * 40) + confirmed_example_printed = False + for tx_hash in flashblock_txs: + if tx_hash in canonical_txs and not confirmed_example_printed: + tracked = self.tracker.transactions.get(tx_hash) + idx = tracked.flashblock_index if tracked else "unknown" + self.log_always(f" [confirmed] {tx_hash} (flashblock idx={idx})") + confirmed_example_printed = True + break + + if not confirmed_example_printed: + self.log_always(f" (no confirmed transactions)") + + self.log_always(f"\nMISSING TRANSACTIONS ({len(missing_hashes)} total):") + self.log_always("-" * 40) + # Group by flashblock index + missing_by_index = {} + for tx_hash in missing_hashes: + tracked = self.tracker.transactions.get(tx_hash) + if tracked: + idx = tracked.flashblock_index + if idx not in missing_by_index: + missing_by_index[idx] = [] + missing_by_index[idx].append(tx_hash) + + for idx in sorted(missing_by_index.keys()): + self.log_always(f"\n Flashblock index {idx} ({len(missing_by_index[idx])} txs):") + for tx_hash in missing_by_index[idx]: + self.log_always(f" [MISSING] {tx_hash}") + break + + # Summary + self.log_always(f"\n{'='*60}") + self.log_always(f"SUMMARY:") + self.log_always(f" Flashblock txs: {len(flashblock_txs)}") + self.log_always(f" Canonical txs: {len(canonical_txs)}") + self.log_always(f" Confirmed: {confirmed}") + self.log_always(f" MISSING: {missing}") + self.log_always(f"{'='*60}\n") + + # Clean up finalized block from tracking + if block_number in self.tracker.txs_by_block: + del self.tracker.txs_by_block[block_number] + + async def poll_canonical_blocks(self): + """Periodically poll RPC for new canonical blocks.""" + last_block = 0 + + while self.running: + try: + block = self.get_block_from_rpc("latest") + if block: + block_number = int(block.get("number", "0x0"), 16) + if block_number > last_block: + self.check_canonical_block(block) + last_block = block_number + except ReorgDetectedException: + # Reorg detected, stop immediately + raise + except Exception as e: + self.log(f"Error polling canonical blocks: {e}") + + await asyncio.sleep(1.5) + + async def subscribe_flashblocks(self): + """Subscribe to flashblocks WebSocket and process messages.""" + reconnect_delay = 1 + max_reconnect_delay = 30 + + while self.running: + try: + self.log_always(f"Connecting to WebSocket: {self.ws_url}") + + async with connect(self.ws_url, ping_interval=20, ping_timeout=30, max_size=10 * 1024 * 1024) as ws: # 10MB limit + self.log_always("Connected to flashblocks WebSocket") + reconnect_delay = 1 # Reset on successful connection + + async for message in ws: + if not self.running: + break + + try: + data = json.loads(message) + + # Direct flashblock payload format (not JSON-RPC wrapped) + # Has payload_id, index, base (for index 0), diff + if "payload_id" in data and "diff" in data: + self.process_flashblock_payload(data) + # JSON-RPC subscription format (fallback) + elif "params" in data and "result" in data["params"]: + result = data["params"]["result"] + if isinstance(result, dict): + self.process_flashblock_payload(result) + + except json.JSONDecodeError: + self.log("Failed to parse WebSocket message") + except Exception as e: + self.log(f"Error processing message: {e}") + + except ConnectionClosed as e: + self.tracker.reconnection_count += 1 + self.log_always( + f"WebSocket connection closed: {e}. " + f"Reconnecting in {reconnect_delay}s... " + f"(reconnection #{self.tracker.reconnection_count})" + ) + except WebSocketException as e: + self.tracker.reconnection_count += 1 + self.log_always( + f"WebSocket error: {e}. " + f"Reconnecting in {reconnect_delay}s..." + ) + except Exception as e: + self.log_always(f"Unexpected error: {e}. Reconnecting in {reconnect_delay}s...") + + if self.running: + await asyncio.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) + + def print_summary(self): + """Print final test summary.""" + duration = time.time() - self.start_time if self.start_time else 0 + + print("\n" + "=" * 60) + print("FLASHBLOCK REORG MITIGATION TEST SUMMARY") + print("=" * 60) + print(f"Duration: {duration:.1f} seconds") + print(f"WebSocket URL: {self.ws_url}") + print(f"RPC URL: {self.rpc_url}") + print(f"Reconnections: {self.tracker.reconnection_count}") + print() + print("Transaction Statistics:") + print(f" Total flashblocks received: {self.tracker.total_flashblocks_received}") + print(f" Total transactions tracked (index > 0): {self.tracker.total_txs_tracked}") + print(f" Confirmed in canonical blocks: {self.tracker.total_confirmed}") + print(f" MISSING (reorged): {self.tracker.total_missing}") + + if self.tracker.total_txs_tracked > 0: + confirmation_rate = ( + self.tracker.total_confirmed / self.tracker.total_txs_tracked * 100 + ) + print(f" Confirmation rate: {confirmation_rate:.2f}%") + + # Count still pending + pending = sum( + 1 for tx in self.tracker.transactions.values() + if tx.status == TxStatus.PENDING + ) + if pending > 0: + print(f" Still pending (not finalized): {pending}") + + print() + if self.tracker.total_missing == 0 and self.tracker.total_txs_tracked > 0: + print("RESULT: PASS - No flashblock reorgs detected") + elif self.tracker.total_txs_tracked == 0: + print("RESULT: INCONCLUSIVE - No transactions were tracked") + else: + print(f"RESULT: FAIL - {self.tracker.total_missing} transactions were reorged!") + print(f"{self.tracker.total_reorg_count} reorgs detected!") + print("=" * 60) + + async def run(self): + """Run the test.""" + self.start_time = time.time() + + print("=" * 60) + print("FLASHBLOCK REORG MITIGATION TEST") + print("=" * 60) + print(f"WebSocket URL: {self.ws_url}") + print(f"RPC URL: {self.rpc_url}") + print(f"Duration: {'unlimited' if self.duration is None else f'{self.duration}s'}") + print(f"Verbose: {self.verbose}") + print() + print("Instructions:") + print("1. Wait for flashblocks to start arriving") + print("2. Manually trigger sequencer/builder switch") + print("3. Observe if any transactions go missing") + print("4. Press Ctrl+C to stop and see summary") + print("=" * 60) + print() + + # Create tasks + tasks = [ + asyncio.create_task(self.subscribe_flashblocks()), + asyncio.create_task(self.poll_canonical_blocks()), + ] + + # Add duration limit if specified + if self.duration: + async def duration_limit(): + await asyncio.sleep(self.duration) + self.running = False + self.log_always(f"Duration limit ({self.duration}s) reached, stopping...") + tasks.append(asyncio.create_task(duration_limit())) + + try: + await asyncio.gather(*tasks) + except ReorgDetectedException as e: + self.log_always(f"\nStopping due to reorg detection: {e}") + except asyncio.CancelledError: + pass + finally: + self.running = False + self.print_summary() + + +def main(): + parser = argparse.ArgumentParser( + description="Test flashblock reorg mitigation across sequencer/builder changes" + ) + parser.add_argument( + "--ws-url", + default="ws://localhost:11112", + help="Flashblocks WebSocket URL (default: ws://localhost:11111)", + ) + parser.add_argument( + "--rpc-url", + default="http://localhost:8124", + help="Ethereum RPC URL (default: http://localhost:8124)", + ) + parser.add_argument( + "--duration", + type=int, + default=None, + help="Test duration in seconds (default: run until Ctrl+C)", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable verbose logging", + ) + + args = parser.parse_args() + + tester = FlashblockReorgTester( + ws_url=args.ws_url, + rpc_url=args.rpc_url, + duration=args.duration, + verbose=args.verbose, + ) + + # Handle graceful shutdown + def signal_handler(sig, frame): + print("\n\nReceived interrupt signal, shutting down...") + tester.running = False + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + asyncio.run(tester.run()) + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/devnet/scripts/test_transfer_leader.sh b/devnet/scripts/test_transfer_leader.sh new file mode 100644 index 00000000..00ddd3ec --- /dev/null +++ b/devnet/scripts/test_transfer_leader.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# Counter for number of times the script has run +count=0 + +echo "Starting transfer_leader.sh loop (every 120 seconds)" +echo "" + +# Trap SIGINT and SIGTERM for graceful shutdown +trap 'echo -e "\n\nStopped after $count executions"; exit 0' INT TERM + +while true; do + # Increment counter + ((count++)) + + # Display current execution count with timestamp + timestamp=$(date '+%Y-%m-%d %H:%M:%S') + echo "[$timestamp] Execution #$count - Running ./transfer_leader.sh" + + # Run the transfer_leader script + ./transfer-leader.sh + + # Capture exit code + exit_code=$? + if [ $exit_code -ne 0 ]; then + echo " WARNING: transfer_leader.sh exited with code $exit_code" + fi + + # Wait 10 seconds before next execution + echo " Wait 5 seconds..." + echo "" + sleep 5 +done \ No newline at end of file From 05b78ed01f0dcabeb70f39d24035872b9632caa6 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 09:17:25 +0800 Subject: [PATCH 02/10] feat: add random sleep time --- devnet/scripts/test_transfer_leader.sh | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/devnet/scripts/test_transfer_leader.sh b/devnet/scripts/test_transfer_leader.sh index 00ddd3ec..19bf15d8 100644 --- a/devnet/scripts/test_transfer_leader.sh +++ b/devnet/scripts/test_transfer_leader.sh @@ -26,8 +26,10 @@ while true; do echo " WARNING: transfer_leader.sh exited with code $exit_code" fi - # Wait 10 seconds before next execution - echo " Wait 5 seconds..." + # Wait 5 seconds + random 0-500ms before next execution + random_ms=$((RANDOM % 501)) + sleep_time=$(printf '5.%03d' "$random_ms") + echo " Wait ${sleep_time}s..." echo "" - sleep 5 + sleep "$sleep_time" done \ No newline at end of file From 7a459ef40bc5c6e3576b33c2373176479f934376 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 14:37:24 +0800 Subject: [PATCH 03/10] test: use reth for seq3 --- devnet/3-op-init.sh | 6 +++--- devnet/docker-compose.yml | 29 +++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/devnet/3-op-init.sh b/devnet/3-op-init.sh index 58cfa541..ebaeca57 100755 --- a/devnet/3-op-init.sh +++ b/devnet/3-op-init.sh @@ -253,9 +253,9 @@ if [ "$CONDUCTOR_ENABLED" = "true" ]; then fi # op-seq3 default EL is always op-geth to ensure multiple seqs' geth and reth compatibilities - OP_GETH_DATADIR3="$(pwd)/data/op-geth-seq3" - rm -rf "$OP_GETH_DATADIR3" - cp -r $OP_GETH_DATADIR $OP_GETH_DATADIR3 + OP_RETH_DATADIR3="$(pwd)/data/op-reth-seq3" + rm -rf "$OP_RETH_DATADIR3" + cp -r $OP_RETH_DATADIR $OP_RETH_DATADIR3 fi if [ "$SEQ_TYPE" = "reth" ]; then diff --git a/devnet/docker-compose.yml b/devnet/docker-compose.yml index 0ead9d9b..fc613ac4 100644 --- a/devnet/docker-compose.yml +++ b/devnet/docker-compose.yml @@ -672,6 +672,35 @@ services: aliases: - op-seq-el2 + op-reth-seq3: + image: "${OP_RETH_IMAGE_TAG}" + container_name: op-reth-seq3 + entrypoint: /entrypoint/reth-seq.sh 3 + env_file: + - ./.env + volumes: + - ./data/op-reth-seq3:/datadir + - ./config-op/jwt.txt:/jwt.txt + - ./config-op/genesis-reth.json:/genesis.json + - ./entrypoint:/entrypoint + - ./.env:/.env + - ./config-op/test.reth.seq.config.toml:/config.toml + ports: + - "8323:8545" + - "30305:30303" + - "30305:30303/udp" + - "11114:1111" # outbound flashblocks ws port + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:8545" ] + interval: 3s + timeout: 3s + retries: 10 + start_period: 3s + networks: + default: + aliases: + - op-seq-el3 + op-seq2: image: "${OP_STACK_IMAGE_TAG}" container_name: op-seq2 From 9d667b85eac688bb6656a91cbc1fa000f836b211 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 15:03:18 +0800 Subject: [PATCH 04/10] test: update reth3 --- devnet/3-op-init.sh | 2 + devnet/docker-compose.yml | 77 ++++++++++++++++++++------------------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/devnet/3-op-init.sh b/devnet/3-op-init.sh index ebaeca57..552afb11 100755 --- a/devnet/3-op-init.sh +++ b/devnet/3-op-init.sh @@ -262,9 +262,11 @@ if [ "$SEQ_TYPE" = "reth" ]; then echo -n "1aba031aeb5aa8aedadaf04159d20e7d58eeefb3280176c7d59040476c2ab21b" > $OP_RETH_DATADIR/discovery-secret if [ "$CONDUCTOR_ENABLED" = "true" ]; then echo -n "934ee1c6d37504aa6397b13348d2b5788a0bae5d3a77c71645f8b28be54590d9" > $OP_RETH_DATADIR2/discovery-secret + echo -n "e95afe4502d7b84b03856047e8190a5ba4db55dd16e40945163e5cd9ed620227" > $OP_RETH_DATADIR3/discovery-secret if [ "$FLASHBLOCK_ENABLED" = "true" ]; then echo -n "60a4284707ef52c2b8486410be2bc7bf3bf803fcd85f0059b87b8b772eba62b421ef496e2a44135cfd9e74133e2e2b3e30a4a6c428d3f41e3537eea14eaf9ea3" > $OP_RETH_DATADIR/fb-p2p-key echo -n "6c899cb8b6dadfc34ddde60a57a61b3bdc655247a72feae16b851204fd41596f67a5e73ff50c90ec1755bcf640de7333322cce8612f722732f1244af23be007a" > $OP_RETH_DATADIR2/fb-p2p-key + echo -n "dafdac863b9db9f49fb087272cac8fe09740796cb94fee4ad720f2fb747e12b0e95283c52e4578c65b2bf60776afb6441f8dce733316027274f686950b329dd9" > $OP_RETH_DATADIR3/fb-p2p-key fi fi echo "✅ Set p2p nodekey for reth sequencer" diff --git a/devnet/docker-compose.yml b/devnet/docker-compose.yml index fc613ac4..441e2096 100644 --- a/devnet/docker-compose.yml +++ b/devnet/docker-compose.yml @@ -687,8 +687,8 @@ services: - ./config-op/test.reth.seq.config.toml:/config.toml ports: - "8323:8545" - - "30305:30303" - - "30305:30303/udp" + - "30306:30303" + - "30306:30303/udp" - "11114:1111" # outbound flashblocks ws port healthcheck: test: [ "CMD", "curl", "-f", "http://localhost:8545" ] @@ -790,39 +790,40 @@ services: op-seq2: condition: service_healthy - op-geth-seq3: - image: "${OP_GETH_IMAGE_TAG}" - container_name: op-geth-seq3 - entrypoint: geth - volumes: - - ./data/op-geth-seq3:/datadir - - ./config-op/jwt.txt:/jwt.txt - - ./config-op/test.geth.seq.config.toml:/config.toml - ports: - - "8323:8545" - - "30306:30303" - - "30306:30303/udp" - command: - - --networkid=${CHAIN_ID} - - --verbosity=3 - - --datadir=/datadir - - --db.engine=${DB_ENGINE} - - --config=/config.toml - - --gcmode=archive - - --rollup.disabletxpoolgossip=false - - --nodekeyhex=e95afe4502d7b84b03856047e8190a5ba4db55dd16e40945163e5cd9ed620227 - healthcheck: - test: ["CMD", "wget", "--spider", "--quiet", "http://localhost:8545"] - interval: 3s - timeout: 3s - retries: 10 - start_period: 3s - networks: - default: - aliases: - - op-seq-el3 + # op-geth-seq3 - DISABLED (using op-reth-seq3 instead) + # op-geth-seq3: + # image: "${OP_GETH_IMAGE_TAG}" + # container_name: op-geth-seq3 + # entrypoint: geth + # volumes: + # - ./data/op-geth-seq3:/datadir + # - ./config-op/jwt.txt:/jwt.txt + # - ./config-op/test.geth.seq.config.toml:/config.toml + # ports: + # - "8323:8545" + # - "30306:30303" + # - "30306:30303/udp" + # command: + # - --networkid=${CHAIN_ID} + # - --verbosity=3 + # - --datadir=/datadir + # - --db.engine=${DB_ENGINE} + # - --config=/config.toml + # - --gcmode=archive + # - --rollup.disabletxpoolgossip=false + # - --nodekeyhex=e95afe4502d7b84b03856047e8190a5ba4db55dd16e40945163e5cd9ed620227 + # healthcheck: + # test: ["CMD", "wget", "--spider", "--quiet", "http://localhost:8545"] + # interval: 3s + # timeout: 3s + # retries: 10 + # start_period: 3s + # networks: + # default: + # aliases: + # - op-seq-el3 - # Keep op-seq3 to run op-geth as default EL + # Keep op-seq3 to run op-reth as default EL op-seq3: image: "${OP_STACK_IMAGE_TAG}" container_name: op-seq3 @@ -840,7 +841,7 @@ services: - /app/op-node/bin/op-node - --log.level=info - --log.format=logfmtms - - --l2=http://op-geth-seq3:8552 + - --l2=http://op-reth-seq3:8552 - --l2.jwt-secret=/jwt.txt - --sequencer.enabled - --sequencer.stopped @@ -865,9 +866,9 @@ services: - --safedb.path=/data/safedb - --conductor.enabled=${CONDUCTOR_ENABLED:-false} - --conductor.rpc=http://op-conductor3:8547 - - --l2.enginekind=geth + - --l2.enginekind=reth depends_on: - - op-geth-seq3 + - op-reth-seq3 healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9545"] interval: 3s @@ -889,7 +890,7 @@ services: - --log.level=debug # already existed service - --node.rpc=http://op-seq3:9545 - - --execution.rpc=http://op-geth-seq3:8545 + - --execution.rpc=http://op-reth-seq3:8545 # Raft Config - --raft.server.id=conductor-3 - --raft.storage.dir=/data/raft From a530ab618357631c20784318e683d158119c7236 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 15:03:38 +0800 Subject: [PATCH 05/10] fix: update scripts --- devnet/scripts/test_flashblock_reorg.py | 54 ++++++------ devnet/scripts/test_transfer_leader.sh | 108 ++++++++++++++++++++---- 2 files changed, 119 insertions(+), 43 deletions(-) diff --git a/devnet/scripts/test_flashblock_reorg.py b/devnet/scripts/test_flashblock_reorg.py index 3d764cd5..e3362c83 100755 --- a/devnet/scripts/test_flashblock_reorg.py +++ b/devnet/scripts/test_flashblock_reorg.py @@ -179,12 +179,12 @@ class ReorgDetectedException(Exception): class FlashblockReorgTester: def __init__( self, - ws_url: str, + ws_urls: List[str], rpc_url: str, duration: Optional[int] = None, verbose: bool = False, ): - self.ws_url = ws_url + self.ws_urls = ws_urls self.rpc_url = rpc_url self.duration = duration self.verbose = verbose @@ -228,7 +228,7 @@ def get_block_from_rpc(self, block_id: str = "latest") -> Optional[dict]: self.log(f"RPC Error: {e}") return None - def process_flashblock_payload(self, payload: dict): + def process_flashblock_payload(self, payload: dict, source_url: str = "unknown"): """Process a flashblock payload and track transactions.""" try: payload_id = payload.get("payload_id") @@ -249,7 +249,7 @@ def process_flashblock_payload(self, payload: dict): self.current_block_number = int(block_number_hex, 16) self.current_payload_id = payload_id self.log( - f"New pending block #{self.current_block_number} " + f"[{source_url}] New pending block #{self.current_block_number} " f"(parent: {self.current_parent_hash[:16]}..., payload_id: {payload_id})" ) @@ -268,7 +268,7 @@ def process_flashblock_payload(self, payload: dict): # Skip index 0 (sequencer transactions - these are deterministic) if index == 0: self.log( - f"Flashblock idx=0 for block #{block_number}: " + f"[{source_url}] Flashblock idx=0 for block #{block_number}: " f"{len(raw_transactions)} sequencer txs (not tracking)" ) return @@ -302,7 +302,7 @@ def process_flashblock_payload(self, payload: dict): new_txs += 1 msg = ( - f"Flashblock idx={index} for block #{block_number}: " + f"[{source_url}] Flashblock idx={index} for block #{block_number}: " f"{len(raw_transactions)} txs ({new_txs} new tracked)" ) if decode_failures: @@ -468,17 +468,17 @@ async def poll_canonical_blocks(self): await asyncio.sleep(1.5) - async def subscribe_flashblocks(self): - """Subscribe to flashblocks WebSocket and process messages.""" + async def subscribe_flashblocks_single(self, ws_url: str): + """Subscribe to a single flashblocks WebSocket and process messages.""" reconnect_delay = 1 max_reconnect_delay = 30 while self.running: try: - self.log_always(f"Connecting to WebSocket: {self.ws_url}") + self.log_always(f"Connecting to WebSocket: {ws_url}") - async with connect(self.ws_url, ping_interval=20, ping_timeout=30, max_size=10 * 1024 * 1024) as ws: # 10MB limit - self.log_always("Connected to flashblocks WebSocket") + async with connect(ws_url, ping_interval=20, ping_timeout=30, max_size=10 * 1024 * 1024) as ws: # 10MB limit + self.log_always(f"Connected to flashblocks WebSocket: {ws_url}") reconnect_delay = 1 # Reset on successful connection async for message in ws: @@ -491,33 +491,33 @@ async def subscribe_flashblocks(self): # Direct flashblock payload format (not JSON-RPC wrapped) # Has payload_id, index, base (for index 0), diff if "payload_id" in data and "diff" in data: - self.process_flashblock_payload(data) + self.process_flashblock_payload(data, source_url=ws_url) # JSON-RPC subscription format (fallback) elif "params" in data and "result" in data["params"]: result = data["params"]["result"] if isinstance(result, dict): - self.process_flashblock_payload(result) + self.process_flashblock_payload(result, source_url=ws_url) except json.JSONDecodeError: - self.log("Failed to parse WebSocket message") + self.log(f"Failed to parse WebSocket message from {ws_url}") except Exception as e: - self.log(f"Error processing message: {e}") + self.log(f"Error processing message from {ws_url}: {e}") except ConnectionClosed as e: self.tracker.reconnection_count += 1 self.log_always( - f"WebSocket connection closed: {e}. " + f"WebSocket {ws_url} closed: {e}. " f"Reconnecting in {reconnect_delay}s... " f"(reconnection #{self.tracker.reconnection_count})" ) except WebSocketException as e: self.tracker.reconnection_count += 1 self.log_always( - f"WebSocket error: {e}. " + f"WebSocket {ws_url} error: {e}. " f"Reconnecting in {reconnect_delay}s..." ) except Exception as e: - self.log_always(f"Unexpected error: {e}. Reconnecting in {reconnect_delay}s...") + self.log_always(f"WebSocket {ws_url} unexpected error: {e}. Reconnecting in {reconnect_delay}s...") if self.running: await asyncio.sleep(reconnect_delay) @@ -531,7 +531,7 @@ def print_summary(self): print("FLASHBLOCK REORG MITIGATION TEST SUMMARY") print("=" * 60) print(f"Duration: {duration:.1f} seconds") - print(f"WebSocket URL: {self.ws_url}") + print(f"WebSocket URLs: {', '.join(self.ws_urls)}") print(f"RPC URL: {self.rpc_url}") print(f"Reconnections: {self.tracker.reconnection_count}") print() @@ -572,7 +572,7 @@ async def run(self): print("=" * 60) print("FLASHBLOCK REORG MITIGATION TEST") print("=" * 60) - print(f"WebSocket URL: {self.ws_url}") + print(f"WebSocket URLs: {', '.join(self.ws_urls)}") print(f"RPC URL: {self.rpc_url}") print(f"Duration: {'unlimited' if self.duration is None else f'{self.duration}s'}") print(f"Verbose: {self.verbose}") @@ -585,11 +585,12 @@ async def run(self): print("=" * 60) print() - # Create tasks + # Create tasks - one subscriber per WebSocket URL tasks = [ - asyncio.create_task(self.subscribe_flashblocks()), - asyncio.create_task(self.poll_canonical_blocks()), + asyncio.create_task(self.subscribe_flashblocks_single(url)) + for url in self.ws_urls ] + tasks.append(asyncio.create_task(self.poll_canonical_blocks())) # Add duration limit if specified if self.duration: @@ -616,8 +617,9 @@ def main(): ) parser.add_argument( "--ws-url", - default="ws://localhost:11112", - help="Flashblocks WebSocket URL (default: ws://localhost:11111)", + nargs="+", + default=["ws://localhost:11111", "ws://localhost:11112"], + help="Flashblocks WebSocket URLs (default: ws://localhost:11111 ws://localhost:11112)", ) parser.add_argument( "--rpc-url", @@ -640,7 +642,7 @@ def main(): args = parser.parse_args() tester = FlashblockReorgTester( - ws_url=args.ws_url, + ws_urls=args.ws_url, rpc_url=args.rpc_url, duration=args.duration, verbose=args.verbose, diff --git a/devnet/scripts/test_transfer_leader.sh b/devnet/scripts/test_transfer_leader.sh index 19bf15d8..2b5eb22a 100644 --- a/devnet/scripts/test_transfer_leader.sh +++ b/devnet/scripts/test_transfer_leader.sh @@ -1,35 +1,109 @@ #!/bin/bash -# Counter for number of times the script has run +# Usage: ./test_transfer_leader.sh [max_runs] +# Repeatedly pauses the current leader's conductor+sequencer to trigger failover. + +max_runs=${1:-0} # 0 = unlimited +BASE_PORT=8547 count=0 -echo "Starting transfer_leader.sh loop (every 120 seconds)" +if [ "$max_runs" -gt 0 ]; then + echo "Starting conductor failover test (max $max_runs runs)" +else + echo "Starting conductor failover test (unlimited runs)" +fi echo "" -# Trap SIGINT and SIGTERM for graceful shutdown trap 'echo -e "\n\nStopped after $count executions"; exit 0' INT TERM while true; do - # Increment counter ((count++)) - - # Display current execution count with timestamp timestamp=$(date '+%Y-%m-%d %H:%M:%S') - echo "[$timestamp] Execution #$count - Running ./transfer_leader.sh" + echo "[$timestamp] Execution #$count" + + # --- Step 1: Find current leader --- + LEADER_PORT=0 + OLD_LEADER=0 + for i in {0..2}; do + PORT=$((BASE_PORT + i)) + IS_LEADER=$(curl -s -X POST -H "Content-Type: application/json" \ + --data '{"jsonrpc":"2.0","method":"conductor_leader","params":[],"id":1}' \ + http://localhost:$PORT 2>/dev/null | jq -r .result) + if [ "$IS_LEADER" = "true" ]; then + LEADER_PORT=$PORT + OLD_LEADER=$((i+1)) + break + fi + done + + if [ "$LEADER_PORT" = "0" ]; then + echo " ERROR: No leader found" + sleep 5 + continue + fi - # Run the transfer_leader script - ./transfer-leader.sh + # Map leader to container names (all using reth) + if [ "$OLD_LEADER" = "1" ]; then + CONDUCTOR_CONTAINER="op-conductor" + SEQ_CONTAINER="op-reth-seq" + else + CONDUCTOR_CONTAINER="op-conductor${OLD_LEADER}" + SEQ_CONTAINER="op-reth-seq${OLD_LEADER}" + fi + echo " Current leader: conductor-$OLD_LEADER ($CONDUCTOR_CONTAINER + $SEQ_CONTAINER)" - # Capture exit code - exit_code=$? - if [ $exit_code -ne 0 ]; then - echo " WARNING: transfer_leader.sh exited with code $exit_code" + # --- Step 2: Pause leader's containers to trigger failover --- + echo " Pausing $CONDUCTOR_CONTAINER and $SEQ_CONTAINER..." + docker pause "$CONDUCTOR_CONTAINER" "$SEQ_CONTAINER" 2>/dev/null + if [ $? -ne 0 ]; then + echo " ERROR: Failed to pause containers" + sleep 5 + continue fi - # Wait 5 seconds + random 0-500ms before next execution + # --- Step 3: Wait for new leader election --- + NEW_LEADER=0 + MAX_WAIT=15 + for ((s=1; s<=MAX_WAIT; s++)); do + sleep 0.5 + for i in {0..2}; do + PORT=$((BASE_PORT + i)) + + # Skip paused conductor + if [ $((i+1)) = "$OLD_LEADER" ]; then + continue + fi + + IS_LEADER=$(curl -s -X POST -H "Content-Type: application/json" \ + --data '{"jsonrpc":"2.0","method":"conductor_leader","params":[],"id":1}' \ + http://localhost:$PORT 2>/dev/null | jq -r .result) + + if [ "$IS_LEADER" = "true" ]; then + NEW_LEADER=$((i+1)) + echo " ✓ Failover completed: conductor-$OLD_LEADER → conductor-$NEW_LEADER (${s}s)" + break 2 + fi + done + done + + if [ "$NEW_LEADER" = "0" ]; then + echo " WARNING: No new leader elected after ${MAX_WAIT}s" + fi + + # --- Step 4: Unpause old leader's containers --- + echo " Unpausing $CONDUCTOR_CONTAINER and $SEQ_CONTAINER..." + docker unpause "$CONDUCTOR_CONTAINER" "$SEQ_CONTAINER" 2>/dev/null + + # --- Wait before next iteration --- random_ms=$((RANDOM % 501)) - sleep_time=$(printf '5.%03d' "$random_ms") - echo " Wait ${sleep_time}s..." + sleep_time=$(printf '10.%03d' "$random_ms") + echo " Waiting ${sleep_time}s before next iteration..." echo "" sleep "$sleep_time" -done \ No newline at end of file + + # Stop if max runs reached + if [ "$max_runs" -gt 0 ] && [ "$count" -ge "$max_runs" ]; then + echo "Completed $max_runs runs. Exiting." + exit 0 + fi +done From d61ad89dacd947daba31e94c96edeedd1c912c44 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 15:30:28 +0800 Subject: [PATCH 06/10] fix: add sleep --- devnet/scripts/test_transfer_leader.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/devnet/scripts/test_transfer_leader.sh b/devnet/scripts/test_transfer_leader.sh index 2b5eb22a..3658eeef 100644 --- a/devnet/scripts/test_transfer_leader.sh +++ b/devnet/scripts/test_transfer_leader.sh @@ -88,6 +88,10 @@ while true; do if [ "$NEW_LEADER" = "0" ]; then echo " WARNING: No new leader elected after ${MAX_WAIT}s" + else + # Wait for new leader to build blocks before unpausing old leader + echo " Waiting 5s for new leader to build blocks..." + sleep 5 fi # --- Step 4: Unpause old leader's containers --- From 69447a220c04481348f3b78804b86319c3a25465 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 16:52:52 +0800 Subject: [PATCH 07/10] fix: docker stop --- devnet/scripts/test_transfer_leader.sh | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/devnet/scripts/test_transfer_leader.sh b/devnet/scripts/test_transfer_leader.sh index 3658eeef..6971937b 100644 --- a/devnet/scripts/test_transfer_leader.sh +++ b/devnet/scripts/test_transfer_leader.sh @@ -44,17 +44,15 @@ while true; do # Map leader to container names (all using reth) if [ "$OLD_LEADER" = "1" ]; then - CONDUCTOR_CONTAINER="op-conductor" SEQ_CONTAINER="op-reth-seq" else - CONDUCTOR_CONTAINER="op-conductor${OLD_LEADER}" SEQ_CONTAINER="op-reth-seq${OLD_LEADER}" fi - echo " Current leader: conductor-$OLD_LEADER ($CONDUCTOR_CONTAINER + $SEQ_CONTAINER)" + echo " Current leader: $OLD_LEADER ($SEQ_CONTAINER)" # --- Step 2: Pause leader's containers to trigger failover --- - echo " Pausing $CONDUCTOR_CONTAINER and $SEQ_CONTAINER..." - docker pause "$CONDUCTOR_CONTAINER" "$SEQ_CONTAINER" 2>/dev/null + echo " Pausing $SEQ_CONTAINER..." + docker stop "$SEQ_CONTAINER" 2>/dev/null if [ $? -ne 0 ]; then echo " ERROR: Failed to pause containers" sleep 5 @@ -95,12 +93,12 @@ while true; do fi # --- Step 4: Unpause old leader's containers --- - echo " Unpausing $CONDUCTOR_CONTAINER and $SEQ_CONTAINER..." - docker unpause "$CONDUCTOR_CONTAINER" "$SEQ_CONTAINER" 2>/dev/null + echo " Unpausing $SEQ_CONTAINER..." + docker start "$SEQ_CONTAINER" 2>/dev/null # --- Wait before next iteration --- random_ms=$((RANDOM % 501)) - sleep_time=$(printf '10.%03d' "$random_ms") + sleep_time=$(printf '30.%03d' "$random_ms") echo " Waiting ${sleep_time}s before next iteration..." echo "" sleep "$sleep_time" From 51714fd85091d9c39ba608c3eedd632b770982f3 Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 16:56:50 +0800 Subject: [PATCH 08/10] naming --- devnet/scripts/test_transfer_leader.sh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/devnet/scripts/test_transfer_leader.sh b/devnet/scripts/test_transfer_leader.sh index 6971937b..0529a10d 100644 --- a/devnet/scripts/test_transfer_leader.sh +++ b/devnet/scripts/test_transfer_leader.sh @@ -1,7 +1,7 @@ #!/bin/bash # Usage: ./test_transfer_leader.sh [max_runs] -# Repeatedly pauses the current leader's conductor+sequencer to trigger failover. +# Repeatedly stops the current leader's conductor+sequencer to trigger failover. max_runs=${1:-0} # 0 = unlimited BASE_PORT=8547 @@ -50,11 +50,11 @@ while true; do fi echo " Current leader: $OLD_LEADER ($SEQ_CONTAINER)" - # --- Step 2: Pause leader's containers to trigger failover --- - echo " Pausing $SEQ_CONTAINER..." + # --- Step 2: Stop leader's containers to trigger failover --- + echo " Stopping $SEQ_CONTAINER..." docker stop "$SEQ_CONTAINER" 2>/dev/null if [ $? -ne 0 ]; then - echo " ERROR: Failed to pause containers" + echo " ERROR: Failed to stop containers" sleep 5 continue fi @@ -67,7 +67,7 @@ while true; do for i in {0..2}; do PORT=$((BASE_PORT + i)) - # Skip paused conductor + # Skip stopped sequencer if [ $((i+1)) = "$OLD_LEADER" ]; then continue fi @@ -87,13 +87,13 @@ while true; do if [ "$NEW_LEADER" = "0" ]; then echo " WARNING: No new leader elected after ${MAX_WAIT}s" else - # Wait for new leader to build blocks before unpausing old leader + # Wait for new leader to build blocks before starting old leader echo " Waiting 5s for new leader to build blocks..." sleep 5 fi - # --- Step 4: Unpause old leader's containers --- - echo " Unpausing $SEQ_CONTAINER..." + # --- Step 4: Start old leader's containers --- + echo " Starting $SEQ_CONTAINER..." docker start "$SEQ_CONTAINER" 2>/dev/null # --- Wait before next iteration --- From f428ce1ba29222a44220245ca90d515596ebb37c Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Wed, 11 Feb 2026 18:22:52 +0800 Subject: [PATCH 09/10] fix: cleanup --- devnet/scripts/test_flashblock_reorg.py | 39 +++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/devnet/scripts/test_flashblock_reorg.py b/devnet/scripts/test_flashblock_reorg.py index e3362c83..a8906f68 100755 --- a/devnet/scripts/test_flashblock_reorg.py +++ b/devnet/scripts/test_flashblock_reorg.py @@ -171,6 +171,9 @@ class BlockTracker: # Blocks to finalize after N confirmations blocks_to_confirm_after: int = 2 + # Memory management: keep only recent canonical blocks for debugging + max_canonical_blocks_to_keep: int = 30 + class ReorgDetectedException(Exception): """Raised when flashblock transactions are missing from canonical chain.""" @@ -448,6 +451,42 @@ def _finalize_old_blocks(self): if block_number in self.tracker.txs_by_block: del self.tracker.txs_by_block[block_number] + # Clean up finalized transactions to prevent unbounded memory growth + txs_to_remove = [ + tx_hash for tx_hash, tx in self.tracker.transactions.items() + if tx.block_number == block_number and tx.status != TxStatus.PENDING + ] + for tx_hash in txs_to_remove: + del self.tracker.transactions[tx_hash] + + if len(txs_to_remove) > 0: + self.log(f"Cleaned up {len(txs_to_remove)} finalized transactions from block #{block_number}") + + # Clean up old canonical blocks to prevent unbounded memory growth + self._cleanup_old_canonical_blocks() + + def _cleanup_old_canonical_blocks(self): + """Remove old canonical blocks to prevent memory growth.""" + if len(self.tracker.canonical_blocks) <= self.tracker.max_canonical_blocks_to_keep: + return + + # Keep only the most recent max_canonical_blocks_to_keep blocks + cutoff_block = self.tracker.latest_canonical_block - self.tracker.max_canonical_blocks_to_keep + + old_blocks = [ + bn for bn in list(self.tracker.canonical_blocks.keys()) + if bn < cutoff_block + ] + + for bn in old_blocks: + del self.tracker.canonical_blocks[bn] + + if len(old_blocks) > 0: + self.log( + f"Cleaned up {len(old_blocks)} old canonical blocks " + f"(keeping last {self.tracker.max_canonical_blocks_to_keep})" + ) + async def poll_canonical_blocks(self): """Periodically poll RPC for new canonical blocks.""" last_block = 0 From ade172cfe3a7893339e657f1f97703685d36b52b Mon Sep 17 00:00:00 2001 From: "lucas.lim" Date: Fri, 20 Mar 2026 16:17:56 +0800 Subject: [PATCH 10/10] revert: restore 3-op-init.sh and docker-compose.yml to main Co-Authored-By: Claude Opus 4.6 (1M context) --- devnet/3-op-init.sh | 8 ++- devnet/docker-compose.yml | 102 ++++++++++++++------------------------ 2 files changed, 39 insertions(+), 71 deletions(-) diff --git a/devnet/3-op-init.sh b/devnet/3-op-init.sh index 552afb11..58cfa541 100755 --- a/devnet/3-op-init.sh +++ b/devnet/3-op-init.sh @@ -253,20 +253,18 @@ if [ "$CONDUCTOR_ENABLED" = "true" ]; then fi # op-seq3 default EL is always op-geth to ensure multiple seqs' geth and reth compatibilities - OP_RETH_DATADIR3="$(pwd)/data/op-reth-seq3" - rm -rf "$OP_RETH_DATADIR3" - cp -r $OP_RETH_DATADIR $OP_RETH_DATADIR3 + OP_GETH_DATADIR3="$(pwd)/data/op-geth-seq3" + rm -rf "$OP_GETH_DATADIR3" + cp -r $OP_GETH_DATADIR $OP_GETH_DATADIR3 fi if [ "$SEQ_TYPE" = "reth" ]; then echo -n "1aba031aeb5aa8aedadaf04159d20e7d58eeefb3280176c7d59040476c2ab21b" > $OP_RETH_DATADIR/discovery-secret if [ "$CONDUCTOR_ENABLED" = "true" ]; then echo -n "934ee1c6d37504aa6397b13348d2b5788a0bae5d3a77c71645f8b28be54590d9" > $OP_RETH_DATADIR2/discovery-secret - echo -n "e95afe4502d7b84b03856047e8190a5ba4db55dd16e40945163e5cd9ed620227" > $OP_RETH_DATADIR3/discovery-secret if [ "$FLASHBLOCK_ENABLED" = "true" ]; then echo -n "60a4284707ef52c2b8486410be2bc7bf3bf803fcd85f0059b87b8b772eba62b421ef496e2a44135cfd9e74133e2e2b3e30a4a6c428d3f41e3537eea14eaf9ea3" > $OP_RETH_DATADIR/fb-p2p-key echo -n "6c899cb8b6dadfc34ddde60a57a61b3bdc655247a72feae16b851204fd41596f67a5e73ff50c90ec1755bcf640de7333322cce8612f722732f1244af23be007a" > $OP_RETH_DATADIR2/fb-p2p-key - echo -n "dafdac863b9db9f49fb087272cac8fe09740796cb94fee4ad720f2fb747e12b0e95283c52e4578c65b2bf60776afb6441f8dce733316027274f686950b329dd9" > $OP_RETH_DATADIR3/fb-p2p-key fi fi echo "✅ Set p2p nodekey for reth sequencer" diff --git a/devnet/docker-compose.yml b/devnet/docker-compose.yml index 441e2096..0ead9d9b 100644 --- a/devnet/docker-compose.yml +++ b/devnet/docker-compose.yml @@ -672,35 +672,6 @@ services: aliases: - op-seq-el2 - op-reth-seq3: - image: "${OP_RETH_IMAGE_TAG}" - container_name: op-reth-seq3 - entrypoint: /entrypoint/reth-seq.sh 3 - env_file: - - ./.env - volumes: - - ./data/op-reth-seq3:/datadir - - ./config-op/jwt.txt:/jwt.txt - - ./config-op/genesis-reth.json:/genesis.json - - ./entrypoint:/entrypoint - - ./.env:/.env - - ./config-op/test.reth.seq.config.toml:/config.toml - ports: - - "8323:8545" - - "30306:30303" - - "30306:30303/udp" - - "11114:1111" # outbound flashblocks ws port - healthcheck: - test: [ "CMD", "curl", "-f", "http://localhost:8545" ] - interval: 3s - timeout: 3s - retries: 10 - start_period: 3s - networks: - default: - aliases: - - op-seq-el3 - op-seq2: image: "${OP_STACK_IMAGE_TAG}" container_name: op-seq2 @@ -790,40 +761,39 @@ services: op-seq2: condition: service_healthy - # op-geth-seq3 - DISABLED (using op-reth-seq3 instead) - # op-geth-seq3: - # image: "${OP_GETH_IMAGE_TAG}" - # container_name: op-geth-seq3 - # entrypoint: geth - # volumes: - # - ./data/op-geth-seq3:/datadir - # - ./config-op/jwt.txt:/jwt.txt - # - ./config-op/test.geth.seq.config.toml:/config.toml - # ports: - # - "8323:8545" - # - "30306:30303" - # - "30306:30303/udp" - # command: - # - --networkid=${CHAIN_ID} - # - --verbosity=3 - # - --datadir=/datadir - # - --db.engine=${DB_ENGINE} - # - --config=/config.toml - # - --gcmode=archive - # - --rollup.disabletxpoolgossip=false - # - --nodekeyhex=e95afe4502d7b84b03856047e8190a5ba4db55dd16e40945163e5cd9ed620227 - # healthcheck: - # test: ["CMD", "wget", "--spider", "--quiet", "http://localhost:8545"] - # interval: 3s - # timeout: 3s - # retries: 10 - # start_period: 3s - # networks: - # default: - # aliases: - # - op-seq-el3 + op-geth-seq3: + image: "${OP_GETH_IMAGE_TAG}" + container_name: op-geth-seq3 + entrypoint: geth + volumes: + - ./data/op-geth-seq3:/datadir + - ./config-op/jwt.txt:/jwt.txt + - ./config-op/test.geth.seq.config.toml:/config.toml + ports: + - "8323:8545" + - "30306:30303" + - "30306:30303/udp" + command: + - --networkid=${CHAIN_ID} + - --verbosity=3 + - --datadir=/datadir + - --db.engine=${DB_ENGINE} + - --config=/config.toml + - --gcmode=archive + - --rollup.disabletxpoolgossip=false + - --nodekeyhex=e95afe4502d7b84b03856047e8190a5ba4db55dd16e40945163e5cd9ed620227 + healthcheck: + test: ["CMD", "wget", "--spider", "--quiet", "http://localhost:8545"] + interval: 3s + timeout: 3s + retries: 10 + start_period: 3s + networks: + default: + aliases: + - op-seq-el3 - # Keep op-seq3 to run op-reth as default EL + # Keep op-seq3 to run op-geth as default EL op-seq3: image: "${OP_STACK_IMAGE_TAG}" container_name: op-seq3 @@ -841,7 +811,7 @@ services: - /app/op-node/bin/op-node - --log.level=info - --log.format=logfmtms - - --l2=http://op-reth-seq3:8552 + - --l2=http://op-geth-seq3:8552 - --l2.jwt-secret=/jwt.txt - --sequencer.enabled - --sequencer.stopped @@ -866,9 +836,9 @@ services: - --safedb.path=/data/safedb - --conductor.enabled=${CONDUCTOR_ENABLED:-false} - --conductor.rpc=http://op-conductor3:8547 - - --l2.enginekind=reth + - --l2.enginekind=geth depends_on: - - op-reth-seq3 + - op-geth-seq3 healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9545"] interval: 3s @@ -890,7 +860,7 @@ services: - --log.level=debug # already existed service - --node.rpc=http://op-seq3:9545 - - --execution.rpc=http://op-reth-seq3:8545 + - --execution.rpc=http://op-geth-seq3:8545 # Raft Config - --raft.server.id=conductor-3 - --raft.storage.dir=/data/raft