-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreateBasicCommandIssuer.ts
More file actions
50 lines (46 loc) · 2.05 KB
/
createBasicCommandIssuer.ts
File metadata and controls
50 lines (46 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import type { CommandIssuer } from "./CommandIssuer.ts";
import type {
AggregateRootDefinitionMap,
AggregateRootDefinitionMapTypes,
} from "../aggregate/AggregateRootDefinition.ts";
import type { AggregateRootRepository } from "../aggregate/AggregateRootRepository.ts";
import { AggregateRootVersionIntegrityError } from "../eventStore/error/AggregateRootVersionIntegrityError.ts";
/**
* An immediate command issuer processes commands right away. This is in contrast to other kinds
* of command issuers which may acknowledge commands to be processed later or provide additional
* features.
*/
export function createBasicCommandIssuer<
TAggregateMap extends AggregateRootDefinitionMap<TAggregateMapTypes>,
TAggregateMapTypes extends AggregateRootDefinitionMapTypes,
>(
{ aggregateRootRepository, aggregateRoots }: {
aggregateRoots: TAggregateMap;
aggregateRootRepository: AggregateRootRepository<TAggregateMap, TAggregateMapTypes>;
},
): CommandIssuer<TAggregateMap, TAggregateMapTypes> {
return async function issueCommand({ aggregateRootType, aggregateRootId, command, data }) {
const aggregate = await aggregateRootRepository.retrieve({
aggregateRootId,
aggregateRootType,
});
const commandMap = aggregateRoots[aggregateRootType].commands;
const commandFunction = commandMap[command as keyof typeof commandMap];
const commandResult = commandFunction(aggregate.state, data);
const raisedEvents = Array.isArray(commandResult) ? commandResult : [commandResult];
try {
await aggregateRootRepository.persist({
aggregateRoot: aggregate,
pendingEventPayloads: raisedEvents,
});
} catch (e) {
// In cases where there was a version integrity error, we can retry the command. This
// will retrieve the latest version of the aggregate, and deriver a new command outcome
// based on up-to-date data.
if (e instanceof AggregateRootVersionIntegrityError) {
return await issueCommand({ aggregateRootType, aggregateRootId, command, data });
}
throw e;
}
};
}