Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -3,3 +3,4 @@ lib
*.log
.idea
.claude/settings.local.json
temp
38 changes: 30 additions & 8 deletions README.md
@@ -8,19 +8,19 @@ Run a promise graph with concurrency control.
$ npm install p-graph
```

`p-graph` does not have a strict Node version requirement, but the syntax used is currently intended to remain compatible with Node 12+.
`p-graph` does not have a strict Node version requirement, but the syntax used is currently intended to remain compatible with Node 14+.

## Usage

The p-graph library takes in a map of nodes and a list of dependencies. The keys in the map are unique string identifiers for each node in the graph. Each value in the map is the definition of a task, including the function that the task should execute in its `run` property. The dependencies list is an array of tuples; each tuple contains two values that must correspond to IDs in the node map. The run function corresponding to the first item in the tuple must complete before the second item in the tuple can begin.

The return value of pGraph is an object with a `run()` function. Calling `run()` returns a promise that resolves after all the tasks in the graph have completed. Tasks are run in dependency order.
The `PGraph` class takes in a map (or record) of nodes and a list of dependencies.

```ts
import { pGraph, type DependencyList, type PGraphNodeRecord } from "p-graph";
import { PGraph, type DependencyList, type PGraphNodeRecord } from "p-graph";

// This can be either an object or map (PGraphNodeMap).
// Functions can be sync or async.
// Mapping from node IDs to definitions (can be either an object or map).
// `run` functions can be sync or async. Nodes can optionally define a `priority`.
// (Alternatively, you can omit the `run` functions here and specify a single
// function to `pGraph.run()`.)
const nodeMap: PGraphNodeRecord = {
putOnShirt: { run: () => console.log("put on your shirt") },
putOnShorts: { run: () => console.log("put on your shorts") },
@@ -29,6 +29,8 @@ const nodeMap: PGraphNodeRecord = {
tieShoes: { run: () => console.log("tie your shoes") },
};

// List of tuples describing dependencies (edges) between node IDs:
// the first task must complete before the second one begins.
const dependencies: DependencyList = [
// You need to put your shoes on before you tie them!
["putOnShoes", "tieShoes"],
@@ -37,9 +39,24 @@ const dependencies: DependencyList = [
["putOnShorts", "putOnShoes"],
];

await pGraph(nodeMap, dependencies).run();
// Run the tasks (log to console) in dependency order
await new PGraph(nodeMap, dependencies).run();

// Alternative API: reuse a single run() function
const simpleNodeMap: PGraphNodeRecord = {
putOnShirt: {},
putOnShorts: {},
putOnJacket: {},
putOnShoes: {},
tieShoes: {},
};
await new PGraph(simpleNodeMap, dependencies).run({
run: (taskId) => console.log(taskId),
});
```

If a task fails, the graph will throw a `PGraphError` wrapping the original error(s). See `run()` comments for more details.
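
As a rough sketch of how calling code might tell the two failure kinds apart: the block below uses a local stand-in for `PGraphError` (modeled on the class added in this PR) so that it runs without installing `p-graph`, and `summarizeRunFailure` is a hypothetical helper name, not part of the library.

```ts
// Local stand-in modeled on the PGraphError added in this PR.
// In real code you would use: import { PGraphError } from "p-graph";
class PGraphError extends Error {
  readonly taskErrors: unknown[];

  constructor(taskErrors: unknown[]) {
    super(
      "Error(s) occurred during task execution:\n" +
        taskErrors.map((e) => `- ${String(e)}`).join("\n"),
    );
    // Assumption for this sketch only: keeps `instanceof` working on ES5 targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.taskErrors = taskErrors;
  }
}

/** Hypothetical helper: turn whatever `run()` rejected with into a list of messages. */
function summarizeRunFailure(error: unknown): string[] {
  if (error instanceof PGraphError) {
    // One entry per failed task. Tasks can throw anything, so entries are `unknown`.
    return error.taskErrors.map((e) => (e instanceof Error ? e.message : String(e)));
  }
  // Anything else (such as a validation failure) is reported as a single message.
  return [error instanceof Error ? error.message : String(error)];
}

// Simulate a rejection from a graph where two independent tasks failed.
const simulated = new PGraphError([new Error("B rejected"), "C rejected"]);
console.log(summarizeRunFailure(simulated));
```

With the real library, the same helper would sit in a `catch` block around `await new PGraph(nodeMap, dependencies).run({ continue: true })`.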

### Concurrency

There are some contexts where you may want to limit the number of functions running concurrently. One example would be to prevent overloading the CPU with too many parallel tasks. The `concurrency` option to `run()` limits how many functions run at the same time. If no `concurrency` option is set, concurrency is unlimited and tasks run as soon as they are unblocked.
@@ -59,3 +76,8 @@ const nodeMap: PGraphNodeRecord = {
unspecified: { run: () => Promise.resolve() } // treated as 0
}
```

## Breaking changes in v2

- The default export function and the `pGraph` function have been removed. Use `new PGraph()` instead.
- If a task fails, `run()` will reject with a single `PGraphError` instead of an array of errors. The original errors are available under `pGraphError.taskErrors`. (Note a regular `Error` may also be thrown if initial validation fails. `PGraphError` is exported for `instanceof` checks.)
7 changes: 7 additions & 0 deletions change/p-graph-3b4c7c88-80f7-4100-8fd6-83594053d111.json
@@ -0,0 +1,7 @@
{
"type": "major",
"comment": "The default export function and the `pGraph` function have been removed. Use `new PGraph()` instead.",
"packageName": "p-graph",
"email": "elcraig@microsoft.com",
"dependentChangeType": "patch"
}
7 changes: 7 additions & 0 deletions change/p-graph-88a9d120-cc64-4664-88df-0674f8236575.json
@@ -0,0 +1,7 @@
{
"comment": "If a task fails, `run()` will reject with a single `PGraphError` instead of an array of errors. The original errors are available under `pGraphError.taskErrors`. (Note a regular `Error` may also be thrown if initial validation fails. `PGraphError` is exported for `instanceof` checks.)",
"type": "major",
"packageName": "p-graph",
"email": "elcraig@microsoft.com",
"dependentChangeType": "patch"
}
7 changes: 7 additions & 0 deletions change/p-graph-90fb77ef-8e40-44b1-9ebc-40057bf3475e.json
@@ -0,0 +1,7 @@
{
"comment": "As an alternative to specifying a `run()` function for each node, there's now an option to specify a single `run` function: `pGraph.run({ run: () => {...} })`. This allows reusing the same graph setup for multiple operations.",
"type": "minor",
"packageName": "p-graph",
"email": "elcraig@microsoft.com",
"dependentChangeType": "patch"
}
33 changes: 23 additions & 10 deletions src/PGraph.ts
@@ -7,6 +7,7 @@ import type {
} from "./types";
import { PriorityQueue } from "./PriorityQueue";
import { getNodeCumulativePriorities } from "./getNodeCumulativePriorities";
import { PGraphError } from "./PGraphError";

export class PGraph {
/** Original dependency map for the graph */
@@ -18,6 +19,7 @@ export class PGraph {
*/
readonly #nodesWithNoDependencies: ReadonlyArray<string>;

/** Cumulative priority for each node (its priority, plus max cumulative priority of children) */
readonly #nodeCumulativePriorities: Readonly<Record<string, number>>;

/**
@@ -84,24 +86,35 @@ export class PGraph {
* Runs all the tasks in the promise graph in dependency order.
* The graph can be run multiple times.
*
* If one or more tasks fail, it throws a `PGraphError` containing the original error(s).
* (It could also throw a regular `Error` on initial validation failure.)
*
* Failure behavior:
* - Throws an `Error` on initial validation failure
* - If `continue` is false or unset and a task fails, the promise will reject immediately with
* a **single error**.
* a `PGraphError` containing the original error.
* - If `continue` is true and a task fails, any tasks not dependent on the failed task will
* continue running, and an **array of errors** will be thrown at the end.
* continue running, and a `PGraphError` containing all original errors will be thrown at the end.
*/
run(options?: RunOptions): Promise<void> {
// Copy the dependency map so the graph can be reused
const dependencyMap = new Map(
const dependencyMap: Map<string, PGraphNodeWithDependencies> = new Map(
[...this.#dependencyMap.entries()].map(([key, node]) => [
key,
{ ...node, dependsOn: new Set(node.dependsOn), dependedOnBy: new Set(node.dependedOnBy) },
{
...node,
// Use the override run function if provided, or fall back to the original
run: options?.run?.bind(null, key) || node.run,
dependsOn: new Set(node.dependsOn),
dependedOnBy: new Set(node.dependedOnBy),
},
]),
);

const nodeCumulativePriorities = this.#nodeCumulativePriorities;
const concurrency = options?.concurrency;

if (concurrency !== undefined && concurrency < 0) {
if (concurrency !== undefined && concurrency < 1) {
throw new Error(
`concurrency must be either undefined or a positive integer; received ${options?.concurrency}`,
);
@@ -110,7 +123,7 @@ export class PGraph {
const priorityQueue = new PriorityQueue<string>();

for (const itemId of this.#nodesWithNoDependencies) {
priorityQueue.insert(itemId, this.#nodeCumulativePriorities[itemId]);
priorityQueue.insert(itemId, nodeCumulativePriorities[itemId]);
}

let currentlyRunningTaskCount = 0;
@@ -127,7 +140,7 @@ export class PGraph {
currentlyRunningTaskCount += 1;

if (!taskToRun.failed) {
await taskToRun.run();
await taskToRun.run?.();
}
} catch (e) {
// Mark the node and its children as "failed". With `continue`, we still traverse them but don't run them.
@@ -156,7 +169,7 @@ export class PGraph {
// If the task that just completed was the last remaining dependency for a node,
// add it to the set of unblocked nodes
if (dependentNode.dependsOn.size === 0) {
priorityQueue.insert(dependentId, this.#nodeCumulativePriorities[dependentId]);
priorityQueue.insert(dependentId, nodeCumulativePriorities[dependentId]);
}
}
}
@@ -172,7 +185,7 @@ export class PGraph {
if (errors.length === 0) {
resolve();
} else {
reject(errors);
reject(new PGraphError(errors));
}
return;
}
@@ -192,7 +205,7 @@ export class PGraph {
trySchedulingTasks();
} else {
// immediately reject, if not using "continue" option
reject(e);
reject(new PGraphError([e]));
}
});
}
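
The per-node `run` resolution used when copying the dependency map above can be looked at in isolation. This is a minimal sketch with simplified types: `NodeDef`, `RunOverride`, and `resolveRun` are hypothetical names chosen for illustration, and only the `bind`-or-fallback expression is taken from the diff.

```ts
// Simplified shapes, assumed for this sketch (the real types live in ./types).
type NodeDef = { run?: () => unknown };
type RunOverride = (taskId: string) => unknown;

/**
 * Pick the function to execute for a node: the shared override (pre-bound to
 * the node's ID) if one was passed, otherwise the node's own `run`, if any.
 */
function resolveRun(
  key: string,
  node: NodeDef,
  runOverride?: RunOverride,
): (() => unknown) | undefined {
  return runOverride?.bind(null, key) || node.run;
}

const calls: string[] = [];
const node: NodeDef = { run: () => calls.push("node's own run") };

// No override: falls back to the node's own function.
resolveRun("putOnShirt", node)?.();
// Override given: it wins, and receives the node ID as its first argument.
resolveRun("putOnShirt", node, (taskId) => calls.push(`override for ${taskId}`))?.();
// Neither defined: the result is undefined, so the optional call does nothing.
resolveRun("putOnShirt", {})?.();

console.log(calls);
```

The optional call mirrors `await taskToRun.run?.()` in the diff, which is what allows nodes to be declared as `{}` when a single shared function is supplied.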
14 changes: 14 additions & 0 deletions src/PGraphError.ts
@@ -0,0 +1,14 @@
/**
* Error thrown if PGraph task execution fails.
* Contains the original errors thrown by the tasks.
*
* (For validation failures outside of task execution, `p-graph` will throw a regular `Error`.)
*/
export class PGraphError extends Error {
constructor(public readonly taskErrors: unknown[]) {
super(
"Error(s) occurred during task execution:\n" +
taskErrors.map((e) => `- ${String(e)}`).join("\n"),
);
}
}
59 changes: 41 additions & 18 deletions src/__tests__/PGraph.test.ts
@@ -1,6 +1,7 @@
import type { DependencyList, PGraphNodeMap, PGraphNodeRecord } from "../types";
import { FunctionScheduler } from "./FunctionScheduler";
import { PGraph } from "../PGraph";
import { PGraphError } from "../PGraphError";

describe("PGraph", () => {
/** Make a map with the given keys and no-op runner functions (`jest.fn()`) */
@@ -278,8 +279,7 @@ describe("PGraph", () => {
});

describe("error handling", () => {
// TODO combine?
it("throws a single error if a task fails when continue is unset/false", async () => {
it("throws if a task fails when continue is unset/false", async () => {
const nodeMap = makeNoOpMap(["A", "B"]);
nodeMap.set("C", { run: () => Promise.reject("C rejected") });

@@ -290,8 +290,14 @@ describe("PGraph", () => {
["A", "C"],
];

// This would be toThrow() if it was an Error, but the example in this case is a string
await expect(new PGraph(nodeMap, dependencies).run()).rejects.toEqual("C rejected");
const error = (await new PGraph(nodeMap, dependencies).run().catch((e) => e)) as PGraphError;
expect(error).toBeInstanceOf(PGraphError);
expect(error.taskErrors.map((e) => String(e))).toEqual(["C rejected"]);
// Check the message format here
expect(error.message).toMatchInlineSnapshot(`
"Error(s) occurred during task execution:
- C rejected"
`);
});

it("if continue is true and a task fails, continues to run other tasks and throws at end", async () => {
@@ -307,17 +313,20 @@ describe("PGraph", () => {
["E", "F"],
];

await expect(
new PGraph(nodeMap, dependencies).run({ concurrency: 1, continue: true }),
).rejects.toEqual(["C rejected"]);
const error = (await new PGraph(nodeMap, dependencies)
.run({ concurrency: 1, continue: true })
.catch((e) => e)) as PGraphError;
expect(error).toBeInstanceOf(PGraphError);
expect(error.taskErrors.map((e) => String(e))).toEqual(["C rejected"]);

expect(nodeMap.get("E")!.run).toHaveBeenCalled();
expect(nodeMap.get("F")!.run).toHaveBeenCalled();
expect(nodeMap.get("D")!.run).not.toHaveBeenCalled();
});

it("if continue is true, returns multiple independent failures", async () => {
it("if continue is true, throws at end for multiple independent failures", async () => {
const nodeMap = makeNoOpMap(["A", "D", "F", "G"]);
nodeMap.set("B", { run: () => Promise.reject("B rejected") });
nodeMap.set("B", { run: () => Promise.reject(new Error("B rejected")) });
nodeMap.set("C", { run: () => Promise.reject("C rejected") });
nodeMap.set("E", { run: () => Promise.reject("E rejected") });
// A
@@ -334,9 +343,18 @@ describe("PGraph", () => {
];

// Only B and C should fail (E is skipped because C failed)
await expect(
new PGraph(nodeMap, dependencies).run({ concurrency: 2, continue: true }),
).rejects.toEqual(["B rejected", "C rejected"]);
const error = (await new PGraph(nodeMap, dependencies)
.run({ concurrency: 2, continue: true })
.catch((e) => e)) as PGraphError;
expect(error).toBeInstanceOf(PGraphError);
// Check the message formatting. It converts the original errors to strings, so a thrown
// Error will have a prefix, but a thrown string won't.
expect(error.message).toMatchInlineSnapshot(`
"Error(s) occurred during task execution:
- Error: B rejected
- C rejected"
`);
expect(error.taskErrors.map((e) => String(e))).toEqual(["Error: B rejected", "C rejected"]);

// Independent successful paths should still execute
expect(nodeMap.get("A")!.run).toHaveBeenCalled();
@@ -361,9 +379,11 @@ describe("PGraph", () => {
["A", "E"],
];

await expect(
new PGraph(scheduler.nodeMap, dependencies).run({ continue: true, concurrency: 10 }),
).rejects.toContain("B rejected");
const error = (await new PGraph(scheduler.nodeMap, dependencies)
.run({ concurrency: 10, continue: true })
.catch((e) => e)) as PGraphError;
expect(error).toBeInstanceOf(PGraphError);
expect(error.taskErrors).toEqual(["B rejected"]);

// All non-failing tasks should execute
expect(scheduler.didCompleteTask("C")).toBe(true);
@@ -386,9 +406,12 @@ describe("PGraph", () => {
["A", "C"],
];

await expect(new PGraph(nodeMap, dependencies).run({ continue: true })).rejects.toEqual([
new Error("B threw synchronously"),
]);
const error = (await new PGraph(nodeMap, dependencies)
.run({ continue: true })
.catch((e) => e)) as PGraphError;
expect(error).toBeInstanceOf(PGraphError);
expect(error.taskErrors.map((e) => String(e))).toEqual(["Error: B threw synchronously"]);

expect(nodeMap.get("C")!.run).toHaveBeenCalled();
});
});
7 changes: 3 additions & 4 deletions src/getNodeCumulativePriorities.ts
@@ -2,10 +2,9 @@ import { findCycle } from "./findCycle";
import type { PGraphNodeWithDependencies } from "./types";

/**
* Returns a JS map that has the "cumulative" priority for each node, which is defined as the
* priority of the current node plus the maximum cumulative priority amongst all children.
* This is helpful for identifying which nodes to schedule first in order to get to higher
* priority nodes more quickly.
* Calculates the "cumulative" priority for each node: the priority of the current node plus the
* maximum cumulative priority amongst all children. This is helpful for identifying which nodes
* to schedule first in order to get to higher priority nodes more quickly.
*
* Uses a reverse Kahn's algorithm (BFS from leaves to roots) to calculate priorities in a single pass.
* Throws if a cycle is detected.
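
The cumulative-priority definition in the comment above can be sketched as follows. This is an illustration under simplified assumptions, not the code in `getNodeCumulativePriorities.ts`: `SketchGraph` and `cumulativePriorities` are hypothetical names, every child ID is assumed to exist in the graph, and a plain `Error` stands in for whatever the real cycle check reports.

```ts
// Simplified node shape, assumed for this sketch: an optional priority
// (treated as 0 when missing) and the IDs of the nodes that depend on it.
type SketchGraph = Record<string, { priority?: number; children: string[] }>;

/** Reverse Kahn's algorithm: process leaves first, then move toward the roots. */
function cumulativePriorities(graph: SketchGraph): Record<string, number> {
  const ids = Object.keys(graph);
  const result: Record<string, number> = {};
  const unprocessedChildren: Record<string, number> = {};
  const parents: Record<string, string[]> = {};

  for (const id of ids) {
    parents[id] = [];
  }
  for (const id of ids) {
    unprocessedChildren[id] = graph[id].children.length;
    for (const child of graph[id].children) {
      parents[child].push(id);
    }
  }

  // Start from the leaves (nodes with no children).
  const queue = ids.filter((id) => unprocessedChildren[id] === 0);

  // The queue grows while we iterate: a parent is appended once all of its
  // children have a final cumulative priority.
  for (let i = 0; i < queue.length; i++) {
    const id = queue[i];
    const childValues = graph[id].children.map((child) => result[child]);
    const maxChild = childValues.length > 0 ? Math.max(...childValues) : 0;
    result[id] = (graph[id].priority ?? 0) + maxChild;

    for (const parent of parents[id]) {
      unprocessedChildren[parent] -= 1;
      if (unprocessedChildren[parent] === 0) {
        queue.push(parent);
      }
    }
  }

  // Nodes on a cycle never reach zero unprocessed children, so they are never queued.
  if (queue.length !== ids.length) {
    throw new Error("cycle detected");
  }
  return result;
}

// A depends-on chain: A -> B, A -> C, C -> D.
console.log(
  cumulativePriorities({
    A: { priority: 1, children: ["B", "C"] },
    B: { priority: 5, children: [] },
    C: { priority: 2, children: ["D"] },
    D: { priority: 10, children: [] },
  }),
);
```

In this example `C` scores 12 (its own 2 plus `D`'s 10), so `A` scores 13 by taking the larger of its children (`C` at 12 rather than `B` at 5). That is why a scheduler ordering by this value would start work that leads toward `D` first.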
22 changes: 2 additions & 20 deletions src/index.ts
@@ -1,23 +1,5 @@
import { PGraph } from "./PGraph";
import type { PGraphNodeMap, DependencyList, PGraphNodeRecord } from "./types";

export { PGraph };

/**
* Create a new graph runner from a list of nodes and dependencies (edges).
* Throws an error if a cycle is detected. (This is the same as `new PGraph(...)`.)
*
* @param nodeMap Mapping from node ID to function and priority
* @param dependencies Each tuple describes a dependency between two nodes in the p-graph:
* the first task must complete before the second one begins.
* @returns The graph ready to run
*/
export function pGraph(nodeMap: PGraphNodeMap | PGraphNodeRecord, dependencies: DependencyList) {
return new PGraph(nodeMap, dependencies);
}

export default pGraph;

export { PGraph } from "./PGraph";
export { PGraphError } from "./PGraphError";
export type {
DependencyList,
PGraphNode,