Composition Parallel
A Parallel fans out the same input to multiple agents and runs them concurrently. All agents must share the same IN and OUT types. The result is a List<OUT> preserving the declaration order.
agentA / agentB / agentC
Under the hood, each agent is launched as a coroutine via async on Dispatchers.Default. The caller blocks until every agent finishes. As a result, three agents that each take 200ms complete in roughly 200ms total rather than 600ms, provided the dispatcher has a free thread for each of them.
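The timing claim can be checked with a minimal sketch. It assumes kotlinx.coroutines is on the classpath and uses a hypothetical slowWork function, with delay standing in for an agent that takes about 200ms; it is not the library's own code.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for an agent that takes about 200 ms.
suspend fun slowWork(name: String): String {
    delay(200)
    return "$name done"
}

// Launch all three with async, then wait for every result.
fun runConcurrently(): List<String> = runBlocking(Dispatchers.Default) {
    listOf("a", "b", "c").map { name -> async { slowWork(name) } }.awaitAll()
}

// The same work, one call after another.
fun runSequentially(): List<String> = runBlocking {
    listOf("a", "b", "c").map { name -> slowWork(name) }
}

fun main() {
    val concurrentMs = measureTimeMillis { runConcurrently() }
    val sequentialMs = measureTimeMillis { runSequentially() }
    println("concurrent: $concurrentMs ms, sequential: $sequentialMs ms")
}
```

Exact numbers vary by machine, but the concurrent run should land near the duration of a single task while the sequential run takes about three times as long.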
A Parallel<IN, OUT> is invokable: call it with parallel(input) and you get List<OUT>.
// Two agents -> Parallel
operator fun <A, B> Agent<A, B>.div(other: Agent<A, B>): Parallel<A, B>
// Extend a Parallel with one more agent
operator fun <A, B> Parallel<A, B>.div(other: Agent<A, B>): Parallel<A, B>
All agents in a Parallel must have the same input type A and the same output type B. The / operator enforces this at compile time.
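To see how a pair of div overloads can enforce matching types, here is a toy model. ToyAgent and ToyParallel are hypothetical stand-ins invented for this sketch, not the library's Agent and Parallel classes, and the toy runs its agents sequentially to stay short.

```kotlin
// Hypothetical toy types for illustration only; not the library's classes.
class ToyAgent<A, B>(val name: String, val run: (A) -> B)

class ToyParallel<A, B>(val agents: List<ToyAgent<A, B>>) {
    // Sequential for brevity; the real Parallel runs its agents concurrently.
    operator fun invoke(input: A): List<B> = agents.map { it.run(input) }
}

// Two agents -> parallel
operator fun <A, B> ToyAgent<A, B>.div(other: ToyAgent<A, B>): ToyParallel<A, B> =
    ToyParallel(listOf(this, other))

// Extend a parallel with one more agent
operator fun <A, B> ToyParallel<A, B>.div(other: ToyAgent<A, B>): ToyParallel<A, B> =
    ToyParallel(agents + other)

fun toyDemo(): List<Int> {
    val length = ToyAgent<String, Int>("length") { it.length }
    val vowels = ToyAgent<String, Int>("vowels") { s -> s.count { it in "aeiou" } }
    val words = ToyAgent<String, Int>("words") { it.split(" ").size }

    // val upper = ToyAgent<String, String>("upper") { it.uppercase() }
    // length / upper   // does not compile: B would have to be both Int and String

    return (length / vowels / words)("hello world")
}

fun main() {
    println(toyDemo()) // [11, 3, 2]
}
```

Because both type parameters appear on the receiver and on the argument, the compiler has to find a single A and a single B that fit every operand, which is what rejects a mismatched agent.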
Two agents receive the same string and process it independently:
import agents_engine.core.*
import agents_engine.composition.parallel.div
val upper = agent<String, String>("upper") {
skills {
skill<String, String>("upper", "Uppercase") {
implementedBy { it.uppercase() }
}
}
}
val lower = agent<String, String>("lower") {
skills {
skill<String, String>("lower", "Lowercase") {
implementedBy { it.lowercase() }
}
}
}
val result = (upper / lower)("Hello")
// result == listOf("HELLO", "hello")
Three or more agents chain with /:
val result = (a / b / c)("same")
// All three receive "same", result is [a("same"), b("same"), c("same")]
A Parallel produces List<OUT>. To continue processing, pipe the list into an aggregator agent whose input type is List<OUT>. Use then from the Pipeline composition:
import agents_engine.composition.pipeline.then
val charCount = agent<String, Int>("chars") {
skills { skill<String, Int>("chars", "Char count") { implementedBy { it.length } } }
}
val wordCount = agent<String, Int>("words") {
skills { skill<String, Int>("words", "Word count") { implementedBy { it.split(" ").size } } }
}
val sum = agent<List<Int>, Int>("sum") {
skills { skill<List<Int>, Int>("sum", "Sum") { implementedBy { it.sum() } } }
}
val pipeline = (charCount / wordCount) then sum
val result = pipeline("hello world")
// [11, 2] -> 13
You can also place a pipeline before the parallel stage:
val pipeline = prepare then (reviewerA / reviewerB / reviewerC) then aggregator
The full type flow is:
prepare: Agent<Input, Spec>
reviewerA / reviewerB / reviewerC: Parallel<Spec, Review>
aggregator: Agent<List<Review>, Final>
pipeline: Pipeline<Input, Final>
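The type flow can be traced with plain functions. In the sketch below, Input, Spec, Review, and Final are hypothetical placeholder types that borrow the names from the flow above, the three reviewers are invented rules, and everything runs sequentially so that the focus stays on how the types line up.

```kotlin
// Hypothetical placeholder types mirroring the names in the type flow.
data class Input(val text: String)
data class Spec(val title: String)
data class Review(val reviewer: String, val approved: Boolean)
data class Final(val approvals: Int, val total: Int)

// prepare: Input -> Spec
val prepare: (Input) -> Spec = { Spec(it.text.trim()) }

// Three reviewers sharing the same input and output types: Spec -> Review
val reviewers: List<(Spec) -> Review> = listOf(
    { spec -> Review("A", spec.title.isNotEmpty()) },
    { spec -> Review("B", spec.title.length <= 20) },
    { spec -> Review("C", spec.title.firstOrNull()?.isUpperCase() == true) },
)

// aggregator: List<Review> -> Final
val aggregate: (List<Review>) -> Final = { reviews ->
    Final(approvals = reviews.count { it.approved }, total = reviews.size)
}

// The whole flow: Input -> Spec -> List<Review> -> Final
fun runFlow(input: Input): Final {
    val spec: Spec = prepare(input)
    val reviews: List<Review> = reviewers.map { review -> review(spec) }
    return aggregate(reviews)
}

fun main() {
    println(runFlow(Input("  Build a CRM  "))) // Final(approvals=3, total=3)
}
```

The fan-out stage is the only place where a single value becomes a list, which is why the aggregator has to accept List<Review> rather than Review.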
Six specialist agents analyze the same specification in parallel, then an aggregator assembles the results:
open class Spec
class UseCase : Spec()
class GlossaryTerm : Spec()
class OuterReference : Spec()
class SystemActor : Spec()
class Feature : Spec()
class Requirement : Spec()
data class SpecsParcel(
val description: String,
val useCases: List<UseCase> = emptyList(),
val glossaryTerms: List<GlossaryTerm> = emptyList(),
val outerReferences: List<OuterReference> = emptyList(),
val systemActors: List<SystemActor> = emptyList(),
val features: List<Feature> = emptyList(),
val requirements: List<Requirement> = emptyList(),
)
val useCasesMaster = agent<SpecsParcel, List<Spec>>("uc") { skills { skill<SpecsParcel, List<Spec>>("uc") { implementedBy { listOf(UseCase()) } } } }
val glossaryMaster = agent<SpecsParcel, List<Spec>>("gl") { skills { skill<SpecsParcel, List<Spec>>("gl") { implementedBy { listOf(GlossaryTerm()) } } } }
val outerRefsMaster = agent<SpecsParcel, List<Spec>>("or") { skills { skill<SpecsParcel, List<Spec>>("or") { implementedBy { listOf(OuterReference()) } } } }
val systemActorsMaster = agent<SpecsParcel, List<Spec>>("sa") { skills { skill<SpecsParcel, List<Spec>>("sa") { implementedBy { listOf(SystemActor()) } } } }
val featuresMaster = agent<SpecsParcel, List<Spec>>("ft") { skills { skill<SpecsParcel, List<Spec>>("ft") { implementedBy { listOf(Feature()) } } } }
val requirementsMaster = agent<SpecsParcel, List<Spec>>("rq") { skills { skill<SpecsParcel, List<Spec>>("rq") { implementedBy { listOf(Requirement()) } } } }
val parallel = useCasesMaster / glossaryMaster / outerRefsMaster /
systemActorsMaster / featuresMaster / requirementsMaster
val gatherer = agent<List<List<Spec>>, SpecsParcel>("gather") {
skills { skill<List<List<Spec>>, SpecsParcel>("gather") { implementedBy { specLists ->
val all = specLists.flatten()
SpecsParcel(
description = "assembled",
useCases = all.filterIsInstance<UseCase>(),
glossaryTerms = all.filterIsInstance<GlossaryTerm>(),
outerReferences = all.filterIsInstance<OuterReference>(),
systemActors = all.filterIsInstance<SystemActor>(),
features = all.filterIsInstance<Feature>(),
requirements = all.filterIsInstance<Requirement>(),
)
} } }
}
val pipeline = parallel then gatherer
val result = pipeline(SpecsParcel("Build a CRM"))
// result.useCases, result.glossaryTerms, etc. are all populated
The Parallel.invoke implementation uses Kotlin coroutines:
operator fun invoke(input: IN): List<OUT> = runBlocking(Dispatchers.Default) {
executions.map { exec -> async { exec(input) } }.map { it.await() }
}
Key characteristics:
- runBlocking(Dispatchers.Default) -- The calling thread blocks until all agents finish. Dispatchers.Default uses a thread pool sized to the number of CPU cores.
- async { exec(input) } -- Each agent runs in its own coroutine, potentially on a different thread. This gives true parallel execution for CPU-bound or I/O-bound agent work.
- Order preserved -- Results come back in declaration order (a / b / c returns [a_result, b_result, c_result]), even though execution happens concurrently.
- Failure propagation -- If any agent throws, the exception propagates to the caller after await().
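Order preservation and failure propagation can both be observed in a small standalone sketch. It assumes kotlinx.coroutines is available; fanOut, AgentFailure, and the suspend lambdas are hypothetical stand-ins that copy the shape of the invoke body rather than the library's actual code.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical exception type used to simulate a failing agent.
class AgentFailure(message: String) : RuntimeException(message)

// Same shape as the invoke body: start everything, then await in list order.
fun <IN, OUT> fanOut(tasks: List<suspend (IN) -> OUT>, input: IN): List<OUT> =
    runBlocking(Dispatchers.Default) {
        tasks.map { task -> async { task(input) } }.map { it.await() }
    }

// The slow task is declared first and finishes last, yet stays first in the result.
fun orderDemo(): List<String> = fanOut(
    listOf<suspend (String) -> String>(
        { s -> delay(150); "slow:$s" },
        { s -> delay(10); "fast:$s" },
    ),
    "x",
)

// One task throws; the exception surfaces at the call site of fanOut.
fun failureDemo(): String = try {
    fanOut(
        listOf<suspend (String) -> String>(
            { s -> delay(50); s },
            { _ -> throw AgentFailure("agent failed") },
        ),
        "x",
    )
    "no failure"
} catch (e: AgentFailure) {
    "caught: ${e.message}"
}

fun main() {
    println(orderDemo())   // [slow:x, fast:x]
    println(failureDemo()) // caught: agent failed
}
```

In this sketch the failing coroutine also cancels its sibling, because async children of the same scope follow structured concurrency. Cancellation is cooperative, so an agent stuck in blocking code may keep running until that call returns.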
For LLM-backed agents that make network calls, parallel execution can dramatically reduce wall-clock time since each agent's HTTP request overlaps with the others.
Like all compositions, each agent instance can only participate once. An agent placed in a Parallel cannot be reused in another Parallel, Pipeline, Forum, or Loop:
val a = agent<String, String>("a") {}
val b = agent<String, String>("b") {}
val c = agent<String, String>("c") {}
a / b // a and b are now placed
// This throws IllegalArgumentException:
// a / c
// Create a new instance instead:
val a2 = agent<String, String>("a") {}
a2 / c // works
Next: Pipeline | Loop | Branch | Forum | While Loops