Entities

Felix Bole edited this page Mar 20, 2025 · 1 revision

The main entities of the dpcp library are described below.

Agent

The Agent class is the foundation for event-emitting entities in the dpcp-library. It extends Node.js's EventEmitter.

  • Purpose: Provides a base for emitting and handling events with a unique ID.
  • Key Features:
    • Generates a unique ID using crypto.randomUUID().
    • Used as a parent class for ReportingAgent and MonitoringAgent.
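
The idea of an event emitter carrying a unique ID can be sketched as follows. This is a minimal illustration and not the library's source: the `uid` property name and the `DemoAgent` subclass are assumptions made for the example.

```typescript
import { EventEmitter } from "node:events";
import { randomUUID } from "node:crypto";

// Sketch of a base agent: an EventEmitter that carries a unique ID.
// The property name `uid` is hypothetical.
class Agent extends EventEmitter {
  readonly uid: string;

  constructor() {
    super();
    // crypto.randomUUID() returns a random version 4 UUID string.
    this.uid = randomUUID();
  }
}

// A subclass inherits both the ID and the on/emit machinery.
class DemoAgent extends Agent {}

const agent = new DemoAgent();
const received: string[] = [];
agent.on("ping", (msg: string) => received.push(msg));
agent.emit("ping", "hello");
```

Each instance gets its own ID at construction time, so listeners can tell agents apart without any central registry.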

ReportingAgent

Overview

ReportingAgent is an agent responsible for relaying status updates (signals) from individual nodes to the rest of the system, particularly the MonitoringAgent. It acts as a communication bridge, ensuring that node-specific events are reported and tracked across distributed chains.

Definition

  • Purpose: The ReportingAgent tracks and communicates the state of a specific node within a chain. It is instantiated with a chainId identifying the processing chain and a nodeId identifying the node, which links each event to its source.

  • Signal Types: It supports two types of signals:

    • Local Signals: Events relevant only to the specific local node or chain, such as a status update like "node_completed".
    • Global Signals: Events broadcast to the remote root monitoring host, such as a chain-wide notification like "chain_deployed".
  • Key Features:

    • Event Emission: It emits signals like global-signal or local-signal, which are picked up by listeners to update the system state.

    • Authorization: It requires authorization from the parent agent (MonitoringAgent) to ensure only legitimate instances are created.

  • Usage Example: When a node finishes processing data, the ReportingAgent might emit a local-signal with { status: "node_completed" }, which the MonitoringAgent then uses to update the chain’s workflow status.
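
The local/global split described above might look roughly like this. It is a sketch under assumptions: the class name `ReportingAgentSketch`, the `ReportingMessage` shape, and the `notify(status, scope)` signature are hypothetical, and the authorization step is left out.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical message shape; the library's real signal type may differ.
interface ReportingMessage {
  chainId: string;
  nodeId: string;
  status: string;
}

class ReportingAgentSketch extends EventEmitter {
  readonly chainId: string;
  readonly nodeId: string;

  constructor(chainId: string, nodeId: string) {
    super();
    this.chainId = chainId;
    this.nodeId = nodeId;
  }

  // Emit "local-signal" or "global-signal", tagged with the source IDs.
  notify(status: string, scope: "local" | "global" = "local"): void {
    const message: ReportingMessage = {
      chainId: this.chainId,
      nodeId: this.nodeId,
      status,
    };
    this.emit(`${scope}-signal`, message);
  }
}

const reporter = new ReportingAgentSketch("chain-1", "node-1");
const localMessages: ReportingMessage[] = [];
const globalMessages: ReportingMessage[] = [];
reporter.on("local-signal", (m: ReportingMessage) => localMessages.push(m));
reporter.on("global-signal", (m: ReportingMessage) => globalMessages.push(m));

reporter.notify("node_completed");
reporter.notify("chain_deployed", "global");
```

Tagging every message with chainId and nodeId is what lets a listener attribute a status update to its source.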


MonitoringAgent

Overview

MonitoringAgent is the central agent service for tracking the state of all chains and nodes in the system. It maintains a workflow object that serves as a live representation of the distributed pipeline’s status.

Definition

  • Purpose: This singleton agent oversees the entire pipeline, collecting and combining status updates from ReportingAgent instances to give an overall view of the system. It is designed so that developers can monitor and react to the health and progress of chains and nodes, whether local or remote.

  • Key Components:

    • Workflow Structure: The workflow is a map where each key is a chainId, and the value is a WorkflowNode containing:
      • status: The detail of the node states within the chain (e.g., { "node1": { "node_completed": true } }).
      • setupCount: Tracks how many nodes in the chain have completed setup.
      • setupCompleted: Indicates if all nodes in the chain are fully configured.
      • deployed: A flag indicating if the chain has been deployed.
    • Remote Monitoring: It maintains a remoteMonitoringHost map to link chains to their respective monitoring endpoints, supporting distributed setups.
    • Callbacks: It uses reportingCallback for local updates and broadcastReportingCallback for global notifications, allowing custom handling of signals.
  • Key Features:

    • Signal Processing: Listens to ReportingAgent events and updates the workflow accordingly. For example, a global-signal might trigger a broadcast to remote hosts, while a local-signal would update the chain’s status locally.
    • Chain Status Management: Offers methods like getChainStatus, setChainDeployed, and setChainSetupCompleted to query and manipulate chain states.
  • Usage Example: When a node emits a local-signal indicating completion, MonitoringAgent updates the workflow and, if the chain is fully set up, might trigger a pending chain start via a callback.
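
To make the workflow structure concrete, here is a small sketch of the data shape and of a local-signal update. The field names follow the list above. The exact types and the `applyLocalSignal` helper are assumptions for illustration, not the library's API.

```typescript
// Per-node flags, e.g. { node_completed: true }.
type NodeStatus = Record<string, boolean>;

// Mirrors the WorkflowNode fields listed above; exact types are assumed.
interface WorkflowNode {
  status: Record<string, NodeStatus>; // nodeId -> flags
  setupCount: number;
  setupCompleted: boolean;
  deployed: boolean;
}

// chainId -> WorkflowNode
type Workflow = Record<string, WorkflowNode>;

// Hypothetical helper: record a local signal for one node,
// creating the chain entry on first use.
function applyLocalSignal(
  workflow: Workflow,
  chainId: string,
  nodeId: string,
  signal: string,
): WorkflowNode {
  const entry = (workflow[chainId] ??= {
    status: {},
    setupCount: 0,
    setupCompleted: false,
    deployed: false,
  });
  const nodeStatus = (entry.status[nodeId] ??= {});
  nodeStatus[signal] = true;
  return entry;
}

const workflow: Workflow = {};
applyLocalSignal(workflow, "chain-1", "node1", "node_completed");
```

After the call, `workflow["chain-1"].status` holds the same shape as the example given for the status field.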


Node

Overview

The Node class represents an individual processing unit within a chain, tasked with executing a series of data processing pipelines. It is the core processing unit of a chain.

Definition

  • Responsibilities:

    • Executes multiple pipelines concurrently.
    • Tracks execution state and manages data flow.
    • Reports status updates via the ReportingAgent.
    • Routes output data to subsequent nodes.
    • Supports child chains for nested workflows.
  • Purpose: A Node is responsible for transforming input data through a sequence of processors (ProcessorPipeline). It operates within a chain, coordinating with other nodes and reporting its progress.

  • Key Components:

    • Pipelines: An array of ProcessorPipeline objects, where each pipeline is a series of PipelineProcessor instances that transform data sequentially.
    • Status: Tracks the node’s state (e.g., NODE_PENDING, NODE_IN_PROGRESS, NODE_COMPLETED) and any errors.
    • Execution Queue: A promise-based queue (executionQueue) ensures sequential processing, even in asynchronous environments.
    • Reporting: Integrates with a ReportingAgent to emit status updates via notify.
  • Key Features:

    • Manages dependencies and execution queues.
    • Provides execution control (suspend, resume) via the NodeStatusManager.
    • Configurable for both local and remote routing.
    • Data Processing: The execute method processes data through pipelines in batches, with progress tracking (progress) and support for suspension/resumption via NodeStatusManager.
    • Child Chains: Supports nested chains (chainConfig) that can run in parallel or serial.
    • Next Node Linking: Uses setNextNodeInfo to define the next node in the chain (local or remote).
    • Termination: Handles cleanup and data forwarding (sendData, terminate) based on chain type (persistent, auto-deleting, etc.).
  • Usage Example: A Node might receive { input: "data" }, process it through three pipelines, and forward the result to the next node, notifying NODE_COMPLETED via its ReportingAgent.
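
A promise-based queue of the kind mentioned under Execution Queue, together with a pipeline that applies processors in sequence, could be sketched like this. The names `SerialQueue`, `Processor`, and `runPipeline` are hypothetical, and the library's own `executionQueue` may be implemented differently.

```typescript
// A processor maps an input to an output, synchronously or not.
type Processor = (data: number) => number | Promise<number>;

class SerialQueue {
  // Tail of the promise chain; new tasks start after it settles.
  private tail: Promise<unknown> = Promise.resolve();

  enqueue<T>(task: () => Promise<T>): Promise<T> {
    const run = this.tail.then(task);
    // Swallow errors on the tail so one failure does not block later tasks.
    this.tail = run.catch(() => undefined);
    return run;
  }
}

// Run one pipeline: feed each processor's output into the next.
async function runPipeline(pipeline: Processor[], input: number): Promise<number> {
  let data = input;
  for (const processor of pipeline) {
    data = await processor(data);
  }
  return data;
}

const queue = new SerialQueue();
const order: string[] = [];

// The first task is slower, yet it still finishes before the second starts.
const first = queue.enqueue(async () => {
  await new Promise<void>((resolve) => setTimeout(resolve, 20));
  order.push("first");
});
const second = queue.enqueue(async () => {
  order.push("second");
});
const piped = queue.enqueue(() =>
  runPipeline([(n) => n + 1, async (n) => n * 2], 3),
);
```

Chaining every task onto the previous promise keeps execution order deterministic even when individual tasks take different amounts of time.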


NodeStatusManager

The NodeStatusManager handles status updates and signals for a Node.

  • Purpose: Manages node state transitions like suspend or resume.
  • Key Features:
    • Maintains a signal queue (NODE_SUSPEND, NODE_RESUME, etc.).
    • Suspends or resumes node execution as needed.
    • Used internally by Node.
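
As an illustration of a signal queue driving suspend and resume, consider the sketch below. The class and method names (`StatusManagerSketch`, `enqueueSignals`, `process`, `isSuspended`) are assumptions, and only the two signals named above are modeled.

```typescript
// Only the two signals named above are modeled here.
type NodeSignal = "NODE_SUSPEND" | "NODE_RESUME";

class StatusManagerSketch {
  private queue: NodeSignal[] = [];
  private suspended = false;

  // Append signals to the end of the queue.
  enqueueSignals(signals: NodeSignal[]): void {
    this.queue.push(...signals);
  }

  // Drain the queue in order; the most recent signal decides the state.
  process(): void {
    let signal: NodeSignal | undefined;
    while ((signal = this.queue.shift()) !== undefined) {
      this.suspended = signal === "NODE_SUSPEND";
    }
  }

  isSuspended(): boolean {
    return this.suspended;
  }
}

const manager = new StatusManagerSketch();
manager.enqueueSignals(["NODE_SUSPEND"]);
manager.process();
const suspendedAfterFirst = manager.isSuspended();

manager.enqueueSignals(["NODE_RESUME"]);
manager.process();
const suspendedAfterSecond = manager.isSuspended();
```

A node could check `isSuspended()` between batches to decide whether to continue or wait.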

NodeSupervisor

Overview

NodeSupervisor is the central orchestrator of the dpcp library, managing the lifecycle of nodes and chains, and coordinating their execution across local and remote environments.

Definition

  • Responsibilities:

    • Manages the lifecycle of nodes (creation, execution, deletion).
    • Creates and coordinates processing chains.
    • Distributes chains across local and remote nodes.
    • Handles execution and maintains relationships between nodes.
    • Processes signals for node and chain actions.
  • Signal Handling:
    The NodeSupervisor intercepts and manages a range of signals to control node and chain behavior. Key signals include:

    • NODE_SETUP: Initializes a node with a provided configuration.
    • NODE_CREATE: Creates a new node with specified parameters.
    • NODE_DELETE: Removes an existing node by its ID.
    • NODE_PAUSE: Pauses the execution of a node.
    • NODE_DELAY: Delays the execution of a node for a specified duration.
    • NODE_RUN: Executes a node with provided data.
    • NODE_SEND_DATA: Sends data to a node.
    • CHAIN_PREPARE: Prepares chain distribution for execution.
    • CHAIN_START: Starts a chain with the provided data.
    • CHAIN_START_PENDING: Begins execution of a chain that was pending.
    • CHAIN_DEPLOY: Deploys a chain with specific configurations and data.
    • NODE_SUSPEND/NODE_RESUME: Suspends or resumes node execution.
  • Purpose: This singleton class supervises the creation, setup, execution, and deletion of nodes and chains, acting as the control plane for the distributed system. It ensures that nodes are properly configured, executed in the correct order, and effectively monitored.

  • Key Components:

    • Nodes Map: A Map storing all local Node instances, keyed by their id.
    • Chains Map: Tracks chain relationships (ChainRelation), including root nodes and configurations.
    • Child Chains: Manages parent-child chain relationships for nested workflows.
    • Callbacks: Supports remoteServiceCallback, broadcastSetupCallback, and nodeStatusCallback for interacting with external systems.
  • Key Features:

    • Node Lifecycle: Methods like createNode, setupNode, runNode, and deleteNode handle the full lifecycle, with signals like NODE_SETUP or NODE_RUN triggering actions.
    • Chain Management: deployChain and startChain orchestrate chain deployment and execution, handling both local and remote nodes via prepareChainDistribution.
    • Remote Coordination: Broadcasts setup signals to remote hosts and forwards data to the next node in the chain.
    • Logging: Integrates with NodeSupervisorLogger providing insights into chains and workflows.
  • Usage Example: A developer might call handleRequest({ signal: "CHAIN_DEPLOY", config: [...], data: {...} }), prompting NodeSupervisor to create nodes, set up a chain, and start execution, with status updates reported via MonitoringAgent.
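
Signal-based dispatch of the kind handleRequest performs can be sketched as a switch over a tagged union. This is only a sketch: the request shapes, the `NodeRecord` type, and the string return values are hypothetical, and just three of the signals listed above are modeled.

```typescript
// Hypothetical request shapes for three of the signals listed above.
type SupervisorRequest =
  | { signal: "NODE_CREATE"; id: string }
  | { signal: "NODE_DELETE"; id: string }
  | { signal: "NODE_RUN"; id: string; data: unknown };

interface NodeRecord {
  id: string;
  runs: unknown[];
}

class SupervisorSketch {
  // Local nodes keyed by their id, as in the Nodes Map described above.
  private nodes = new Map<string, NodeRecord>();

  handleRequest(request: SupervisorRequest): string {
    switch (request.signal) {
      case "NODE_CREATE":
        this.nodes.set(request.id, { id: request.id, runs: [] });
        return `created ${request.id}`;
      case "NODE_DELETE":
        return this.nodes.delete(request.id)
          ? `deleted ${request.id}`
          : `unknown node ${request.id}`;
      case "NODE_RUN": {
        const node = this.nodes.get(request.id);
        if (!node) return `unknown node ${request.id}`;
        node.runs.push(request.data);
        return `ran ${request.id}`;
      }
    }
  }

  nodeCount(): number {
    return this.nodes.size;
  }
}

const supervisor = new SupervisorSketch();
const log = [
  supervisor.handleRequest({ signal: "NODE_CREATE", id: "n1" }),
  supervisor.handleRequest({ signal: "NODE_RUN", id: "n1", data: { input: "data" } }),
  supervisor.handleRequest({ signal: "NODE_RUN", id: "missing", data: null }),
  supervisor.handleRequest({ signal: "NODE_DELETE", id: "n1" }),
];
```

Using a tagged union means the compiler checks that every modeled signal has a handler, which is one way to keep a growing signal list manageable.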


PipelineProcessor

The PipelineProcessor processes data within a node’s pipeline. It executes processes that interact with external services.

  • Responsibilities:

    • Integrates with external services using a callback mechanism.
    • Optionally transforms data before and after service calls.
    • Manages sequential request–response flows within a pipeline.
  • Purpose: Executes data processing logic using a callback.

  • Key Features:

    • Callback-Based Interaction: Supports service integration through methods like PipelineProcessor.setCallbackService.
    • Uses a static callbackService to process data.
    • Dynamic Configuration: Allows configuration of service targets.
    • Configured with a targetId and optional metadata.
    • Metadata Handling: Supports enriched processing by incorporating metadata.
    • Method: digest(data) returns processed data.
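
The static-callback pattern described above might be sketched as follows. The names `setCallbackService`, `callbackService`, `targetId`, and `digest` come from the text. The payload shape, the asynchronous signature, and the class name `PipelineProcessorSketch` are assumptions.

```typescript
// Hypothetical payload handed to the callback service.
interface CallbackPayload {
  targetId: string;
  data: unknown;
  meta: Record<string, unknown> | undefined;
}

type CallbackService = (payload: CallbackPayload) => Promise<unknown>;

class PipelineProcessorSketch {
  // Shared by every processor instance; the default passes data through.
  static callbackService: CallbackService = async (payload) => payload.data;

  static setCallbackService(service: CallbackService): void {
    PipelineProcessorSketch.callbackService = service;
  }

  private targetId: string;
  private meta: Record<string, unknown> | undefined;

  constructor(targetId: string, meta?: Record<string, unknown>) {
    this.targetId = targetId;
    this.meta = meta;
  }

  // Delegate the actual processing to the shared callback.
  digest(data: unknown): Promise<unknown> {
    return PipelineProcessorSketch.callbackService({
      targetId: this.targetId,
      data,
      meta: this.meta,
    });
  }
}

// Example service: prefix the target and upper-case the data.
PipelineProcessorSketch.setCallbackService(
  async ({ targetId, data }) => `${targetId}:${String(data).toUpperCase()}`,
);

const digested = new PipelineProcessorSketch("service-a").digest("abc");
```

Because the callback is static, the host application registers it once and every processor in every pipeline uses the same integration point.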

LiteConnector

The LiteConnector is a test connector for the library.

  • Purpose: Provides an Express-based server for testing chain and node operations.
  • Key Features:
    • Creates and starts chains via HTTP endpoints.
    • Supports node communication (e.g., suspend, resume).
  • Usage: See Using the LiteConnector.
