Entities
Find below the main dpcp library entities.
The Agent class is the foundation for event-emitting entities in the dpcp library. It extends Node.js's EventEmitter.
- Purpose: Provides a base for emitting and handling events with a unique ID.
- Key Features:
  - Generates a unique ID using crypto.randomUUID().
  - Used as a parent class for ReportingAgent and MonitoringAgent.
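The following is a minimal sketch of such a base class, built only from what is stated above: an EventEmitter subclass that assigns itself an ID from crypto.randomUUID(). The property name `uid` is an assumption made for illustration and may not match the library's actual field.

```typescript
import { EventEmitter } from "node:events";
import { randomUUID } from "node:crypto";

// Hypothetical base class: an event emitter that carries a unique ID.
class Agent extends EventEmitter {
  // `uid` is an assumed property name, not confirmed by the library.
  readonly uid: string;

  constructor() {
    super();
    this.uid = randomUUID();
  }
}

const first = new Agent();
const second = new Agent();
console.log(first.uid !== second.uid); // each instance gets its own ID
```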
ReportingAgent is an agent responsible for relaying status updates (signals) from individual nodes to the rest of the system, particularly the MonitoringAgent. It acts as a communication bridge, ensuring that node-specific events are reported and tracked across distributed chains.
- Purpose: The ReportingAgent tracks and communicates the state of a specific node within a chain. It’s instantiated with a chainId identifying the processing chain and a nodeId identifying the node, which links each event to its source.
- Signal Types: It supports two types of signals:
  - Local Signals: Events relevant only to the specific local node or chain, such as a status update like "node_completed".
  - Global Signals: Events broadcast to the remote root monitoring host, such as a chain-wide notification like "chain_deployed".
- Key Features:
  - Event Emission: It emits signals like global-signal or local-signal, which are picked up by listeners to update the system state.
  - Authorization: It requires authorization from the parent agent (MonitoringAgent) to ensure only legitimate instances are created.
- Usage Example: When a node finishes processing data, the ReportingAgent might emit a local-signal with { status: "node_completed" }, which the MonitoringAgent then uses to update the chain’s workflow status.
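The emission flow described above could look roughly like the sketch below. The class name `SketchReportingAgent`, the `notify` signature, and the payload shape are assumptions for illustration; the authorization step performed by MonitoringAgent is deliberately left out because its mechanism is not described here.

```typescript
import { EventEmitter } from "node:events";

type SignalType = "local-signal" | "global-signal";

interface SignalPayload {
  status: string;
}

// Hypothetical stand-in for the library's ReportingAgent.
class SketchReportingAgent extends EventEmitter {
  readonly chainId: string;
  readonly nodeId: string;

  constructor(chainId: string, nodeId: string) {
    super();
    this.chainId = chainId;
    this.nodeId = nodeId;
  }

  // Emit a signal tagged with the chain and node it came from.
  notify(payload: SignalPayload, type: SignalType = "local-signal"): void {
    this.emit(type, { chainId: this.chainId, nodeId: this.nodeId, ...payload });
  }
}

// A listener (the role MonitoringAgent plays) collects incoming signals.
const received: unknown[] = [];
const agent = new SketchReportingAgent("chain-1", "node-1");
agent.on("local-signal", (message) => received.push(message));
agent.notify({ status: "node_completed" });
```

Tagging every message with chainId and nodeId is what lets a listener attribute a status update to its source.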
MonitoringAgent is the central agent service for tracking the state of all chains and nodes in the system. It maintains a workflow object that serves as a live representation of the distributed pipeline’s status.
- Purpose: This singleton agent oversees the entire pipeline, collecting and combining status updates from ReportingAgent instances to give an overall view of the system. It’s designed so that developers can monitor and react to the health and progress of chains and nodes, whether local or remote.
- Key Components:
  - Workflow Structure: The workflow is a map where each key is a chainId and each value is a WorkflowNode containing:
    - status: The detailed node states within the chain (e.g., { "node1": { "node_completed": true } }).
    - setupCount: Tracks how many nodes in the chain have completed setup.
    - setupCompleted: Indicates whether all nodes in the chain are fully configured.
    - deployed: A flag indicating whether the chain has been deployed.
  - Remote Monitoring: It maintains a remoteMonitoringHost map to link chains to their respective monitoring endpoints, supporting distributed setups.
  - Callbacks: It uses reportingCallback for local updates and broadcastReportingCallback for global notifications, allowing custom handling of signals.
- Key Features:
  - Signal Processing: Listens to ReportingAgent events and updates the workflow accordingly. For example, a global-signal might trigger a broadcast to remote hosts, while a local-signal would update the chain’s status locally.
  - Chain Status Management: Offers methods like getChainStatus, setChainDeployed, and setChainSetupCompleted to query and manipulate chain states.
- Usage Example: When a node emits a local-signal indicating completion, MonitoringAgent updates the workflow and, if the chain is fully set up, might trigger a pending chain start via a callback.
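To make the workflow structure concrete, here is a small sketch of the data shape and of how a local signal might be recorded in it. The field names come from the description above; the exact types, the use of a Map, and the helper `recordLocalSignal` are assumptions for illustration only.

```typescript
// Field names follow the description above; the types are assumptions.
interface WorkflowNode {
  status: Record<string, Record<string, boolean>>;
  setupCount: number;
  setupCompleted: boolean;
  deployed: boolean;
}

type Workflow = Map<string, WorkflowNode>;

// Hypothetical helper: record a local signal for one node of one chain.
function recordLocalSignal(
  workflow: Workflow,
  chainId: string,
  nodeId: string,
  signal: string
): WorkflowNode {
  let entry = workflow.get(chainId);
  if (!entry) {
    // First signal seen for this chain: start from an empty state.
    entry = { status: {}, setupCount: 0, setupCompleted: false, deployed: false };
    workflow.set(chainId, entry);
  }
  const nodeStatus = entry.status[nodeId] ?? {};
  nodeStatus[signal] = true;
  entry.status[nodeId] = nodeStatus;
  return entry;
}

const workflow: Workflow = new Map();
const chain = recordLocalSignal(workflow, "chain-1", "node1", "node_completed");
console.log(JSON.stringify(chain.status));
```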
The Node class is the core processing unit within a chain, tasked with executing a series of data processing pipelines.
- Responsibilities:
  - Executes multiple pipelines concurrently.
  - Tracks execution state and manages data flow.
  - Reports status updates via the ReportingAgent.
  - Routes output data to subsequent nodes.
  - Supports child chains for nested workflows.
- Purpose: A Node is responsible for transforming input data through a sequence of processors (ProcessorPipeline). It operates within a chain, coordinating with other nodes and reporting its progress.
- Key Components:
  - Pipelines: An array of ProcessorPipeline objects, where each pipeline is a series of PipelineProcessor instances that transform data sequentially.
  - Status: Tracks the node’s state (e.g., NODE_PENDING, NODE_IN_PROGRESS, NODE_COMPLETED) and any errors.
  - Execution Queue: A promise-based queue (executionQueue) ensures sequential processing, even in asynchronous environments.
  - Reporting: Integrates with a ReportingAgent to emit status updates via notify.
- Key Features:
  - Manages dependencies and execution queues.
  - Provides execution control (suspend, resume) via the NodeStatusManager.
  - Configurable for both local and remote routing.
  - Data Processing: The execute method processes data through pipelines in batches, with progress tracking (progress) and support for suspension/resumption via NodeStatusManager.
  - Child Chains: Supports nested chains (chainConfig) that can run in parallel or serially.
  - Next Node Linking: Uses setNextNodeInfo to define the next node in the chain (local or remote).
  - Termination: Handles cleanup and data forwarding (sendData, terminate) based on chain type (persistent, auto-deleting...).
- Usage Example: A Node might receive { input: "data" }, process it through three pipelines, and forward the result to the next node, notifying NODE_COMPLETED via its ReportingAgent.
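The promise-based execution queue mentioned above can be sketched as a promise chain in which each run waits for the previous one. This is an illustration of the general technique, not the library's implementation: `SketchNode`, the `Processor` type, and the single flat pipeline are assumptions, and batching, progress tracking, and status reporting are omitted.

```typescript
// Hypothetical processor type: transforms data asynchronously.
type Processor = (data: unknown) => Promise<unknown>;

class SketchNode {
  // Tail of the promise chain; every new run is appended to it.
  private executionQueue: Promise<unknown> = Promise.resolve();
  private pipeline: Processor[];

  constructor(pipeline: Processor[]) {
    this.pipeline = pipeline;
  }

  execute(data: unknown): Promise<unknown> {
    const run = this.executionQueue.then(async () => {
      let result = data;
      // Feed the output of each processor into the next one.
      for (const processor of this.pipeline) {
        result = await processor(result);
      }
      return result;
    });
    // A failed run must not block the runs queued after it.
    this.executionQueue = run.catch(() => undefined);
    return run;
  }
}

const node = new SketchNode([
  async (d) => `${String(d)}-a`,
  async (d) => `${String(d)}-b`,
]);
node.execute("in").then((out) => console.log(out)); // logs "in-a-b"
```

Because each call chains onto the previous promise, two overlapping execute calls still run one after the other, in call order.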
The NodeStatusManager handles status updates and signals for a Node.
- Purpose: Manages node state transitions like suspend or resume.
- Key Features:
  - Maintains a signal queue (NODE_SUSPEND, NODE_RESUME, etc.).
  - Suspends or resumes node execution as needed.
  - Used internally by Node.
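As a rough illustration of a signal queue driving suspend and resume, consider the sketch below. The class `SketchStatusManager` and its methods `enqueueSignals` and `process` are hypothetical names chosen for this example, and only the two signals named above are modelled.

```typescript
type NodeSignal = "NODE_SUSPEND" | "NODE_RESUME";

// Hypothetical stand-in for NodeStatusManager.
class SketchStatusManager {
  private signalQueue: NodeSignal[] = [];
  private suspended = false;

  enqueueSignals(signals: NodeSignal[]): void {
    this.signalQueue.push(...signals);
  }

  // Apply queued signals in arrival order; returns true if the node
  // should stay suspended afterwards.
  process(): boolean {
    while (this.signalQueue.length > 0) {
      const signal = this.signalQueue.shift();
      if (signal === "NODE_SUSPEND") this.suspended = true;
      if (signal === "NODE_RESUME") this.suspended = false;
    }
    return this.suspended;
  }
}

const manager = new SketchStatusManager();
manager.enqueueSignals(["NODE_SUSPEND"]);
console.log(manager.process()); // true: the node should pause
```

A node could call such a method between processing steps to decide whether to continue or wait.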
NodeSupervisor is the central orchestrator of the dpcp library, managing the lifecycle of nodes and chains and coordinating their execution across local and remote environments.
- Responsibilities:
  - Manages the lifecycle of nodes (creation, execution, deletion).
  - Creates and coordinates processing chains.
  - Distributes chains across local and remote nodes.
  - Handles execution and maintains relationships between nodes.
  - Processes signals for node and chain actions.
- Signal Handling: The NodeSupervisor intercepts and manages a range of signals to control node and chain behavior. Key signals include:
  - NODE_SETUP: Initializes a node with a provided configuration.
  - NODE_CREATE: Creates a new node with specified parameters.
  - NODE_DELETE: Removes an existing node by its ID.
  - NODE_PAUSE: Pauses the execution of a node.
  - NODE_DELAY: Delays the execution of a node for a specified duration.
  - NODE_RUN: Executes a node with provided data.
  - NODE_SEND_DATA: Sends data to a node.
  - CHAIN_PREPARE: Prepares chain distribution for execution.
  - CHAIN_START: Starts a chain with the provided data.
  - CHAIN_START_PENDING: Begins execution of a chain that was pending.
  - CHAIN_DEPLOY: Deploys a chain with specific configurations and data.
  - NODE_SUSPEND / NODE_RESUME: Suspends or resumes node execution.
- Purpose: This singleton class supervises the creation, setup, execution, and deletion of nodes and chains, acting as the control plane for the distributed system. It ensures that nodes are properly configured, executed in the correct order, and effectively monitored.
- Key Components:
  - Nodes Map: A Map storing all local Node instances, keyed by their id.
  - Chains Map: Tracks chain relationships (ChainRelation), including root nodes and configurations.
  - Child Chains: Manages parent-child chain relationships for nested workflows.
  - Callbacks: Supports remoteServiceCallback, broadcastSetupCallback, and nodeStatusCallback for interacting with external systems.
- Key Features:
  - Node Lifecycle: Methods like createNode, setupNode, runNode, and deleteNode handle the full lifecycle, with signals like NODE_SETUP or NODE_RUN triggering actions.
  - Chain Management: deployChain and startChain orchestrate chain deployment and execution, handling both local and remote nodes via prepareChainDistribution.
  - Remote Coordination: Broadcasts setup signals to remote hosts and forwards data to the next node in the chain.
  - Logging: Integrates with NodeSupervisorLogger, providing insights into chains and workflows.
- Usage Example: A developer might call handleRequest({ signal: "CHAIN_DEPLOY", config: [...], data: {...} }), prompting NodeSupervisor to create nodes, set up a chain, and start execution, with status updates reported via MonitoringAgent.
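The signal-driven design can be illustrated with a small dispatcher that routes each request to an action based on its signal. Everything in this sketch is assumed for illustration: the class `SketchSupervisor`, the request shapes, and the string results are not the library's actual payloads, and only three node signals are covered.

```typescript
// Hypothetical request shapes, discriminated by their signal.
type SupervisorRequest =
  | { signal: "NODE_CREATE"; id: string }
  | { signal: "NODE_DELETE"; id: string }
  | { signal: "NODE_RUN"; id: string; data: unknown };

class SketchSupervisor {
  // Local nodes keyed by ID; each one records the data it was run with.
  private nodes = new Map<string, { runs: unknown[] }>();

  get nodeCount(): number {
    return this.nodes.size;
  }

  handleRequest(request: SupervisorRequest): string {
    switch (request.signal) {
      case "NODE_CREATE":
        this.nodes.set(request.id, { runs: [] });
        return `created ${request.id}`;
      case "NODE_DELETE":
        return this.nodes.delete(request.id)
          ? `deleted ${request.id}`
          : `unknown node ${request.id}`;
      case "NODE_RUN": {
        const node = this.nodes.get(request.id);
        if (!node) return `unknown node ${request.id}`;
        node.runs.push(request.data);
        return `ran ${request.id}`;
      }
    }
  }
}

const supervisor = new SketchSupervisor();
supervisor.handleRequest({ signal: "NODE_CREATE", id: "n1" });
console.log(supervisor.handleRequest({ signal: "NODE_RUN", id: "n1", data: { input: "data" } }));
```

Using a discriminated union lets the compiler check that each signal is handled with the payload fields it needs.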
The PipelineProcessor processes data within a node’s pipeline. It executes processes that interact with external services.
- Responsibilities:
  - Integrates with external services using a callback mechanism.
  - Optionally transforms data before and after service calls.
  - Manages sequential request–response flows within a pipeline.
- Key Features:
  - Callback-Based Interaction: Supports service integration through methods like PipelineProcessor.setCallbackService.
  - Dynamic Configuration: Allows configuration of service targets.
  - Metadata Handling: Supports enriched processing by incorporating metadata.
- Purpose: Executes data processing logic using a callback.
- Key Features:
  - Uses a static callbackService to process data.
  - Configured with a targetId and optional metadata.
  - Method: digest(data) returns processed data.
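The static-callback pattern described above might be sketched as follows. The names setCallbackService, targetId, and digest come from the text; the class name `SketchPipelineProcessor`, the payload shape, and the asynchronous signature are assumptions for illustration.

```typescript
// Assumed payload shape handed to the callback service.
interface CallbackPayload {
  targetId: string;
  data: unknown;
  meta: Record<string, unknown> | undefined;
}

type CallbackService = (payload: CallbackPayload) => Promise<unknown>;

// Hypothetical stand-in for PipelineProcessor.
class SketchPipelineProcessor {
  // One service shared by every processor instance.
  private static callbackService: CallbackService | undefined;
  private targetId: string;
  private meta: Record<string, unknown> | undefined;

  constructor(targetId: string, meta?: Record<string, unknown>) {
    this.targetId = targetId;
    this.meta = meta;
  }

  static setCallbackService(service: CallbackService): void {
    SketchPipelineProcessor.callbackService = service;
  }

  // Delegate the actual processing to the registered service.
  async digest(data: unknown): Promise<unknown> {
    const service = SketchPipelineProcessor.callbackService;
    if (!service) throw new Error("No callback service registered");
    return service({ targetId: this.targetId, data, meta: this.meta });
  }
}

SketchPipelineProcessor.setCallbackService(
  async ({ targetId, data }) => `${targetId}:${String(data)}`
);
new SketchPipelineProcessor("service-a")
  .digest("payload")
  .then((out) => console.log(out)); // logs "service-a:payload"
```

Keeping the service static means the integration with external systems is configured once, while each processor instance only carries its own target and metadata.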
The LiteConnector is a test connector for the library.
- Purpose: Provides an Express-based server for testing chain and node operations.
- Key Features:
  - Creates and starts chains via HTTP endpoints.
  - Supports node communication (e.g., suspend, resume).
- Usage: See Using the LiteConnector.