Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ dist
.env.*
*.log
.DS_Store
.agentpay-store/
.agentpay-store-test-*/
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,30 @@ API gateway, metering, and billing backend for the AgentPay protocol (machine-to
```
Server runs at `http://localhost:3001`. Try `GET /health` and `GET /api/v1/version`.

## Persistence

AgentPay uses in-memory stores by default:

```bash
STORAGE_DRIVER=memory
```

To keep usage counters, registered services, disabled-service flags, service
metadata, API keys, and webhooks across process restarts, enable the file
storage driver:

```bash
STORAGE_DRIVER=file STORAGE_PATH=.agentpay-store npm start
```

`STORAGE_PATH` must resolve inside the project directory. The file driver writes
one JSON file per store and replaces files atomically via a temporary file. Store
contents are flushed on every write and again during the SIGTERM/SIGINT graceful
shutdown path.

The on-disk files contain application state, including API key records. Keep the
storage directory private and out of version control.

## Project structure

```
Expand Down
9 changes: 8 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { createMetricsRouter } from "./routes/metrics.js";
import { createServicesRouter } from "./routes/services.js";
import { createUsageRouter } from "./routes/usage.js";
import { createWebhooksRouter } from "./routes/webhooks.js";
import { flushStores } from "./store/state.js";

const PORT = process.env.PORT ?? 3001;

Expand Down Expand Up @@ -50,12 +51,18 @@ if (process.argv[1]?.endsWith("index.js") || process.argv[1]?.endsWith("index.ts
});

const shutdown = (signal: string) => {
console.log(`Received ${signal}, draining`);
console.log(`Received ${signal}, draining...`);
server.close((err) => {
if (err) {
console.error("server.close error:", err);
process.exit(1);
}
try {
flushStores();
} catch (flushErr) {
console.error("store flush error:", flushErr);
process.exit(1);
}
process.exit(0);
});
setTimeout(() => {
Expand Down
237 changes: 237 additions & 0 deletions src/store/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import {
existsSync,
mkdirSync,
readFileSync,
renameSync,
writeFileSync,
} from "node:fs";
import path from "node:path";

export interface Store<V> {
/** Return the value for a key, if present. */
get(key: string): V | undefined;
/** Persist a value under a key. */
set(key: string, value: V): void;
/** Remove a key and return whether it existed. */
delete(key: string): boolean;
/** Iterate every key/value pair in insertion order. */
entries(): IterableIterator<[string, V]>;
/** Iterate key/value pairs whose key starts with the supplied prefix. */
scanByPrefix(prefix: string): IterableIterator<[string, V]>;
/** Remove all entries. */
clear(): void;
/** Flush any buffered state to durable storage. */
flush(): void;
}

type Flushable = { flush(): void };

const flushables: Flushable[] = [];

export class InMemoryStore<V> implements Store<V> {
protected readonly data = new Map<string, V>();

get(key: string) {
return this.data.get(key);
}

set(key: string, value: V) {
this.data.set(key, value);
}

delete(key: string) {
return this.data.delete(key);
}

entries() {
return this.data.entries();
}

*scanByPrefix(prefix: string) {
for (const [key, value] of this.data.entries()) {
if (key.startsWith(prefix)) yield [key, value] as [string, V];
}
}

clear() {
this.data.clear();
}

flush() {
// In-memory stores have nothing to flush.
}
}

type FilePayload<V> = {
version: 1;
entries: [string, V][];
};

const safeStoreName = /^[A-Za-z0-9._-]+$/;

const resolveStorageDir = (rawDir: string) => {
const base = process.cwd();
const resolved = path.resolve(base, rawDir);
const relative = path.relative(base, resolved);
if (relative.startsWith("..") || path.isAbsolute(relative)) {
throw new Error("STORAGE_PATH must stay inside the project directory");
}
return resolved;
};

export class JsonFileStore<V> extends InMemoryStore<V> {
private dirty = false;
private readonly filePath: string;
private readonly dirPath: string;

constructor(storeName: string, storageDir: string) {
super();
if (!safeStoreName.test(storeName)) {
throw new Error(`invalid store name: ${storeName}`);
}
this.dirPath = resolveStorageDir(storageDir);
this.filePath = path.join(this.dirPath, `${storeName}.json`);
this.load();
}

override set(key: string, value: V) {
super.set(key, value);
this.dirty = true;
this.flush();
}

override delete(key: string) {
const deleted = super.delete(key);
if (deleted) {
this.dirty = true;
this.flush();
}
return deleted;
}

override clear() {
super.clear();
this.dirty = true;
this.flush();
}

override flush() {
if (!this.dirty) return;
mkdirSync(this.dirPath, { recursive: true });
const payload: FilePayload<V> = {
version: 1,
entries: Array.from(this.entries()),
};
const tempPath = `${this.filePath}.tmp`;
writeFileSync(tempPath, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
renameSync(tempPath, this.filePath);
this.dirty = false;
}

private load() {
if (!existsSync(this.filePath)) return;
const raw = readFileSync(this.filePath, "utf8");
const parsed = JSON.parse(raw) as Partial<FilePayload<V>>;
if (parsed.version !== 1 || !Array.isArray(parsed.entries)) {
throw new Error(`invalid storage file format: ${this.filePath}`);
}
for (const [key, value] of parsed.entries) {
if (typeof key !== "string") {
throw new Error(`invalid storage key in ${this.filePath}`);
}
this.data.set(key, value);
}
}
}

export class StoreMap<V> extends Map<string, V> {
constructor(private readonly backend: Store<V>) {
super();
for (const [key, value] of backend.entries()) {
super.set(key, value);
}
}

override set(key: string, value: V) {
super.set(key, value);
this.backend.set(key, value);
return this;
}

override delete(key: string) {
const deleted = super.delete(key);
const backendDeleted = this.backend.delete(key);
return deleted || backendDeleted;
}

override clear() {
super.clear();
this.backend.clear();
}

scanByPrefix(prefix: string) {
return this.backend.scanByPrefix(prefix);
}

flush() {
this.backend.flush();
}
}

export class StoreSet extends Set<string> {
constructor(private readonly backend: Store<boolean>) {
super();
for (const [key, enabled] of backend.entries()) {
if (enabled) super.add(key);
}
}

override add(value: string) {
super.add(value);
this.backend.set(value, true);
return this;
}

override delete(value: string) {
const deleted = super.delete(value);
const backendDeleted = this.backend.delete(value);
return deleted || backendDeleted;
}

override clear() {
super.clear();
this.backend.clear();
}

flush() {
this.backend.flush();
}
}

const createBackend = <V>(storeName: string): Store<V> => {
const driver = (process.env.STORAGE_DRIVER ?? "memory").toLowerCase();
if (driver === "memory") return new InMemoryStore<V>();
if (driver === "file") {
return new JsonFileStore<V>(
storeName,
process.env.STORAGE_PATH ?? ".agentpay-store"
);
}
throw new Error("STORAGE_DRIVER must be either memory or file");
};

export const createStoreMap = <V>(storeName: string) => {
const store = new StoreMap<V>(createBackend<V>(storeName));
flushables.push(store);
return store;
};

export const createStoreSet = (storeName: string) => {
const store = new StoreSet(createBackend<boolean>(storeName));
flushables.push(store);
return store;
};

export const flushStores = () => {
for (const store of flushables) store.flush();
};
23 changes: 14 additions & 9 deletions src/store/state.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { createStoreMap, createStoreSet, flushStores } from "./index.js";

export { flushStores };

/**
* Mutable process-local stores used by the in-memory AgentPay API.
* Mutable stores used by the AgentPay API.
*
* These exports preserve the existing development behavior: state lives only
* for the lifetime of the Node process and resets on restart.
* The default storage driver is in-memory. Set STORAGE_DRIVER=file to back
* service, usage, API-key, metadata, disabled-service, and webhook stores with
* JSON files that survive process restarts.
*/

export type ApiKeyRecord = { label: string; createdAt: number };
Expand All @@ -21,25 +26,25 @@ export const config: Record<string, number> = {
};

/** Opaque API keys keyed by full secret token. */
export const apiKeyStore = new Map<string, ApiKeyRecord>();
export const apiKeyStore = createStoreMap<ApiKeyRecord>("api-keys");

/** Outstanding usage counters keyed by `${agent}::${serviceId}`. */
export const usageStore = new Map<string, number>();
export const usageStore = createStoreMap<number>("usage");

/** Builds the shared in-memory usage key for an agent/service pair. */
export const usageKey = (agent: string, serviceId: string) => `${agent}::${serviceId}`;

/** Registered services and their per-request prices. */
export const servicesStore = new Map<string, { priceStroops: number }>();
export const servicesStore = createStoreMap<{ priceStroops: number }>("services");

/** Services currently disabled for write traffic. */
export const servicesDisabled = new Set<string>();
export const servicesDisabled = createStoreSet("services-disabled");

/** Optional service description/owner metadata. */
export const servicesMetadata = new Map<string, ServiceMetadataDto>();
export const servicesMetadata = createStoreMap<ServiceMetadataDto>("services-metadata");

/** Registered webhooks and their event subscriptions. */
export const webhookStore = new Map<string, WebhookRecord>();
export const webhookStore = createStoreMap<WebhookRecord>("webhooks");

/** Rate-limiter windows keyed by source IP. */
export const rateBuckets = new Map<string, number[]>();
Expand Down
Loading
Loading