From 38de11240bb2e2c5d8bdc6fc222ba04d8481d87a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 22 Mar 2026 12:46:51 +0000 Subject: [PATCH 1/4] Initial plan From 6810a12b92b2c5acd3070c75511ad0a5e3fdcc0f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 22 Mar 2026 12:53:30 +0000 Subject: [PATCH 2/4] Implement ring buffer for index memory management (fixes #73) Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/7e6a5798-f2db-4c7e-a376-9a78894d3bf5 --- src/Index/ReadOnlyIndex.js | 11 ++- src/Index/ReadableIndex.js | 149 +++++++++++++++++++++++++++---------- src/Index/WritableIndex.js | 18 +++-- test/Index.spec.js | 65 ++++++++++++++++ 4 files changed, 197 insertions(+), 46 deletions(-) diff --git a/src/Index/ReadOnlyIndex.js b/src/Index/ReadOnlyIndex.js index fa1db0d9..0b010190 100644 --- a/src/Index/ReadOnlyIndex.js +++ b/src/Index/ReadOnlyIndex.js @@ -24,9 +24,16 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) { if (!this.fd) { return; } - const prevLength = this.data.length; + const prevLength = this._length; const newLength = this.readFileLength(); - this.data.length = newLength; + if (newLength < prevLength) { + // Clear ring buffer slots for the removed entries to avoid stale reads + const oldCacheStart = Math.max(0, prevLength - this.cacheSize); + for (let i = Math.max(oldCacheStart, newLength); i < prevLength; i++) { + this.cache[i % this.cacheSize] = null; + } + } + this._length = newLength; if (newLength > prevLength) { this.emit('append', prevLength, newLength); } diff --git a/src/Index/ReadableIndex.js b/src/Index/ReadableIndex.js index c998d4d6..c53c7d28 100644 --- a/src/Index/ReadableIndex.js +++ b/src/Index/ReadableIndex.js @@ -38,6 +38,7 @@ class ReadableIndex extends events.EventEmitter { * @param {object} [options] An object with additional index options. * @param {typeof EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -65,8 +66,10 @@ class ReadableIndex extends events.EventEmitter { * @param {object} options */ initialize(options) { - /* @type Array */ - this.data = []; + this._length = 0; + this.cacheSize = options.cacheSize !== undefined ? Math.max(1, options.cacheSize >>> 0) : 1024; // jshint ignore:line + /* @type Array Ring buffer holding at most cacheSize entries */ + this.cache = new Array(this.cacheSize); this.fd = null; this.fileMode = 'r'; this.EntryClass = options.EntryClass; @@ -100,7 +103,7 @@ class ReadableIndex extends events.EventEmitter { * @returns {number} */ get length() { - return this.data.length; + return this._length; } /** @@ -155,7 +158,7 @@ class ReadableIndex extends events.EventEmitter { const length = this.readFileLength(); if (length > 0) { - this.data = new Array(length); + this._length = length; // Read last item to get the index started this.read(length); } @@ -232,8 +235,9 @@ class ReadableIndex extends events.EventEmitter { * @api */ close() { - this.data = []; + this._length = 0; this.readUntil = -1; + this.cache.fill(null); this.readBuffer.fill(0); if (this.fd) { fs.closeSync(this.fd); @@ -250,21 +254,27 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry} The index entry at the given position. */ read(index) { - index = Number(index) - 1; + const i = Number(index) - 1; // 0-based - fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + index * this.EntryClass.size); - if (index === this.readUntil + 1) { + fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + i * this.EntryClass.size); + if (i === this.readUntil + 1) { this.readUntil++; } - this.data[index] = this.EntryClass.fromBuffer(this.readBuffer); - - return this.data[index]; + const entry = this.EntryClass.fromBuffer(this.readBuffer); + // Store in ring buffer only if within the current cache window + if (i >= this._length - this.cacheSize) { + this.cache[i % this.cacheSize] = entry; + } + return entry; } /** * Read a range of entries from disk. This method will not do any range checks. * It will however optimize to prevent reading entries that have already been read sequentially from start. * + * Entries within the ring buffer cache window are stored in the cache; entries outside the window + * (older than cacheSize) are read from disk for the return value but not cached. + * * @private * @param {number} from The 1-based index position from where to read from (inclusive). * @param {number} until The 1-based index position until which to read to (inclusive). @@ -275,25 +285,61 @@ class ReadableIndex extends events.EventEmitter { return [this.read(from)]; } - from--; - until--; - - const readFrom = Math.max(this.readUntil + 1, from); - const amount = (until - readFrom + 1); - - const readBuffer = Buffer.allocUnsafe(amount * this.EntryClass.size); - let readSize = fs.readSync(this.fd, readBuffer, 0, readBuffer.byteLength, this.headerSize + readFrom * this.EntryClass.size); - let index = 0; - while (index < amount && readSize > 0) { - this.data[index + readFrom] = this.EntryClass.fromBuffer(readBuffer, index * this.EntryClass.size); - readSize -= this.EntryClass.size; - index++; + const f = from - 1; // 0-based + const u = until - 1; // 0-based + const cacheStart = Math.max(0, this._length - this.cacheSize); + + // Build the result array up front + const result = new Array(u - f + 1); + + // Part 1: Out-of-window entries [f, min(cacheStart-1, u)] — read from disk, do not cache + const outEnd = Math.min(cacheStart - 1, u); + if (f < cacheStart && outEnd >= f) { + const count = outEnd - f + 1; + const outBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + f * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + result[idx] = this.EntryClass.fromBuffer(outBuf, idx * this.EntryClass.size); + } } - if (from <= this.readUntil + 1) { - this.readUntil = Math.max(this.readUntil, until); + + // Part 2: In-window entries [max(cacheStart, f), u] — use cache + disk for uncached ones + // All indices accessed below satisfy i >= inStart >= cacheStart, so each slot i % cacheSize + // is exclusive to index i within the window and cannot hold a stale entry. + const inStart = Math.max(cacheStart, f); + if (inStart <= u) { + // Optimisation: skip entries already loaded sequentially into the cache + const readFrom = Math.max(this.readUntil + 1, inStart); + + // Trim trailing entries already present in the cache. + // readUntil >= readFrom >= cacheStart throughout, so all slots checked are in-window. + let readUntil = u; + while (readUntil >= readFrom && readUntil >= cacheStart && this.cache[readUntil % this.cacheSize]) { + readUntil--; + } + + if (readFrom <= readUntil) { + const count = readUntil - readFrom + 1; + const inBuf = Buffer.allocUnsafe(count * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * this.EntryClass.size); + const entries = Math.floor(bytesRead / this.EntryClass.size); + for (let idx = 0; idx < entries; idx++) { + const i = readFrom + idx; + this.cache[i % this.cacheSize] = this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size); + } + if (inStart <= this.readUntil + 1) { + this.readUntil = Math.max(this.readUntil, readUntil); + } + } + + // Fill the result from the ring buffer for the in-window portion + for (let i = inStart; i <= u; i++) { + result[i - f] = this.cache[i % this.cacheSize]; + } } - return this.data.slice(from, until + 1); + return result; } /** @@ -318,13 +364,18 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry|boolean} The entry at the given index position or false if out of bounds. */ get(index) { - index = wrapAndCheck(index, this.length); + index = wrapAndCheck(index, this._length); if (index <= 0) { return false; } - if (this.data[index - 1]) { - return this.data[index - 1]; + const i = index - 1; // 0-based + // The ring buffer window is [_length - cacheSize, _length - 1]. + // Within this window every index maps to a unique slot (no two indices share a slot), + // so a non-null slot is guaranteed to belong to index i and cannot be stale. + if (i >= this._length - this.cacheSize) { + const cached = this.cache[i % this.cacheSize]; + if (cached) return cached; } return this.read(index); @@ -354,24 +405,44 @@ class ReadableIndex extends events.EventEmitter { * @returns {Array|boolean} An array of entries for the given range or false on error. */ range(from, until = -1) { - from = wrapAndCheck(from, this.length); - until = wrapAndCheck(until, this.length); + from = wrapAndCheck(from, this._length); + until = wrapAndCheck(until, this._length); if (from <= 0 || until < from) { return false; } - const readFrom = Math.max(this.readUntil + 1, from); - let readUntil = until; - while (readUntil >= readFrom && this.data[readUntil - 1]) { - readUntil--; + const f = from - 1; // 0-based + const u = until - 1; // 0-based + const cacheStart = Math.max(0, this._length - this.cacheSize); + + // Determine if any disk reads are required + const hasOutOfWindow = f < cacheStart; + const inStart = Math.max(cacheStart, f); + // Entries in [inStart, readUntil] are assumed cached (sequential read guarantee). + // All indices in [readFrom, u] satisfy >= inStart >= cacheStart — unique, non-stale slots. + const readFrom = Math.max(this.readUntil + 1, inStart); + let needsDiskRead = hasOutOfWindow; + if (!needsDiskRead && inStart <= u) { + // Scan backwards for uncached in-window tail entries (all >= cacheStart, no stale slots). + let scanUntil = u; + while (scanUntil >= readFrom && scanUntil >= cacheStart && this.cache[scanUntil % this.cacheSize]) { + scanUntil--; + } + needsDiskRead = readFrom <= scanUntil; } - if (readFrom <= readUntil) { - this.readRange(readFrom, readUntil); + if (needsDiskRead) { + return this.readRange(from, until); } - return this.data.slice(from - 1, until); + // All required entries are already in the ring buffer — build result directly. + // f >= cacheStart here (hasOutOfWindow is false), so all slots are in-window and valid. + const result = new Array(u - f + 1); + for (let i = f; i <= u; i++) { + result[i - f] = this.cache[i % this.cacheSize]; + } + return result; } /** diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index 9cf8a884..ac9ae7ea 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -20,6 +20,7 @@ class WritableIndex extends ReadableIndex { * @param {object} [options] An object with additional index options. * @param {EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods. * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'. + * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024. * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings. @@ -192,22 +193,23 @@ class WritableIndex extends ReadableIndex { throw new Error('Consistency error. Tried to add an index that should come before existing last entry.'); } - if (this.readUntil === this.data.length - 1) { + if (this.readUntil === this._length - 1) { this.readUntil++; } - this.data[this.data.length] = entry; + this.cache[this._length % this.cacheSize] = entry; + this._length++; if (this.writeBufferCursor === 0) { this.flushTimeout = setTimeout(() => this.flush(), this.flushDelay); } this.writeBufferCursor += entry.toBuffer(this.writeBuffer, this.writeBufferCursor); - this.onFlush(callback, this.length); + this.onFlush(callback, this._length); if (this.writeBufferCursor >= this.writeBuffer.byteLength) { this.flush(); } - return this.length; + return this._length; } /** @@ -230,7 +232,13 @@ class WritableIndex extends ReadableIndex { return; } fs.truncateSync(this.fileName, truncatePosition); - this.data.splice(after); + + // Clear ring buffer slots for the removed entries to avoid stale reads + const oldCacheStart = Math.max(0, this._length - this.cacheSize); + for (let i = Math.max(oldCacheStart, after); i < this._length; i++) { + this.cache[i % this.cacheSize] = null; + } + this._length = after; this.readUntil = Math.min(this.readUntil, after); } } diff --git a/test/Index.spec.js b/test/Index.spec.js index 7652282b..8b32fdfd 100644 --- a/test/Index.spec.js +++ b/test/Index.spec.js @@ -504,6 +504,71 @@ describe('Index', function() { }); + describe('cacheSize / ring buffer', function() { + + it('limits in-memory entries to cacheSize (old entries still readable via disk)', function() { + const cacheSize = 5; + index = setupIndexWithEntries(20, { cacheSize }); + index.close(); + index.open(); + // All entries must be readable even though only the last `cacheSize` are ever in memory. + for (let i = 1; i <= 20; i++) { + const entry = index.get(i); + expect(entry).not.to.be(false); + expect(entry.number).to.be(i); + } + }); + + it('can still read entries outside the cache window from disk', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + // Entry 1 is outside the cache window (window = [11, 15]) + const entry = index.get(1); + expect(entry).not.to.be(false); + expect(entry.number).to.be(1); + }); + + it('can read a range that spans both cached and uncached entries', function() { + const cacheSize = 5; + index = setupIndexWithEntries(15, { cacheSize }); + const entries = index.range(8, 13); // crosses the boundary + expect(entries.length).to.be(6); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(8 + i); + } + }); + + it('truncation followed by re-adding entries returns the new values, not stale ones', function() { + const cacheSize = 10; + index = setupIndexWithEntries(15, { cacheSize }); + index.truncate(7); // keep entries 1-7 + // Add new entries at positions 8-10 with different numbers + for (let i = 8; i <= 10; i++) { + index.add(new Index.Entry(i + 100, i)); + } + index.flush(); + // New entries must return the just-added values, not the old truncated ones + for (let i = 8; i <= 10; i++) { + const entry = index.get(i); + expect(entry.number).to.be(i + 100); + } + }); + + it('remains fully functional after many adds that cycle the ring buffer', function() { + const cacheSize = 8; + index = setupIndexWithEntries(32, { cacheSize }); // 4 full cycles + index.close(); + index.open(); + // All entries readable via disk reads + const entries = index.all(); + expect(entries.length).to.be(32); + for (let i = 0; i < entries.length; i++) { + expect(entries[i].number).to.be(i + 1); + } + }); + + }); + describe('ReadOnly', function(){ it('can be created without explicit name', function(){ From 3971ca999021902d442c17ca72d880ab13938fa1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 24 Mar 2026 17:23:31 +0000 Subject: [PATCH 3/4] Extract ring buffer logic into RingBuffer.js class with clean cache API Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/9fd21e21-3dde-4c24-90ae-e41f3620da75 --- src/Index/ReadOnlyIndex.js | 11 +-- src/Index/ReadableIndex.js | 64 ++++++-------- src/Index/RingBuffer.js | 129 ++++++++++++++++++++++++++++ src/Index/WritableIndex.js | 16 ++-- test/RingBuffer.spec.js | 172 +++++++++++++++++++++++++++++++++++++ 5 files changed, 332 insertions(+), 60 deletions(-) create mode 100644 src/Index/RingBuffer.js create mode 100644 test/RingBuffer.spec.js diff --git a/src/Index/ReadOnlyIndex.js b/src/Index/ReadOnlyIndex.js index 0b010190..fcf157b8 100644 --- a/src/Index/ReadOnlyIndex.js +++ b/src/Index/ReadOnlyIndex.js @@ -24,16 +24,9 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) { if (!this.fd) { return; } - const prevLength = this._length; + const prevLength = this._cache.length; const newLength = this.readFileLength(); - if (newLength < prevLength) { - // Clear ring buffer slots for the removed entries to avoid stale reads - const oldCacheStart = Math.max(0, prevLength - this.cacheSize); - for (let i = Math.max(oldCacheStart, newLength); i < prevLength; i++) { - this.cache[i % this.cacheSize] = null; - } - } - this._length = newLength; + this._cache.truncate(newLength); if (newLength > prevLength) { this.emit('append', prevLength, newLength); } diff --git a/src/Index/ReadableIndex.js b/src/Index/ReadableIndex.js index c53c7d28..fed8150d 100644 --- a/src/Index/ReadableIndex.js +++ b/src/Index/ReadableIndex.js @@ -3,6 +3,7 @@ const path = require('path'); const events = require('events'); const Entry = require('../IndexEntry'); const { assert, wrapAndCheck, binarySearch } = require('../util'); +const RingBuffer = require('./RingBuffer'); // node-event-store-index V01 const HEADER_MAGIC = "nesidx01"; @@ -66,10 +67,8 @@ class ReadableIndex extends events.EventEmitter { * @param {object} options */ initialize(options) { - this._length = 0; - this.cacheSize = options.cacheSize !== undefined ? Math.max(1, options.cacheSize >>> 0) : 1024; // jshint ignore:line - /* @type Array Ring buffer holding at most cacheSize entries */ - this.cache = new Array(this.cacheSize); + const cacheSize = options.cacheSize !== undefined ? options.cacheSize : 1024; + this._cache = new RingBuffer(cacheSize); this.fd = null; this.fileMode = 'r'; this.EntryClass = options.EntryClass; @@ -103,7 +102,7 @@ class ReadableIndex extends events.EventEmitter { * @returns {number} */ get length() { - return this._length; + return this._cache.length; } /** @@ -158,7 +157,7 @@ class ReadableIndex extends events.EventEmitter { const length = this.readFileLength(); if (length > 0) { - this._length = length; + this._cache.truncate(length); // Read last item to get the index started this.read(length); } @@ -235,9 +234,8 @@ class ReadableIndex extends events.EventEmitter { * @api */ close() { - this._length = 0; + this._cache.reset(); this.readUntil = -1; - this.cache.fill(null); this.readBuffer.fill(0); if (this.fd) { fs.closeSync(this.fd); @@ -261,10 +259,7 @@ class ReadableIndex extends events.EventEmitter { this.readUntil++; } const entry = this.EntryClass.fromBuffer(this.readBuffer); - // Store in ring buffer only if within the current cache window - if (i >= this._length - this.cacheSize) { - this.cache[i % this.cacheSize] = entry; - } + this._cache.set(i, entry); return entry; } @@ -272,7 +267,7 @@ class ReadableIndex extends events.EventEmitter { * Read a range of entries from disk. This method will not do any range checks. * It will however optimize to prevent reading entries that have already been read sequentially from start. * - * Entries within the ring buffer cache window are stored in the cache; entries outside the window + * Entries within the cache window are stored in the cache; entries outside the window * (older than cacheSize) are read from disk for the return value but not cached. * * @private @@ -287,7 +282,7 @@ class ReadableIndex extends events.EventEmitter { const f = from - 1; // 0-based const u = until - 1; // 0-based - const cacheStart = Math.max(0, this._length - this.cacheSize); + const cacheStart = this._cache.windowStart; // Build the result array up front const result = new Array(u - f + 1); @@ -305,17 +300,14 @@ class ReadableIndex extends events.EventEmitter { } // Part 2: In-window entries [max(cacheStart, f), u] — use cache + disk for uncached ones - // All indices accessed below satisfy i >= inStart >= cacheStart, so each slot i % cacheSize - // is exclusive to index i within the window and cannot hold a stale entry. const inStart = Math.max(cacheStart, f); if (inStart <= u) { // Optimisation: skip entries already loaded sequentially into the cache const readFrom = Math.max(this.readUntil + 1, inStart); - // Trim trailing entries already present in the cache. - // readUntil >= readFrom >= cacheStart throughout, so all slots checked are in-window. + // Trim trailing entries already present in the cache let readUntil = u; - while (readUntil >= readFrom && readUntil >= cacheStart && this.cache[readUntil % this.cacheSize]) { + while (readUntil >= readFrom && this._cache.get(readUntil)) { readUntil--; } @@ -326,16 +318,16 @@ class ReadableIndex extends events.EventEmitter { const entries = Math.floor(bytesRead / this.EntryClass.size); for (let idx = 0; idx < entries; idx++) { const i = readFrom + idx; - this.cache[i % this.cacheSize] = this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size); + this._cache.set(i, this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size)); } if (inStart <= this.readUntil + 1) { this.readUntil = Math.max(this.readUntil, readUntil); } } - // Fill the result from the ring buffer for the in-window portion + // Fill the result from the cache for the in-window portion for (let i = inStart; i <= u; i++) { - result[i - f] = this.cache[i % this.cacheSize]; + result[i - f] = this._cache.get(i); } } @@ -364,19 +356,14 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry|boolean} The entry at the given index position or false if out of bounds. */ get(index) { - index = wrapAndCheck(index, this._length); + index = wrapAndCheck(index, this._cache.length); if (index <= 0) { return false; } const i = index - 1; // 0-based - // The ring buffer window is [_length - cacheSize, _length - 1]. - // Within this window every index maps to a unique slot (no two indices share a slot), - // so a non-null slot is guaranteed to belong to index i and cannot be stale. - if (i >= this._length - this.cacheSize) { - const cached = this.cache[i % this.cacheSize]; - if (cached) return cached; - } + const cached = this._cache.get(i); + if (cached) return cached; return this.read(index); } @@ -405,8 +392,8 @@ class ReadableIndex extends events.EventEmitter { * @returns {Array|boolean} An array of entries for the given range or false on error. */ range(from, until = -1) { - from = wrapAndCheck(from, this._length); - until = wrapAndCheck(until, this._length); + from = wrapAndCheck(from, this._cache.length); + until = wrapAndCheck(until, this._cache.length); if (from <= 0 || until < from) { return false; @@ -414,19 +401,17 @@ class ReadableIndex extends events.EventEmitter { const f = from - 1; // 0-based const u = until - 1; // 0-based - const cacheStart = Math.max(0, this._length - this.cacheSize); + const cacheStart = this._cache.windowStart; // Determine if any disk reads are required const hasOutOfWindow = f < cacheStart; const inStart = Math.max(cacheStart, f); - // Entries in [inStart, readUntil] are assumed cached (sequential read guarantee). - // All indices in [readFrom, u] satisfy >= inStart >= cacheStart — unique, non-stale slots. const readFrom = Math.max(this.readUntil + 1, inStart); let needsDiskRead = hasOutOfWindow; if (!needsDiskRead && inStart <= u) { - // Scan backwards for uncached in-window tail entries (all >= cacheStart, no stale slots). + // Scan backwards for uncached in-window tail entries let scanUntil = u; - while (scanUntil >= readFrom && scanUntil >= cacheStart && this.cache[scanUntil % this.cacheSize]) { + while (scanUntil >= readFrom && this._cache.get(scanUntil)) { scanUntil--; } needsDiskRead = readFrom <= scanUntil; @@ -436,11 +421,10 @@ class ReadableIndex extends events.EventEmitter { return this.readRange(from, until); } - // All required entries are already in the ring buffer — build result directly. - // f >= cacheStart here (hasOutOfWindow is false), so all slots are in-window and valid. + // All required entries are already in the cache — build result directly const result = new Array(u - f + 1); for (let i = f; i <= u; i++) { - result[i - f] = this.cache[i % this.cacheSize]; + result[i - f] = this._cache.get(i); } return result; } diff --git a/src/Index/RingBuffer.js b/src/Index/RingBuffer.js new file mode 100644 index 00000000..fa7adb09 --- /dev/null +++ b/src/Index/RingBuffer.js @@ -0,0 +1,129 @@ +/** + * A fixed-capacity ring buffer used as an index entry cache. + * + * Only the most-recent `capacity` entries are kept in memory. The buffer also + * tracks the total number of entries ever added (`length`) so callers can tell + * whether a slot is within the live in-memory window. + * + * API contract + * ------------ + * - `get(index)` — cached item at 0-based `index`, or `null` when the + * index is outside the in-memory window or the slot has + * not been written yet. + * - `set(index, item)` — stores `item` at 0-based `index` if it falls inside + * the current window; silently ignores out-of-window + * writes. + * - `add(item)` — appends `item` at position `length`, advances + * `length`, and returns the new length. + * - `truncate(newLength)` — discards entries from `newLength` onwards (nulls + * their slots) and sets `length = newLength`. Safe to + * call with `newLength >= length` (grow-only update). + * - `reset()` — clears all slots and resets `length` to 0. + */ +class RingBuffer { + + /** + * @param {number} capacity Maximum number of entries held in memory. + */ + constructor(capacity) { + this._capacity = Math.max(1, capacity >>> 0); // jshint ignore:line + this._buffer = new Array(this._capacity); + this._length = 0; + } + + /** + * Total number of items ever appended (not capped at capacity). + * @type {number} + */ + get length() { + return this._length; + } + + /** + * Maximum number of items kept in memory. + * @type {number} + */ + get capacity() { + return this._capacity; + } + + /** + * The smallest 0-based index that is currently inside the in-memory window. + * Indices below this value are not cached and require a disk read. + * @type {number} + */ + get windowStart() { + return Math.max(0, this._length - this._capacity); + } + + /** + * Return the cached item at the given 0-based index, or `null` if the + * index is outside the in-memory window or the slot has not been populated. + * + * @param {number} index 0-based position. + * @returns {*|null} + */ + get(index) { + if (index < this.windowStart) { + return null; + } + const item = this._buffer[index % this._capacity]; + return item !== undefined ? item : null; + } + + /** + * Store `item` at the given 0-based `index`. + * Writes outside the current in-memory window are silently ignored. + * + * @param {number} index 0-based position. + * @param {*} item + */ + set(index, item) { + if (index < this.windowStart) { + return; + } + this._buffer[index % this._capacity] = item; + } + + /** + * Append `item` at position `length` and advance `length`. + * + * @param {*} item + * @returns {number} The new length (1-based position of the appended item). + */ + add(item) { + this._buffer[this._length % this._capacity] = item; + this._length++; + return this._length; + } + + /** + * Discard entries from `newLength` onwards by nulling their cache slots, + * then set `length = newLength`. + * + * When `newLength >= length` no eviction is performed and only `length` is + * updated (useful when the underlying file has grown and the caller just + * needs to advance the length counter without populating new slots). + * + * @param {number} newLength The new total length. + */ + truncate(newLength) { + if (newLength < this._length) { + const cacheStart = this.windowStart; + for (let i = Math.max(cacheStart, newLength); i < this._length; i++) { + this._buffer[i % this._capacity] = null; + } + } + this._length = newLength; + } + + /** + * Clear all cached slots and reset `length` to 0. + */ + reset() { + this._buffer.fill(null); + this._length = 0; + } +} + +module.exports = RingBuffer; diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index ac9ae7ea..d429f0c0 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -193,23 +193,22 @@ class WritableIndex extends ReadableIndex { throw new Error('Consistency error. Tried to add an index that should come before existing last entry.'); } - if (this.readUntil === this._length - 1) { + if (this.readUntil === this._cache.length - 1) { this.readUntil++; } - this.cache[this._length % this.cacheSize] = entry; - this._length++; + this._cache.add(entry); if (this.writeBufferCursor === 0) { this.flushTimeout = setTimeout(() => this.flush(), this.flushDelay); } this.writeBufferCursor += entry.toBuffer(this.writeBuffer, this.writeBufferCursor); - this.onFlush(callback, this._length); + this.onFlush(callback, this._cache.length); if (this.writeBufferCursor >= this.writeBuffer.byteLength) { this.flush(); } - return this._length; + return this._cache.length; } /** @@ -233,12 +232,7 @@ class WritableIndex extends ReadableIndex { } fs.truncateSync(this.fileName, truncatePosition); - // Clear ring buffer slots for the removed entries to avoid stale reads - const oldCacheStart = Math.max(0, this._length - this.cacheSize); - for (let i = Math.max(oldCacheStart, after); i < this._length; i++) { - this.cache[i % this.cacheSize] = null; - } - this._length = after; + this._cache.truncate(after); this.readUntil = Math.min(this.readUntil, after); } } diff --git a/test/RingBuffer.spec.js b/test/RingBuffer.spec.js new file mode 100644 index 00000000..f6bd0661 --- /dev/null +++ b/test/RingBuffer.spec.js @@ -0,0 +1,172 @@ +const expect = require('expect.js'); +const RingBuffer = require('../src/Index/RingBuffer'); + +describe('RingBuffer', function() { + + describe('constructor', function() { + + it('initialises length to 0', function() { + const rb = new RingBuffer(4); + expect(rb.length).to.be(0); + }); + + it('exposes the configured capacity', function() { + const rb = new RingBuffer(8); + expect(rb.capacity).to.be(8); + }); + + it('clamps capacity to a minimum of 1', function() { + const rb = new RingBuffer(0); + expect(rb.capacity).to.be(1); + }); + + }); + + describe('windowStart', function() { + + it('is 0 when length <= capacity', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); + expect(rb.windowStart).to.be(0); + }); + + it('advances as length exceeds capacity', function() { + const rb = new RingBuffer(3); + for (let i = 0; i < 5; i++) rb.add(i); + // length=5, capacity=3 → windowStart=2 + expect(rb.windowStart).to.be(2); + }); + + }); + + describe('add', function() { + + it('returns the new length after each add', function() { + const rb = new RingBuffer(4); + expect(rb.add('x')).to.be(1); + expect(rb.add('y')).to.be(2); + }); + + it('increments length on every add', function() { + const rb = new RingBuffer(3); + rb.add('a'); rb.add('b'); rb.add('c'); rb.add('d'); + expect(rb.length).to.be(4); + }); + + it('makes items retrievable via get', function() { + const rb = new RingBuffer(4); + rb.add('first'); + rb.add('second'); + expect(rb.get(0)).to.be('first'); + expect(rb.get(1)).to.be('second'); + }); + + }); + + describe('get', function() { + + it('returns null for an index below windowStart', function() { + const rb = new RingBuffer(2); + rb.add('a'); rb.add('b'); rb.add('c'); // window = [1, 2] + expect(rb.get(0)).to.be(null); + }); + + it('returns null for a slot that has never been set', function() { + const rb = new RingBuffer(4); + expect(rb.get(0)).to.be(null); + }); + + it('returns the correct item within the window', function() { + const rb = new RingBuffer(4); + ['a', 'b', 'c', 'd', 'e'].forEach(v => rb.add(v)); + // window = [1, 4], item at index 4 = 'e' + expect(rb.get(4)).to.be('e'); + }); + + it('returns null after the item has been evicted by further adds', function() { + const rb = new RingBuffer(3); + rb.add('a'); rb.add('b'); rb.add('c'); // window=[0,2] + rb.add('d'); // window=[1,3], index 0 evicted + expect(rb.get(0)).to.be(null); + }); + + }); + + describe('set', function() { + + it('stores an item at an in-window index', function() { + const rb = new RingBuffer(4); + rb.add(null); // length=1 + rb.set(0, 'hello'); + expect(rb.get(0)).to.be('hello'); + }); + + it('ignores writes below windowStart', function() { + const rb = new RingBuffer(2); + rb.add('a'); rb.add('b'); rb.add('c'); // window=[1,2] + rb.set(0, 'overwrite'); + expect(rb.get(0)).to.be(null); + }); + + }); + + describe('truncate', function() { + + it('reduces length and evicts stale in-window slots', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); rb.add('c'); + rb.truncate(1); // keep only index 0 + expect(rb.length).to.be(1); + expect(rb.get(1)).to.be(null); + expect(rb.get(2)).to.be(null); + }); + + it('retains entries below the truncation point', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); rb.add('c'); + rb.truncate(2); + expect(rb.get(0)).to.be('a'); + expect(rb.get(1)).to.be('b'); + }); + + it('can grow length (no eviction)', function() { + const rb = new RingBuffer(4); + rb.add('a'); + rb.truncate(3); // grow + expect(rb.length).to.be(3); + expect(rb.get(0)).to.be('a'); + }); + + it('allows re-adding after truncation without returning stale data', function() { + const rb = new RingBuffer(4); + rb.add('old-1'); rb.add('old-2'); rb.add('old-3'); + rb.truncate(1); // keep only index 0 + rb.add('new-1'); // now at index 1 + expect(rb.get(1)).to.be('new-1'); + }); + + }); + + describe('reset', function() { + + it('clears all slots and sets length to 0', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); + rb.reset(); + expect(rb.length).to.be(0); + expect(rb.get(0)).to.be(null); + expect(rb.get(1)).to.be(null); + }); + + it('allows fresh adds after reset', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); + rb.reset(); + rb.add('x'); + expect(rb.get(0)).to.be('x'); + expect(rb.length).to.be(1); + }); + + }); + +}); From c2a083c57d7af3ac819c1ac4a3b9317ee02f99e5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 24 Mar 2026 17:46:10 +0000 Subject: [PATCH 4/4] Address review comments: rename variables and add RingBuffer.slice() Co-authored-by: albe <4259532+albe@users.noreply.github.com> Agent-Logs-Url: https://github.com/albe/node-event-storage/sessions/a143d98f-44b3-4cea-bcdc-2a7b9467adbd --- src/Index/ReadOnlyIndex.js | 4 +- src/Index/ReadableIndex.js | 93 ++++++++++++++++++-------------------- src/Index/RingBuffer.js | 20 ++++++++ src/Index/WritableIndex.js | 10 ++-- test/RingBuffer.spec.js | 32 ++++++++++++- 5 files changed, 102 insertions(+), 57 deletions(-) diff --git a/src/Index/ReadOnlyIndex.js b/src/Index/ReadOnlyIndex.js index fcf157b8..bd853074 100644 --- a/src/Index/ReadOnlyIndex.js +++ b/src/Index/ReadOnlyIndex.js @@ -24,9 +24,9 @@ class ReadOnlyIndex extends watchesFile(ReadableIndex) { if (!this.fd) { return; } - const prevLength = this._cache.length; + const prevLength = this.cache.length; const newLength = this.readFileLength(); - this._cache.truncate(newLength); + this.cache.truncate(newLength); if (newLength > prevLength) { this.emit('append', prevLength, newLength); } diff --git a/src/Index/ReadableIndex.js b/src/Index/ReadableIndex.js index fed8150d..fc82ef65 100644 --- a/src/Index/ReadableIndex.js +++ b/src/Index/ReadableIndex.js @@ -68,7 +68,7 @@ class ReadableIndex extends events.EventEmitter { */ initialize(options) { const cacheSize = options.cacheSize !== undefined ? options.cacheSize : 1024; - this._cache = new RingBuffer(cacheSize); + this.cache = new RingBuffer(cacheSize); this.fd = null; this.fileMode = 'r'; this.EntryClass = options.EntryClass; @@ -102,7 +102,7 @@ class ReadableIndex extends events.EventEmitter { * @returns {number} */ get length() { - return this._cache.length; + return this.cache.length; } /** @@ -157,7 +157,7 @@ class ReadableIndex extends events.EventEmitter { const length = this.readFileLength(); if (length > 0) { - this._cache.truncate(length); + this.cache.truncate(length); // Read last item to get the index started this.read(length); } @@ -234,7 +234,7 @@ class ReadableIndex extends events.EventEmitter { * @api */ close() { - this._cache.reset(); + this.cache.reset(); this.readUntil = -1; this.readBuffer.fill(0); if (this.fd) { @@ -252,14 +252,14 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry} The index entry at the given position. */ read(index) { - const i = Number(index) - 1; // 0-based + const zeroBasedIndex = Number(index) - 1; - fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + i * this.EntryClass.size); - if (i === this.readUntil + 1) { + fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + zeroBasedIndex * this.EntryClass.size); + if (zeroBasedIndex === this.readUntil + 1) { this.readUntil++; } const entry = this.EntryClass.fromBuffer(this.readBuffer); - this._cache.set(i, entry); + this.cache.set(zeroBasedIndex, entry); return entry; } @@ -280,54 +280,54 @@ class ReadableIndex extends events.EventEmitter { return [this.read(from)]; } - const f = from - 1; // 0-based - const u = until - 1; // 0-based - const cacheStart = this._cache.windowStart; + const zeroBasedFrom = from - 1; + const zeroBasedUntil = until - 1; + const cacheStart = this.cache.windowStart; // Build the result array up front - const result = new Array(u - f + 1); + const result = new Array(zeroBasedUntil - zeroBasedFrom + 1); - // Part 1: Out-of-window entries [f, min(cacheStart-1, u)] — read from disk, do not cache - const outEnd = Math.min(cacheStart - 1, u); - if (f < cacheStart && outEnd >= f) { - const count = outEnd - f + 1; + // Part 1: Out-of-window entries [zeroBasedFrom, min(cacheStart-1, zeroBasedUntil)] — read from disk, do not cache + const outEnd = Math.min(cacheStart - 1, zeroBasedUntil); + if (zeroBasedFrom < cacheStart && outEnd >= zeroBasedFrom) { + const count = outEnd - zeroBasedFrom + 1; const outBuf = Buffer.allocUnsafe(count * this.EntryClass.size); - const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + f * this.EntryClass.size); + const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + zeroBasedFrom * this.EntryClass.size); const entries = Math.floor(bytesRead / this.EntryClass.size); for (let idx = 0; idx < entries; idx++) { result[idx] = this.EntryClass.fromBuffer(outBuf, idx * this.EntryClass.size); } } - // Part 2: In-window entries [max(cacheStart, f), u] — use cache + disk for uncached ones - const inStart = Math.max(cacheStart, f); - if (inStart <= u) { + // Part 2: In-window entries [max(cacheStart, zeroBasedFrom), zeroBasedUntil] — use cache + disk for uncached ones + const inStart = Math.max(cacheStart, zeroBasedFrom); + if (inStart <= zeroBasedUntil) { // Optimisation: skip entries already loaded sequentially into the cache const readFrom = Math.max(this.readUntil + 1, inStart); // Trim trailing entries already present in the cache - let readUntil = u; - while (readUntil >= readFrom && this._cache.get(readUntil)) { - readUntil--; + let readUntilPos = zeroBasedUntil; + while (readUntilPos >= readFrom && this.cache.get(readUntilPos)) { + readUntilPos--; } - if (readFrom <= readUntil) { - const count = readUntil - readFrom + 1; + if (readFrom <= readUntilPos) { + const count = readUntilPos - readFrom + 1; const inBuf = Buffer.allocUnsafe(count * this.EntryClass.size); const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * this.EntryClass.size); const entries = Math.floor(bytesRead / this.EntryClass.size); for (let idx = 0; idx < entries; idx++) { - const i = readFrom + idx; - this._cache.set(i, this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size)); + const index = readFrom + idx; + this.cache.set(index, this.EntryClass.fromBuffer(inBuf, idx * this.EntryClass.size)); } if (inStart <= this.readUntil + 1) { - this.readUntil = Math.max(this.readUntil, readUntil); + this.readUntil = Math.max(this.readUntil, readUntilPos); } } // Fill the result from the cache for the in-window portion - for (let i = inStart; i <= u; i++) { - result[i - f] = this._cache.get(i); + for (let index = inStart; index <= zeroBasedUntil; index++) { + result[index - zeroBasedFrom] = this.cache.get(index); } } @@ -356,13 +356,12 @@ class ReadableIndex extends events.EventEmitter { * @returns {Entry|boolean} The entry at the given index position or false if out of bounds. */ get(index) { - index = wrapAndCheck(index, this._cache.length); + index = wrapAndCheck(index, this.cache.length); if (index <= 0) { return false; } - const i = index - 1; // 0-based - const cached = this._cache.get(i); + const cached = this.cache.get(index - 1); if (cached) return cached; return this.read(index); @@ -392,26 +391,26 @@ class ReadableIndex extends events.EventEmitter { * @returns {Array|boolean} An array of entries for the given range or false on error. */ range(from, until = -1) { - from = wrapAndCheck(from, this._cache.length); - until = wrapAndCheck(until, this._cache.length); + from = wrapAndCheck(from, this.cache.length); + until = wrapAndCheck(until, this.cache.length); if (from <= 0 || until < from) { return false; } - const f = from - 1; // 0-based - const u = until - 1; // 0-based - const cacheStart = this._cache.windowStart; + const zeroBasedFrom = from - 1; + const zeroBasedUntil = until - 1; + const cacheStart = this.cache.windowStart; // Determine if any disk reads are required - const hasOutOfWindow = f < cacheStart; - const inStart = Math.max(cacheStart, f); + const hasOutOfWindow = zeroBasedFrom < cacheStart; + const inStart = Math.max(cacheStart, zeroBasedFrom); const readFrom = Math.max(this.readUntil + 1, inStart); let needsDiskRead = hasOutOfWindow; - if (!needsDiskRead && inStart <= u) { + if (!needsDiskRead && inStart <= zeroBasedUntil) { // Scan backwards for uncached in-window tail entries - let scanUntil = u; - while (scanUntil >= readFrom && this._cache.get(scanUntil)) { + let scanUntil = zeroBasedUntil; + while (scanUntil >= readFrom && this.cache.get(scanUntil)) { scanUntil--; } needsDiskRead = readFrom <= scanUntil; @@ -421,12 +420,8 @@ class ReadableIndex extends events.EventEmitter { return this.readRange(from, until); } - // All required entries are already in the cache — build result directly - const result = new Array(u - f + 1); - for (let i = f; i <= u; i++) { - result[i - f] = this._cache.get(i); - } - return result; + // All required entries are already in the cache — return a slice directly + return this.cache.slice(zeroBasedFrom, zeroBasedUntil); } /** diff --git a/src/Index/RingBuffer.js b/src/Index/RingBuffer.js index fa7adb09..447987b6 100644 --- a/src/Index/RingBuffer.js +++ b/src/Index/RingBuffer.js @@ -117,6 +117,26 @@ class RingBuffer { this._length = newLength; } + /** + * Return a copy of the cached items for the 0-based range [from, until] (inclusive). + * Both `from` and `until` must be within the current window (>= windowStart). + * + * When the range is contiguous in the internal buffer a single native slice is + * returned. When it wraps the two halves are concatenated. + * + * @param {number} from 0-based start position (inclusive). + * @param {number} until 0-based end position (inclusive). + * @returns {Array<*>} + */ + slice(from, until) { + const slotFrom = from % this._capacity; + const slotUntil = until % this._capacity; + if (slotFrom <= slotUntil) { + return this._buffer.slice(slotFrom, slotUntil + 1); + } + return this._buffer.slice(slotFrom).concat(this._buffer.slice(0, slotUntil + 1)); + } + /** * Clear all cached slots and reset `length` to 0. */ diff --git a/src/Index/WritableIndex.js b/src/Index/WritableIndex.js index d429f0c0..fae7ee78 100644 --- a/src/Index/WritableIndex.js +++ b/src/Index/WritableIndex.js @@ -193,22 +193,22 @@ class WritableIndex extends ReadableIndex { throw new Error('Consistency error. Tried to add an index that should come before existing last entry.'); } - if (this.readUntil === this._cache.length - 1) { + if (this.readUntil === this.cache.length - 1) { this.readUntil++; } - this._cache.add(entry); + this.cache.add(entry); if (this.writeBufferCursor === 0) { this.flushTimeout = setTimeout(() => this.flush(), this.flushDelay); } this.writeBufferCursor += entry.toBuffer(this.writeBuffer, this.writeBufferCursor); - this.onFlush(callback, this._cache.length); + this.onFlush(callback, this.cache.length); if (this.writeBufferCursor >= this.writeBuffer.byteLength) { this.flush(); } - return this._cache.length; + return this.cache.length; } /** @@ -232,7 +232,7 @@ class WritableIndex extends ReadableIndex { } fs.truncateSync(this.fileName, truncatePosition); - this._cache.truncate(after); + this.cache.truncate(after); this.readUntil = Math.min(this.readUntil, after); } } diff --git a/test/RingBuffer.spec.js b/test/RingBuffer.spec.js index f6bd0661..2e3f2f80 100644 --- a/test/RingBuffer.spec.js +++ b/test/RingBuffer.spec.js @@ -169,4 +169,34 @@ describe('RingBuffer', function() { }); -}); + describe('slice', function() { + + it('returns a contiguous sub-range from the buffer', function() { + const rb = new RingBuffer(4); + rb.add('a'); rb.add('b'); rb.add('c'); + expect(rb.slice(0, 2)).to.eql(['a', 'b', 'c']); + }); + + it('returns a sub-range that does not wrap', function() { + const rb = new RingBuffer(8); + ['a', 'b', 'c', 'd', 'e'].forEach(v => rb.add(v)); + expect(rb.slice(1, 3)).to.eql(['b', 'c', 'd']); + }); + + it('handles a range that wraps around the internal buffer', function() { + const rb = new RingBuffer(4); + // After 6 adds: window=[2,5], slots: [4%4=0]=>'e', [5%4=1]=>'f', [2%4=2]=>'c', [3%4=3]=>'d' + ['a', 'b', 'c', 'd', 'e', 'f'].forEach(v => rb.add(v)); + // Ask for [2, 5] (0-based, both inclusive) = 'c','d','e','f' + expect(rb.slice(2, 5)).to.eql(['c', 'd', 'e', 'f']); + }); + + it('returns a single-element slice', function() { + const rb = new RingBuffer(4); + rb.add('x'); rb.add('y'); rb.add('z'); + expect(rb.slice(1, 1)).to.eql(['y']); + }); + + }); + +}); \ No newline at end of file