Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ CUSTOM_CHAINS_CONFIG_PATH=# optional
DEFAULT_USER_OPS_BATCH_GAS_LIMIT=# default: 8_000_000
DEFAULT_NUM_SIMULATOR_WORKERS_PER_CHAIN=# default: 1
DEFAULT_SIMULATOR_WORKER_CONCURRENCY=# default: 10
DEFAULT_SIMULATOR_TRACE_CALL_RETRY_DELAY=# default: 2
DEFAULT_SIMULATOR_TRACE_CALL_RETRY_DELAY=# default: 75 milliseconds

DEFAULT_SIMULATOR_STALLED_JOBS_RETRY_INTERVAL=# default: 5
DEFAULT_SIMULATOR_STALLED_JOBS_RETRY_INTERVAL=# default: 1 second
DEFAULT_SIMULATOR_RATE_LIMIT_MAX_REQUESTS_PER_INTERVAL=# default: 100
DEFAULT_SIMULATOR_RATE_LIMIT_DURATION=# default: 1

DEFAULT_EXECUTOR_STALLED_JOBS_RETRY_INTERVAL=# default: 5
DEFAULT_EXECUTOR_STALLED_JOBS_RETRY_INTERVAL=# default: 1 second
DEFAULT_EXECUTOR_RATE_LIMIT_MAX_REQUESTS_PER_INTERVAL=# default: 100
DEFAULT_EXECUTOR_RATE_LIMIT_DURATION=# default: 1

Expand All @@ -34,7 +34,7 @@ REDIS_PORT=# default: 6379
## executor
EXECUTOR_QUEUE_JOB_ATTEMPTS=# default: 3
EXECUTOR_QUEUE_JOB_BACKOFF_TYPE=# default: fixed, options: fixed|exponential
EXECUTOR_QUEUE_JOB_BACKOFF_DELAY=# default: 1 (second)
EXECUTOR_QUEUE_JOB_BACKOFF_DELAY=# default: 75 milliseconds

## health check
INITIAL_HEALTH_CHECK_DELAY=# default: 1
Expand All @@ -54,7 +54,7 @@ ORACLE_PRICE_FEED_TTL=# default: 60 (seconds)
## simulator
SIMULATOR_QUEUE_JOB_ATTEMPTS=# default: 10
SIMULATOR_QUEUE_JOB_BACKOFF_TYPE=# default: fixed, options: fixed|exponential
SIMULATOR_QUEUE_JOB_BACKOFF_DELAY=# default: 1 (seconds)
SIMULATOR_QUEUE_JOB_BACKOFF_DELAY=# default: 75 milliseconds

## workers
NUM_CLUSTER_WORKERS=# default: 1
Expand Down
7 changes: 7 additions & 0 deletions src/api/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import process from "node:process";
import { ChainsService } from "@/chains";
import { parsePort } from "@/common";
import { Logger } from "@/core/logger";
import { GasManagerService } from "@/gas-manager";
import { HealthCheckService } from "@/health-check";
import { NodeService } from "@/node";
import { RpcManagerService } from "@/rpc-manager";
Expand All @@ -25,6 +26,7 @@ export async function bootstrap() {
const healthCheckService = Container.get(HealthCheckService);
const nodeService = Container.get(NodeService);
const rpcManagerService = Container.get(RpcManagerService);
const gasManagerService = Container.get(GasManagerService);

await nodeService.readVersion();

Expand Down Expand Up @@ -76,6 +78,11 @@ export async function bootstrap() {
);
break;
}

case "gasInfoSync": {
gasManagerService.syncGasInfo(data.chainId, data.gasInfo);
break;
}
}
});
}
49 changes: 49 additions & 0 deletions src/master/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { BatcherService } from "@/batcher";
import { ChainsService } from "@/chains";
import { Logger } from "@/core/logger";
import { ExecutorService } from "@/executor";
import { GasManagerService } from "@/gas-manager";
import { HealthCheckService } from "@/health-check";
import { NodeService } from "@/node";
import { RpcChainConfig, RpcManagerService } from "@/rpc-manager";
Expand All @@ -16,12 +17,16 @@ export async function bootstrap() {
const logger = Container.get(Logger);
const batcherService = Container.get(BatcherService);
const chainsService = Container.get(ChainsService);
const gasManagerService = Container.get(GasManagerService);
const healthCheckService = Container.get(HealthCheckService);
const nodeService = Container.get(NodeService);
const workersService = Container.get(WorkersService);
const rpcManagerService = Container.get(RpcManagerService);

// Chain settings will be initialized
await chainsService.initialize();

// Workers will be spawned
await workersService.initialize();

const chainsSettings = chainsService.getChainsSettings();
Expand Down Expand Up @@ -98,9 +103,52 @@ export async function bootstrap() {
},
);
})
// RPC providers will be initialized
.setup(rpcConfigs, true);

