

skobeltsyn edited this page Mar 28, 2026 · 1 revision

Composition: Parallel (/)

Overview

A Parallel fans out the same input to multiple agents and runs them concurrently. All agents must share the same IN and OUT types. The result is a List<OUT> preserving the declaration order.

agentA / agentB / agentC

Under the hood, each agent is launched as a coroutine via async on Dispatchers.Default. The caller blocks until every agent finishes. This means three agents that each take 200ms will complete in roughly 200ms total, not 600ms.

A Parallel<IN, OUT> is invokable: call it with parallel(input) and you get List<OUT>.
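The fan-out semantics can be sketched in plain Kotlin with no framework at all. The `fanOut` function below is a hypothetical stand-in for `Parallel.invoke`, using a thread pool where the real implementation uses coroutines:

```kotlin
import java.util.concurrent.Executors

// Hypothetical stand-in for Parallel.invoke: run every function against the
// same input concurrently and return results in declaration order.
fun <A, B> fanOut(input: A, vararg agents: (A) -> B): List<B> {
    val pool = Executors.newFixedThreadPool(agents.size)
    try {
        // Submit everything first so the tasks overlap, then await in order.
        return agents.map { agent -> pool.submit<B> { agent(input) } }
            .map { it.get() }
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(fanOut("Hello", { s: String -> s.uppercase() }, { s: String -> s.lowercase() }))
    // [HELLO, hello]
}
```

Awaiting in submission order, not completion order, is what preserves the declaration order of results.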

Type Signature

// Two agents -> Parallel
operator fun <A, B> Agent<A, B>.div(other: Agent<A, B>): Parallel<A, B>

// Extend a Parallel with one more agent
operator fun <A, B> Parallel<A, B>.div(other: Agent<A, B>): Parallel<A, B>

All agents in a Parallel must have the same input type A and the same output type B. The / operator enforces this at compile time.
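A minimal model of how these two `div` overloads could be defined, using simplified `Agent` and `Parallel` types (these are not the library's actual definitions, which also carry skills and placement tracking, and the real `invoke` is concurrent):

```kotlin
// Simplified model: an Agent wraps a function, a Parallel holds agents that
// share the same input type A and output type B.
class Agent<A, B>(val run: (A) -> B)

class Parallel<A, B>(val agents: List<Agent<A, B>>) {
    // Sequential here for brevity; the real implementation runs concurrently.
    operator fun invoke(input: A): List<B> = agents.map { it.run(input) }
}

// Two agents -> Parallel
operator fun <A, B> Agent<A, B>.div(other: Agent<A, B>): Parallel<A, B> =
    Parallel(listOf(this, other))

// Extend a Parallel with one more agent
operator fun <A, B> Parallel<A, B>.div(other: Agent<A, B>): Parallel<A, B> =
    Parallel(agents + other)

fun main() {
    val a = Agent<String, Int> { it.length }
    val b = Agent<String, Int> { it.count { c -> c == 'l' } }
    println((a / b)("hello"))   // [5, 2]
}
```

Because both overloads fix `A` and `B` across all operands, mixing agents with different input or output types fails to compile, which is the type safety the section describes.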

Basic Example

Two agents receive the same string and process it independently:

import agents_engine.core.*
import agents_engine.composition.parallel.div

val upper = agent<String, String>("upper") {
    skills {
        skill<String, String>("upper", "Uppercase") {
            implementedBy { it.uppercase() }
        }
    }
}

val lower = agent<String, String>("lower") {
    skills {
        skill<String, String>("lower", "Lowercase") {
            implementedBy { it.lowercase() }
        }
    }
}

val result = (upper / lower)("Hello")
// result == listOf("HELLO", "hello")

Three or more agents chain with /:

val result = (a / b / c)("same")
// All three receive "same", result is [a("same"), b("same"), c("same")]

Connecting to a Pipeline

A Parallel produces List<OUT>. To continue processing, pipe the list into an aggregator agent whose input type is List<OUT>. Use then from the Pipeline composition:

import agents_engine.composition.pipeline.then

val charCount = agent<String, Int>("chars") {
    skills { skill<String, Int>("chars", "Char count") { implementedBy { it.length } } }
}
val wordCount = agent<String, Int>("words") {
    skills { skill<String, Int>("words", "Word count") { implementedBy { it.split(" ").size } } }
}
val sum = agent<List<Int>, Int>("sum") {
    skills { skill<List<Int>, Int>("sum", "Sum") { implementedBy { it.sum() } } }
}

val pipeline = (charCount / wordCount) then sum
val result = pipeline("hello world")
// [11, 2] -> 13
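The data flow through that pipeline can be traced with plain functions, independent of the framework:

```kotlin
// Plain-Kotlin trace of the parallel-then-aggregate flow:
// String -> (fan out to two Int results) -> List<Int> -> Int.
fun main() {
    val charCount = { s: String -> s.length }
    val wordCount = { s: String -> s.split(" ").size }
    val sum = { xs: List<Int> -> xs.sum() }

    val input = "hello world"
    val fanned = listOf(charCount(input), wordCount(input)) // the Parallel stage
    println(fanned)        // [11, 2]
    println(sum(fanned))   // 13
}
```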

You can also place a pipeline before the parallel stage:

val pipeline = prepare then (reviewerA / reviewerB / reviewerC) then aggregator

The full type flow is:

prepare: Agent<Input, Spec>
reviewerA / reviewerB / reviewerC: Parallel<Spec, Review>
aggregator: Agent<List<Review>, Final>

pipeline: Pipeline<Input, Final>

Practical Example: Multi-Perspective Code Review

Six specialist agents analyze the same specification in parallel, then an aggregator assembles the results:

open class Spec
class UseCase : Spec()
class GlossaryTerm : Spec()
class OuterReference : Spec()
class SystemActor : Spec()
class Feature : Spec()
class Requirement : Spec()

data class SpecsParcel(
    val description: String,
    val useCases: List<UseCase> = emptyList(),
    val glossaryTerms: List<GlossaryTerm> = emptyList(),
    val outerReferences: List<OuterReference> = emptyList(),
    val systemActors: List<SystemActor> = emptyList(),
    val features: List<Feature> = emptyList(),
    val requirements: List<Requirement> = emptyList(),
)

val useCasesMaster     = agent<SpecsParcel, List<Spec>>("uc") { skills { skill<SpecsParcel, List<Spec>>("uc") { implementedBy { listOf(UseCase()) } } } }
val glossaryMaster     = agent<SpecsParcel, List<Spec>>("gl") { skills { skill<SpecsParcel, List<Spec>>("gl") { implementedBy { listOf(GlossaryTerm()) } } } }
val outerRefsMaster    = agent<SpecsParcel, List<Spec>>("or") { skills { skill<SpecsParcel, List<Spec>>("or") { implementedBy { listOf(OuterReference()) } } } }
val systemActorsMaster = agent<SpecsParcel, List<Spec>>("sa") { skills { skill<SpecsParcel, List<Spec>>("sa") { implementedBy { listOf(SystemActor()) } } } }
val featuresMaster     = agent<SpecsParcel, List<Spec>>("ft") { skills { skill<SpecsParcel, List<Spec>>("ft") { implementedBy { listOf(Feature()) } } } }
val requirementsMaster = agent<SpecsParcel, List<Spec>>("rq") { skills { skill<SpecsParcel, List<Spec>>("rq") { implementedBy { listOf(Requirement()) } } } }

val parallel = useCasesMaster / glossaryMaster / outerRefsMaster /
    systemActorsMaster / featuresMaster / requirementsMaster

val gatherer = agent<List<List<Spec>>, SpecsParcel>("gather") {
    skills { skill<List<List<Spec>>, SpecsParcel>("gather") { implementedBy { specLists ->
        val all = specLists.flatten()
        SpecsParcel(
            description     = "assembled",
            useCases        = all.filterIsInstance<UseCase>(),
            glossaryTerms   = all.filterIsInstance<GlossaryTerm>(),
            outerReferences = all.filterIsInstance<OuterReference>(),
            systemActors    = all.filterIsInstance<SystemActor>(),
            features        = all.filterIsInstance<Feature>(),
            requirements    = all.filterIsInstance<Requirement>(),
        )
    } } }
}

val pipeline = parallel then gatherer
val result = pipeline(SpecsParcel("Build a CRM"))
// result.useCases, result.glossaryTerms, etc. are all populated
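The gathering step relies only on stdlib `flatten` and `filterIsInstance`; a standalone check of that pattern, using a trimmed-down class hierarchy:

```kotlin
// Standalone check of the flatten + filterIsInstance aggregation pattern
// used by the gatherer agent above.
open class Spec
class UseCase : Spec()
class Feature : Spec()

fun main() {
    val specLists: List<List<Spec>> = listOf(
        listOf(UseCase()),            // result from one parallel branch
        listOf(Feature(), Feature()), // result from another branch
    )
    val all = specLists.flatten()
    println(all.filterIsInstance<UseCase>().size) // 1
    println(all.filterIsInstance<Feature>().size) // 2
}
```

Because the parallel stage returns `List<List<Spec>>`, flattening first and then partitioning by runtime type lets each branch contribute any mix of `Spec` subtypes.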

Performance

The Parallel.invoke implementation uses Kotlin coroutines:

operator fun invoke(input: IN): List<OUT> = runBlocking(Dispatchers.Default) {
    executions.map { exec -> async { exec(input) } }.map { it.await() }
}

Key characteristics:

  • runBlocking(Dispatchers.Default) -- The calling thread blocks until all agents finish. Dispatchers.Default uses a shared thread pool sized to the number of CPU cores (with a minimum of two threads).
  • async { exec(input) } -- Each agent runs in its own coroutine, potentially on a different thread. This gives true parallel execution for CPU-bound or I/O-bound agent work.
  • Order preserved -- Results come back in declaration order (a / b / c returns [a_result, b_result, c_result]), even though execution happens concurrently.
  • Failure propagation -- If any agent throws, the exception surfaces at await() and propagates to the caller; under structured concurrency the failure also cancels the sibling coroutines.

For LLM-backed agents that make network calls, parallel execution can dramatically reduce wall-clock time since each agent's HTTP request overlaps with the others.
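A rough way to observe the overlap, with plain threads standing in for the coroutine dispatcher (timings are illustrative and machine-dependent):

```kotlin
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Two 200ms "agents" run concurrently: wall-clock time lands near 200ms,
// not 400ms. A thread pool stands in for Dispatchers.Default here.
fun main() {
    val pool = Executors.newFixedThreadPool(2)
    val slow = { s: String -> Thread.sleep(200); s.uppercase() }

    val elapsed = measureTimeMillis {
        val futures = listOf("a", "b").map { s -> pool.submit<String> { slow(s) } }
        println(futures.map { it.get() }) // [A, B]
    }
    println("took ${elapsed}ms")          // roughly 200ms, well under 400ms
    pool.shutdown()
}
```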

Agent Placement

As with all compositions, each agent instance may participate in only one composition. An agent placed in a Parallel cannot be reused in another Parallel, Pipeline, Forum, or Loop:

val a = agent<String, String>("a") {}
val b = agent<String, String>("b") {}
val c = agent<String, String>("c") {}

a / b  // a and b are now placed

// This throws IllegalArgumentException:
// a / c

// Create a new instance instead:
val a2 = agent<String, String>("a") {}
a2 / c  // works
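One way this enforcement could work is a placement flag checked by the composition operator. The sketch below is hypothetical and not the library's actual internals; it only illustrates the observable behavior:

```kotlin
// Hypothetical sketch of single-placement enforcement: each agent carries a
// flag that the composition operator checks before claiming it.
class Agent(val name: String) {
    private var placed = false
    fun place() {
        require(!placed) { "Agent '$name' is already part of a composition" }
        placed = true
    }
}

operator fun Agent.div(other: Agent): List<Agent> {
    this.place()
    other.place()
    return listOf(this, other)
}

fun main() {
    val a = Agent("a"); val b = Agent("b"); val c = Agent("c")
    a / b                                 // a and b are now placed
    val second = runCatching { a / c }    // a is already placed
    println(second.isFailure)             // true (IllegalArgumentException)
}
```

`require` throws `IllegalArgumentException`, matching the exception the library reports for a reused agent.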

Next: Pipeline | Loop | Branch | Forum | While Loops
