Skip to content

Add event-storage-http REST API layer with NDJSON reads, stateful consumer endpoints, and raw buffer streaming#316

Draft
Copilot wants to merge 52 commits into
mainfrom
copilot/add-event-storage-http-api-layer
Draft

Add event-storage-http REST API layer with NDJSON reads, stateful consumer endpoints, and raw buffer streaming#316
Copilot wants to merge 52 commits into
mainfrom
copilot/add-event-storage-http-api-layer

Conversation

Copy link
Copy Markdown
Contributor

Copilot AI commented May 16, 2026

  • ReadablePartition.readFrom → return Buffer (logic inlined from removed extractDocumentPayload); readAllBackwards → add headerOut param; remove iterateFileBuffers/createFileReadStream/iterateDocumentBuffers/createDocumentReadStream
  • WritablePartition.readAllBackwards → add headerOut param
  • ReadableStorage: fix backwards iteration (indexed backwards iterates entries array in reverse; no-index backwards uses readAllBackwards k-way max-merge via extended kWayMerge); extract buildDocumentEntry helper shared between forward/backward no-index iteration; add readRangeBuffers/iterateRangeBuffers/readBufferFrom; remove reverse() helper
  • src/util.js: extend kWayMerge with ascending boolean to support both min-merge and max-merge
  • ReadablePartition.readFrom — removed implicit Buffer.from() copy; returns a subarray view directly; readFrom now denormalizes time64 with this.metadata.epoch so the epoch stays an implementation detail of the partition
  • EventStream — constructor predicate param doubles as the raw-mode toggle: passing true activates raw mode (switches objectMode off, uses storage.readRangeBuffers(), pushes Buffer chunks in _read()); filter() throws when called on a raw stream; _read() simplified to next ? (this.raw ? next : next.payload) : null
  • JoinEventStreamfetch() uses storage.readRange() in object mode (storage handles deserialization) and storage.iterateRangeBuffers() in raw mode; ordering uses epoch-denormalized time64 from the binary document header with sequenceNumber as a tiebreaker — no JSON deserialization needed for ordering; raw mode pushes raw on-disk bytes directly
  • ReadableStorage.iterateRangeBuffers — yields { buffer, time64, sequenceNumber } per document (epoch denormalization done by the partition); used by JoinEventStream as its merge feed; separate iterateRangeWithHeaders removed
  • ReadableStorage.readBufferFrom — simplified pass-through; no longer computes globalTime (partition owns the epoch)
  • ReadableStorage.readRangeBuffers — destructures { buffer } from iterateRangeBuffers
  • EventStore.getEventStream / getAllEvents / fromStreamsraw = false parameter forwarded to the underlying stream classes
  • src/RawEventStream.js removed; index.js export removed
  • AsyncReadablePartition.readFrom → return Buffer
  • event-storage-http/src/http/ndjson.jswriteNdjson checks eventStream.raw and pipes directly; separate writeRawNdjson removed
  • event-storage-http/bench → rewritten to use getEventStream(..., true) / getAllEvents(..., true)
  • test/Partition.spec.js → update for Buffer API; remove partition-level streaming tests
  • metadataUtil — doc blocks added to preCheck, buildMatcherTree, matchesNode, and indexOfSameLevel
  • EventStream and EventStore test coverage improved (EventStream at 100%; EventStore ~99% statements, ~97% branches)
  • PUT /consumers/:id — accepts { state, handler } body where handler is a serialized function string; the compiled handler is registered on the consumer and the consumer is kept alive in an in-memory consumerRegistry so it continues processing events; errors remove the entry from the registry
  • EventStoreHttpApi.js — creates a consumerRegistry Map and passes it to registerPutConsumerRoute
  • All tests passing
  • Code review and CodeQL passed

Copy link
Copy Markdown
Owner

@albe albe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot address the comments

Comment thread event-storage-http/src/EventStoreHttpApi.js Outdated
Comment thread event-storage-http/src/EventStoreHttpApi.js Outdated
Comment thread event-storage-http/src/EventStoreHttpApi.js Outdated
Comment thread event-storage-http/src/EventStoreHttpApi.js
Comment thread event-storage-http/src/http/routeUtils.js Fixed
Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 16, 2026

@copilot address the comments

Done in b20ca80, 0a80dbc, and 577b585. The HTTP API now uses Express routing, applies matchers on EventStream instances, supports nested category paths, and is split into route-focused handler files.

Copilot finished work on behalf of albe May 16, 2026 11:26
Copilot AI requested a review from albe May 16, 2026 11:26
@albe
Copy link
Copy Markdown
Owner

albe commented May 16, 2026

@copilot address the codeQL concern that the identifier and stream names are user controlled input and end up in file operations. We should sanitize the inputs to sensible values. For streamName that would be alphanumeric plus -,/,_; for consumer identifier we don't allow /

Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 16, 2026

@copilot address the codeQL concern that the identifier and stream names are user controlled input and end up in file operations. We should sanitize the inputs to sensible values. For streamName that ...