const gasManagerConfigs = chainsSettings.map((chainSettings) => ({
chainId: chainSettings.chainId,
gasFetchInterval: chainSettings.gasCacheDuration,
}));

const threadGasInfoSync = true;

// Gas manager will be initialized
await gasManagerService.initialize(gasManagerConfigs, threadGasInfoSync);

gasManagerService.on("sync", (gasInfoByChain) => {
workersService.notifyClusterWorkers({
type: "gasInfoSync",
data: gasInfoByChain,
});

workersService.notifyThreadWorkers(
{
type: "gasInfoSync",
data: gasInfoByChain,
},
{
chainId: gasInfoByChain.chainId,
workerType: "simulator",
},
);

workersService.notifyThreadWorkers(
{
type: "gasInfoSync",
data: gasInfoByChain,
},
{
chainId: gasInfoByChain.chainId,
workerType: "executor",
},
);
});

// Batcher will be initialized and this can be initialzed anytime after chain service.
await batcherService.initialize();

// node service will be initialized which depends on RPC manager.
await nodeService.initialize();

await healthCheckService
Expand Down Expand Up @@ -153,5 +201,6 @@ export async function bootstrap() {
);
}
})
// Finally the health service is initialized and workers will be activated for execution processing
.initialize();
}
192 changes: 100 additions & 92 deletions src/modules/batcher/batcher.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ChainsService } from "@/chains";
import { withTrace } from "@/common";
import { type ConfigType, InjectConfig } from "@/core/config";
import { Logger } from "@/core/logger";
import { ExecutorService } from "@/executor";
Expand Down Expand Up @@ -68,100 +69,107 @@ export class BatcherService {
maxBatchGasLimitCap: bigint,
) {
try {
const { gasLimitOverrides } =
this.chainsService.getChainSettings(chainId);

const executorJobCount = await this.executorService.getJobCounts(chainId);

// UserOp blocks are still waiting to be executed. So wait for new block creation which will result in
// more userOps being batched. This is a smart batching feature
if (executorJobCount.waiting) {
return;
}

const simulatorCompleteJobs =
await this.simulatorService.getCompletedJobs(chainId);

// Simulated jobs will be fetched from completed simulator queue
if (!simulatorCompleteJobs.length) {
return;
}

const batches: MeeUserOpBatch[] = [];

let batch: MeeUserOpBatch = {
batchGasLimit: 0n,
meeUserOps: [],
};

for (const { data: simulatedJobData } of simulatorCompleteJobs) {
const { meeUserOp } = simulatedJobData;
const { meeUserOpHash } = meeUserOp;
const unpackedUserOp = unpackPackedUserOp(meeUserOp.userOp);

const paymasterVerificationGasLimit =
gasLimitOverrides?.paymasterVerificationGasLimit ||
BigInt(this.gasEstimatorConfiguration.paymasterVerificationGas);

const maxGasLimit = // add all limits except preVerificationGas (that one is not spent during execution)
unpackedUserOp.verificationGasLimit +
unpackedUserOp.callGasLimit +
paymasterVerificationGasLimit +
BigInt(this.gasEstimatorConfiguration.paymasterPostOpGas);

// If a userOp's gas limit is more than batch gas limit cap. There is something wrong in
// the gas limit itself. This userOp will be skipped from being executed
if (maxGasLimit > maxBatchGasLimitCap) {
this.logger.error(
{
meeUserOpHash,
maxGasLimit,
maxBatchGasLimitCap,
},
"Invalid maxGasLimit is being used",
);

// Update error status in background to improve latency
this.storageService.setUserOpCustomField(
meeUserOpHash,
"error",
"Invalid maxGasLimit",
);

continue;
}
await withTrace(
"batcher.userOpBatching",
async () => {
const { gasLimitOverrides } =
this.chainsService.getChainSettings(chainId);

const executorJobCount =
await this.executorService.getJobCounts(chainId);

// UserOp blocks are still waiting to be executed. So wait for new block creation which will result in
// more userOps being batched. This is a smart batching feature
if (executorJobCount.waiting) {
return;
}

const simulatorCompleteJobs =
await this.simulatorService.getCompletedJobs(chainId);

// Simulated jobs will be fetched from completed simulator queue
if (!simulatorCompleteJobs.length) {
return;
}

const batches: MeeUserOpBatch[] = [];

let batch: MeeUserOpBatch = {
batchGasLimit: 0n,
meeUserOps: [],
};

for (const { data: simulatedJobData } of simulatorCompleteJobs) {
const { meeUserOp } = simulatedJobData;
const { meeUserOpHash } = meeUserOp;
const unpackedUserOp = unpackPackedUserOp(meeUserOp.userOp);

const paymasterVerificationGasLimit =
gasLimitOverrides?.paymasterVerificationGasLimit ||
BigInt(this.gasEstimatorConfiguration.paymasterVerificationGas);

const maxGasLimit = // add all limits except preVerificationGas (that one is not spent during execution)
unpackedUserOp.verificationGasLimit +
unpackedUserOp.callGasLimit +
paymasterVerificationGasLimit +
BigInt(this.gasEstimatorConfiguration.paymasterPostOpGas);

// If a userOp's gas limit is more than batch gas limit cap. There is something wrong in
// the gas limit itself. This userOp will be skipped from being executed
if (maxGasLimit > maxBatchGasLimitCap) {
this.logger.error(
{
meeUserOpHash,
maxGasLimit,
maxBatchGasLimitCap,
},
"Invalid maxGasLimit is being used",
);

// Update error status in background to improve latency
this.storageService.setUserOpCustomField(
meeUserOpHash,
"error",
"Invalid maxGasLimit",
);

continue;
}

// If a userOp doesn't fit into the block, the existing block will be finalized as full block
// and the userOp will be assigned for a new batch
if (batch.batchGasLimit + maxGasLimit > maxBatchGasLimitCap) {
batches.push(batch);
batch = { batchGasLimit: 0n, meeUserOps: [] };
}

batch.meeUserOps.push(meeUserOp);
batch.batchGasLimit += maxGasLimit;
}

if (batch.meeUserOps.length > 0) {
batches.push(batch);
}

for (const batch of batches) {
this.logger.info(
{
from: "batcher",
chainId,
meeUserOpHashes: batch.meeUserOps.map(
(meeUserOp) => meeUserOp.meeUserOpHash,
),
},
`Generated a block of ${batch.meeUserOps.length} meeUserOp(s) for chain (${chainId}) with blockGasLimit (${batch.batchGasLimit})`,
);
}

// If a userOp doesn't fit into the block, the existing block will be finalized as full block
// and the userOp will be assigned for a new batch
if (batch.batchGasLimit + maxGasLimit > maxBatchGasLimitCap) {
batches.push(batch);
batch = { batchGasLimit: 0n, meeUserOps: [] };
}
await this.executorService.addJobs(chainId, batches);

batch.meeUserOps.push(meeUserOp);
batch.batchGasLimit += maxGasLimit;
}

if (batch.meeUserOps.length > 0) {
batches.push(batch);
}

for (const batch of batches) {
this.logger.info(
{
from: "batcher",
chainId,
meeUserOpHashes: batch.meeUserOps.map(
(meeUserOp) => meeUserOp.meeUserOpHash,
),
},
`Generated a block of ${batch.meeUserOps.length} meeUserOp(s) for chain (${chainId}) with blockGasLimit (${batch.batchGasLimit})`,
);
}

await this.executorService.addJobs(chainId, batches);

await Promise.all(simulatorCompleteJobs.map((job) => job.remove()));
await Promise.all(simulatorCompleteJobs.map((job) => job.remove()));
},
{ chainId },
)();
} catch (error) {
this.logger.error(error);
}
Expand Down
18 changes: 9 additions & 9 deletions src/modules/chains/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,10 @@ export const CHAIN_CONFIG_DEFAULTS = {
simulator: {
stalledJobsRetryInterval: parseSeconds(
process.env.DEFAULT_SIMULATOR_STALLED_JOBS_RETRY_INTERVAL,
5,
1,
{
min: 5,
max: 30,
min: 1,
max: 10,
},
),
rateLimitMaxRequestsPerInterval: parseNum(
Expand Down Expand Up @@ -276,21 +276,21 @@ export const CHAIN_CONFIG_DEFAULTS = {
min: 1,
},
),
traceCallRetryDelay: parseSeconds(
traceCallRetryDelay: parseNum(
process.env.DEFAULT_SIMULATOR_TRACE_CALL_RETRY_DELAY,
2,
75,
{
min: 1,
min: 75,
},
),
},
executor: {
stalledJobsRetryInterval: parseSeconds(
process.env.DEFAULT_EXECUTOR_STALLED_JOBS_RETRY_INTERVAL,
5,
1,
{
min: 5,
max: 30,
min: 1,
max: 10,
},
),
rateLimitMaxRequestsPerInterval: parseNum(
Expand Down
Loading