Add event-storage-http REST API layer with NDJSON reads, stateful consumer endpoints, and raw buffer streaming#316
Conversation
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/162a67ee-41b8-4fb9-98a0-f5616832b447 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/162a67ee-41b8-4fb9-98a0-f5616832b447 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/162a67ee-41b8-4fb9-98a0-f5616832b447 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/162a67ee-41b8-4fb9-98a0-f5616832b447 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/162a67ee-41b8-4fb9-98a0-f5616832b447 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/0e84b705-f114-4af6-b8d2-fa59e75a8d5d Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/0e84b705-f114-4af6-b8d2-fa59e75a8d5d Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/0e84b705-f114-4af6-b8d2-fa59e75a8d5d Co-authored-by: albe <4259532+albe@users.noreply.github.com>
|
@copilot address the codeQL concern that the identifier and stream names are user controlled input and end up in file operations. We should sanitize the inputs to sensible values. For streamName that would be alphanumeric plus -,/,_; for consumer identifier we don't allow / |
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/8f840359-8922-4743-b452-1793375f3469 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/8f840359-8922-4743-b452-1793375f3469 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/8f840359-8922-4743-b452-1793375f3469 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/8f840359-8922-4743-b452-1793375f3469 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Handled in ccdb22c, cb35d4c, 9a33a38, and eb47d63. The HTTP routes now validate stream names against letters/numbers plus |
|
@copilot typeAccessor should make sure to return a string. Also that should be constrained to a streamName match, since want to use it as a stream. A streamName should also allow single dots in the name, but always start with a an alphanumeric character. In fact each segment of the streamName (separated by either /-.) should start with an alphanumeric. |
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/8f328f24-fe92-4990-bcef-01e0ff360869 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/8f328f24-fe92-4990-bcef-01e0ff360869 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Handled in 386e8bc and polished in f96754a. Stream names now allow single |
…mitCondition Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/7498367c-4780-4bcd-9b93-a12415658959 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Done in 88893fe:
|
…tore coverage Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/50208c41-645b-4fe9-8211-c68334244308 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/50208c41-645b-4fe9-8211-c68334244308 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
|
| import { buildConsumerName, parseConsumerIdentifier, scanConsumersAsync, splitConsumerStreamPath } from '../../http/routeUtils.js'; | ||
|
|
||
| function registerPutConsumerRoute(app, eventStore) { | ||
| app.put(/^\/consumers\/([^/]+)\/stream\/(.+)$/, async (request, response) => { |
There was a problem hiding this comment.
@copilot consumer put should receive a serialized state handling method. The consumer will then register the handler method and be kept open in memory to stay running.
There was a problem hiding this comment.
Done in f8dbcfb. PUT /consumers/:id/stream/:stream now accepts { "state": {...}, "handler": "(event, state) => newState" }. The handler string is compiled via new Function() (documented as a trusted-caller feature), registered as a data listener on the consumer, and the consumer is started and stored in a consumerRegistry Map on EventStoreHttpApi so it stays running in memory. A re-PUT stops the old instance and starts a fresh one with the new handler.
…nsumer running in memory Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/b64fb2e6-813b-44a8-8775-c2c5843e9cd9 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
| } | ||
| try { | ||
| // eslint-disable-next-line no-new-func | ||
| const fn = new Function('return (' + handlerCode + ')')(); |
…sumer error handler Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/b64fb2e6-813b-44a8-8775-c2c5843e9cd9 Co-authored-by: albe <4259532+albe@users.noreply.github.com>
event-storage-http REST API layer with NDJSON reads, consumer endpoints, and raw buffer streamingevent-storage-http REST API layer with NDJSON reads, stateful consumer endpoints, and raw buffer streaming
ReadablePartition.readFrom→ return Buffer (logic inlined from removedextractDocumentPayload);readAllBackwards→ addheaderOutparam; removeiterateFileBuffers/createFileReadStream/iterateDocumentBuffers/createDocumentReadStreamWritablePartition.readAllBackwards→ addheaderOutparamReadableStorage: fix backwards iteration (indexed backwards iterates entries array in reverse; no-index backwards usesreadAllBackwardsk-way max-merge via extendedkWayMerge); extractbuildDocumentEntryhelper shared between forward/backward no-index iteration; addreadRangeBuffers/iterateRangeBuffers/readBufferFrom; removereverse()helpersrc/util.js: extendkWayMergewithascendingboolean to support both min-merge and max-mergeReadablePartition.readFrom— removed implicitBuffer.from()copy; returns asubarrayview directly;readFromnow denormalizestime64withthis.metadata.epochso the epoch stays an implementation detail of the partitionEventStream— constructorpredicateparam doubles as the raw-mode toggle: passingtrueactivates raw mode (switchesobjectModeoff, usesstorage.readRangeBuffers(), pushesBufferchunks in_read());filter()throws when called on a raw stream;_read()simplified tonext ? (this.raw ? next : next.payload) : nullJoinEventStream—fetch()usesstorage.readRange()in object mode (storage handles deserialization) andstorage.iterateRangeBuffers()in raw mode; ordering uses epoch-denormalizedtime64from the binary document header withsequenceNumberas a tiebreaker — no JSON deserialization needed for ordering; raw mode pushes raw on-disk bytes directlyReadableStorage.iterateRangeBuffers— yields{ buffer, time64, sequenceNumber }per document (epoch denormalization done by the partition); used byJoinEventStreamas its merge feed; separateiterateRangeWithHeadersremovedReadableStorage.readBufferFrom— simplified pass-through; no longer computesglobalTime(partition owns the epoch)ReadableStorage.readRangeBuffers— destructures{ buffer }fromiterateRangeBuffersEventStore.getEventStream/getAllEvents/fromStreams—raw = falseparameter forwarded to the underlying stream classessrc/RawEventStream.jsremoved;index.jsexport removedAsyncReadablePartition.readFrom→ return Bufferevent-storage-http/src/http/ndjson.js—writeNdjsoncheckseventStream.rawand pipes directly; separatewriteRawNdjsonremovedevent-storage-http/bench→ rewritten to usegetEventStream(..., true)/getAllEvents(..., true)test/Partition.spec.js→ update for Buffer API; remove partition-level streaming testsmetadataUtil— doc blocks added topreCheck,buildMatcherTree,matchesNode, andindexOfSameLevelPUT /consumers/:id— accepts{ state, handler }body wherehandleris a serialized function string; the compiled handler is registered on the consumer and the consumer is kept alive in an in-memoryconsumerRegistryso it continues processing events; errors remove the entry from the registryEventStoreHttpApi.js— creates aconsumerRegistryMap and passes it toregisterPutConsumerRoute