Handled in ccdb22c, cb35d4c, 9a33a38, and eb47d63. The HTTP routes now validate stream names against letters/numbers plus -, _, / with non-empty path segments, and consumer identifiers against letters/numbers plus -, _, so invalid values are rejected with 400 before reaching file-backed operations.

Copilot finished work on behalf of albe May 16, 2026 13:33
@albe
Copy link
Copy Markdown
Owner

albe commented May 16, 2026

@copilot typeAccessor should make sure to return a string. Also that should be constrained to a streamName match, since want to use it as a stream. A streamName should also allow single dots in the name, but always start with a an alphanumeric character. In fact each segment of the streamName (separated by either /-.) should start with an alphanumeric.

Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 16, 2026

@copilot typeAccessor should make sure to return a string. Also that should be constrained to a streamName match, since want to use it as a stream. A streamName should also allow single dots in the na...

Handled in 386e8bc and polished in f96754a. Stream names now allow single . separators while requiring every /, -, or . segment to start with an alphanumeric, the query route accepts the same valid type names, and typeAccessor now only creates type streams from string values that match that stream-name grammar.

Copy link
Copy Markdown
Owner

@albe albe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot address the comments

Comment thread src/JoinEventStream.js
Comment thread event-storage-http/src/EventStoreHttpApi.js Outdated
Comment thread src/Storage/ReadableStorage.js Outdated
Comment thread src/EventStore.js Outdated
Comment thread src/EventStore.js Outdated
Comment thread src/metadataUtil.js
Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 23, 2026

@copilot address the comments

Done in 88893fe:

  • src/JoinEventStream.js:77@returns now reads The next event, or \false` when the stream is exhausted.`
  • event-storage-http/src/EventStoreHttpApi.js:28 — simplified to once(eventStore, 'ready').then(() => undefined) (no storage branch)
  • src/Storage/ReadableStorage.js:454 — JSDoc updated to @returns {Generator<{document: object, entry: { sequenceNumber, position, size, partition }}>}
  • src/EventStore.js:385checkCondition now passes condition.matcher + condition.raw to fromStreams; filtering is delegated to the stream, so the loop collapses to a single if (stream.next() !== false) throw
  • src/EventStore.js:557CommitCondition stores the original matcher and raw flag directly; buildConditionMatcher removed; query() passes matcher + raw straight through
  • src/metadataUtil.js:214 — added a doc comment explaining that indexOfSameLevel tracks JSON nesting depth from startOffset to ensure the match is at depth 0

Copilot finished work on behalf of albe May 23, 2026 11:45
Copilot AI requested a review from albe May 23, 2026 11:45
Copy link
Copy Markdown
Owner

@albe albe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot add doc blocks to the new metadataUtil methods explaining their intent and expectations. Then address the reduced code coverage, specifically EventStore and EventStream coverage reduced by 1-2%

Copy link
Copy Markdown
Contributor Author

Copilot AI commented May 23, 2026

@copilot add doc blocks to the new metadataUtil methods explaining their intent and expectations. Then address the reduced code coverage, specifically EventStore and EventStream coverage reduced by 1-...

Done in 3280453 and 49e59c0.

  • metadataUtil doc blockspreCheck, buildMatcherTree, and matchesNode each got a single-sentence "why" doc block per project style (why the function exists, not how it works).
  • Coverage — EventStream.js is now at 100% statements/branches/functions/lines. EventStore.js improved from 97.64%→99.13% statements and 91.87%→96.87% branches. Tests added:
    • EventStream: predicate-as-boolean shorthand in constructor, filter() method, object matcher in non-raw mode, Readable stream in raw mode
    • EventStore: predicate-as-boolean shorthand in getEventStream, getAllEvents, fromStreams, and getEventStreamForCategory

Copilot finished work on behalf of albe May 23, 2026 12:10
Copilot AI requested a review from albe May 23, 2026 12:10
import { buildConsumerName, parseConsumerIdentifier, scanConsumersAsync, splitConsumerStreamPath } from '../../http/routeUtils.js';

function registerPutConsumerRoute(app, eventStore) {
app.put(/^\/consumers\/([^/]+)\/stream\/(.+)$/, async (request, response) => {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot consumer put should receive a serialized state handling method. The consumer will then register the handler method and be kept open in memory to stay running.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in f8dbcfb. PUT /consumers/:id/stream/:stream now accepts { "state": {...}, "handler": "(event, state) => newState" }. The handler string is compiled via new Function() (documented as a trusted-caller feature), registered as a data listener on the consumer, and the consumer is started and stored in a consumerRegistry Map on EventStoreHttpApi so it stays running in memory. A re-PUT stops the old instance and starts a fresh one with the new handler.

}
try {
// eslint-disable-next-line no-new-func
const fn = new Function('return (' + handlerCode + ')')();
Copilot AI changed the title Add event-storage-http REST API layer with NDJSON reads, consumer endpoints, and raw buffer streaming Add event-storage-http REST API layer with NDJSON reads, stateful consumer endpoints, and raw buffer streaming May 23, 2026
Copilot finished work on behalf of albe May 23, 2026 14:00
Copilot AI requested a review from albe May 23, 2026 14:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants