diff --git a/docs/docs/api/CacheStore.md b/docs/docs/api/CacheStore.md index 00ceb960641..15524748ad5 100644 --- a/docs/docs/api/CacheStore.md +++ b/docs/docs/api/CacheStore.md @@ -66,10 +66,12 @@ Parameters: Returns: `GetResult | Promise | undefined` - If the request is cached, the cached response is returned. If the request's method is anything other than HEAD, the response is also returned. If the request isn't cached, `undefined` is returned. +The `get` method may return a `Promise` for async cache stores (e.g. Redis-backed or remote stores). The cache interceptor handles both synchronous and asynchronous return values, including in revalidation paths (304 Not Modified handling and stale-while-revalidate background revalidation). + Response properties: * **response** `CacheValue` - The cached response data. -* **body** `Readable | undefined` - The response's body. +* **body** `Readable | Iterable | undefined` - The response's body. This can be an array of `Buffer` chunks (with a `.values()` method) or a `Readable` stream. Both formats are supported in all code paths, including 304 revalidation. ### Function: `createWriteStream` @@ -98,8 +100,11 @@ This is an interface containing the majority of a response's data (minus the bod ### Property `vary` -`Record | undefined` - The headers defined by the response's `Vary` header -and their respective values for later comparison +`Record | undefined` - The headers defined by the response's `Vary` header +and their respective values for later comparison. Values are `null` when the +header specified in `Vary` was not present in the original request. These `null` +values are automatically filtered out during revalidation so they are not sent +as request headers. For example, for a response like ``` @@ -116,6 +121,14 @@ This would be } ``` +If the original request did not include the `accepts` header: +```js +{ + 'content-encoding': 'utf8', + accepts: null +} +``` + ### Property `cachedAt` `number` - Time in millis that this value was cached. diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js index d074cb72dea..93a70e80535 100644 --- a/lib/handler/cache-handler.js +++ b/lib/handler/cache-handler.js @@ -193,57 +193,92 @@ class CacheHandler { // Not modified, re-use the cached value // https://www.rfc-editor.org/rfc/rfc9111.html#name-handling-304-not-modified if (statusCode === 304) { - /** - * @type {import('../../types/cache-interceptor.d.ts').default.CacheValue} - */ - const cachedValue = this.#store.get(this.#cacheKey) - if (!cachedValue) { - // Do not create a new cache entry, as a 304 won't have a body - so cannot be cached. - return downstreamOnHeaders() - } - - // Re-use the cached value: statuscode, statusmessage, headers and body - value.statusCode = cachedValue.statusCode - value.statusMessage = cachedValue.statusMessage - value.etag = cachedValue.etag - value.headers = { ...cachedValue.headers, ...strippedHeaders } + const handle304 = (cachedValue) => { + if (!cachedValue) { + // Do not create a new cache entry, as a 304 won't have a body - so cannot be cached. + return downstreamOnHeaders() + } - downstreamOnHeaders() + // Re-use the cached value: statuscode, statusmessage, headers and body + value.statusCode = cachedValue.statusCode + value.statusMessage = cachedValue.statusMessage + value.etag = cachedValue.etag + value.headers = { ...cachedValue.headers, ...strippedHeaders } - this.#writeStream = this.#store.createWriteStream(this.#cacheKey, value) + downstreamOnHeaders() - if (!this.#writeStream || !cachedValue?.body) { - return - } + this.#writeStream = this.#store.createWriteStream(this.#cacheKey, value) - const bodyIterator = cachedValue.body.values() + if (!this.#writeStream || !cachedValue?.body) { + return + } - const streamCachedBody = () => { - for (const chunk of bodyIterator) { - const full = this.#writeStream.write(chunk) === false - this.#handler.onResponseData?.(controller, chunk) - // when stream is full stop writing until we get a 'drain' event - if (full) { - break + if (typeof cachedValue.body.values === 'function') { + const bodyIterator = cachedValue.body.values() + + const streamCachedBody = () => { + for (const chunk of bodyIterator) { + const full = this.#writeStream.write(chunk) === false + this.#handler.onResponseData?.(controller, chunk) + // when stream is full stop writing until we get a 'drain' event + if (full) { + break + } + } } - } - } - this.#writeStream - .on('error', function () { - handler.#writeStream = undefined - handler.#store.delete(handler.#cacheKey) - }) - .on('drain', () => { + this.#writeStream + .on('error', function () { + handler.#writeStream = undefined + handler.#store.delete(handler.#cacheKey) + }) + .on('drain', () => { + streamCachedBody() + }) + .on('close', function () { + if (handler.#writeStream === this) { + handler.#writeStream = undefined + } + }) + streamCachedBody() - }) - .on('close', function () { - if (handler.#writeStream === this) { - handler.#writeStream = undefined - } - }) + } else if (typeof cachedValue.body.on === 'function') { + // Readable stream body (e.g. from async/remote cache stores) + cachedValue.body + .on('data', (chunk) => { + this.#writeStream.write(chunk) + this.#handler.onResponseData?.(controller, chunk) + }) + .on('end', () => { + this.#writeStream.end() + }) + .on('error', () => { + this.#writeStream = undefined + this.#store.delete(this.#cacheKey) + }) + + this.#writeStream + .on('error', function () { + handler.#writeStream = undefined + handler.#store.delete(handler.#cacheKey) + }) + .on('close', function () { + if (handler.#writeStream === this) { + handler.#writeStream = undefined + } + }) + } + } - streamCachedBody() + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheValue} + */ + const result = this.#store.get(this.#cacheKey) + if (result && typeof result.then === 'function') { + result.then(handle304) + } else { + handle304(result) + } } else { if (typeof resHeaders.etag === 'string' && isEtagUsable(resHeaders.etag)) { value.etag = resHeaders.etag diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index b0449374fd4..81d7cb12cbb 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -292,7 +292,7 @@ function handleResult ( // Start background revalidation (fire-and-forget) queueMicrotask(() => { - let headers = { + const headers = { ...opts.headers, 'if-modified-since': new Date(result.cachedAt).toUTCString() } @@ -302,9 +302,10 @@ function handleResult ( } if (result.vary) { - headers = { - ...headers, - ...result.vary + for (const key in result.vary) { + if (result.vary[key] != null) { + headers[key] = result.vary[key] + } } } @@ -335,7 +336,7 @@ function handleResult ( withinStaleIfErrorThreshold = now < (result.staleAt + (staleIfErrorExpiry * 1000)) } - let headers = { + const headers = { ...opts.headers, 'if-modified-since': new Date(result.cachedAt).toUTCString() } @@ -345,9 +346,10 @@ function handleResult ( } if (result.vary) { - headers = { - ...headers, - ...result.vary + for (const key in result.vary) { + if (result.vary[key] != null) { + headers[key] = result.vary[key] + } } } diff --git a/test/interceptors/cache-async-store.js b/test/interceptors/cache-async-store.js new file mode 100644 index 00000000000..54725ac6b2a --- /dev/null +++ b/test/interceptors/cache-async-store.js @@ -0,0 +1,233 @@ +'use strict' + +const { test, after, describe } = require('node:test') +const { strictEqual, notStrictEqual } = require('node:assert') +const { createServer } = require('node:http') +const { once } = require('node:events') +const { Readable } = require('node:stream') +const { request, Client, interceptors } = require('../../index') +const MemoryCacheStore = require('../../lib/cache/memory-cache-store') +const FakeTimers = require('@sinonjs/fake-timers') +const { setTimeout } = require('node:timers/promises') + +/** + * Wraps a MemoryCacheStore to simulate an async remote store: + * - get() always returns a Promise + * - body is returned as a Readable stream instead of an array + */ +class AsyncCacheStore { + #inner + + constructor () { + this.#inner = new MemoryCacheStore() + } + + async get (key) { + const result = this.#inner.get(key) + if (!result) return undefined + + const { body, ...rest } = result + const readable = new Readable({ read () {} }) + if (body) { + for (const chunk of body) { + readable.push(chunk) + } + } + readable.push(null) + + return { ...rest, body: readable } + } + + createWriteStream (key, value) { + return this.#inner.createWriteStream(key, value) + } + + delete (key) { + return this.#inner.delete(key) + } +} + +describe('cache interceptor with async store', () => { + test('stale-while-revalidate 304 refreshes cache with async store', async () => { + const clock = FakeTimers.install({ now: 1 }) + after(() => clock.uninstall()) + + let count200 = 0 + let count304 = 0 + + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.sendDate = false + res.setHeader('Date', new Date(clock.now).toUTCString()) + res.setHeader('Cache-Control', 'public, max-age=10, stale-while-revalidate=3600') + res.setHeader('ETag', '"test-etag"') + + if (req.headers['if-none-match']) { + count304++ + res.statusCode = 304 + res.end() + } else { + res.end('hello world ' + count200++) + } + }) + + server.listen(0) + await once(server, 'listening') + + const store = new AsyncCacheStore() + const dispatcher = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache({ store })) + + after(async () => { + server.close() + await dispatcher.close() + }) + + const url = `http://localhost:${server.address().port}` + + // First request, populates cache + { + const res = await request(url, { dispatcher }) + strictEqual(await res.body.text(), 'hello world 0') + strictEqual(res.statusCode, 200) + strictEqual(res.headers.warning, undefined) + } + + // Advance past max-age into stale-while-revalidate window + clock.tick(12000) + + // Second request: stale, triggers background 304 revalidation + { + const res = await request(url, { dispatcher }) + strictEqual(await res.body.text(), 'hello world 0') + strictEqual(res.statusCode, 200) + strictEqual(res.headers.warning, '110 - "response is stale"') + await setTimeout(100) + } + + // Third request: should be fresh after 304 revalidation + { + clock.tick(10) + const res = await request(url, { dispatcher }) + strictEqual(await res.body.text(), 'hello world 0') + strictEqual(res.statusCode, 200) + strictEqual(res.headers.warning, undefined) + } + + strictEqual(count200, 1) + strictEqual(count304, 1) + }) + + test('stale-while-revalidate 200 refreshes cache with async store', async () => { + const clock = FakeTimers.install({ now: 1 }) + after(() => clock.uninstall()) + + let requestCount = 0 + + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.sendDate = false + res.setHeader('Date', new Date(clock.now).toUTCString()) + res.setHeader('Cache-Control', 'public, max-age=10, stale-while-revalidate=3600') + res.setHeader('ETag', `"etag-${requestCount}"`) + res.end('hello world ' + requestCount++) + }) + + server.listen(0) + await once(server, 'listening') + + const store = new AsyncCacheStore() + const dispatcher = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache({ store })) + + after(async () => { + server.close() + await dispatcher.close() + }) + + const url = `http://localhost:${server.address().port}` + + // First request + { + const res = await request(url, { dispatcher }) + strictEqual(await res.body.text(), 'hello world 0') + } + + // Advance past max-age + clock.tick(12000) + + // Stale response, triggers background 200 revalidation + { + const res = await request(url, { dispatcher }) + strictEqual(await res.body.text(), 'hello world 0') + strictEqual(res.headers.warning, '110 - "response is stale"') + await setTimeout(100) + } + + // Should be fresh with new content + { + clock.tick(10) + const res = await request(url, { dispatcher }) + strictEqual(await res.body.text(), 'hello world 1') + strictEqual(res.headers.warning, undefined) + } + }) + + test('null vary values are not sent in revalidation headers', async () => { + const clock = FakeTimers.install({ now: 1 }) + after(() => clock.uninstall()) + + let revalidationHeaders = null + + const server = createServer({ joinDuplicateHeaders: true }, (req, res) => { + res.sendDate = false + res.setHeader('Date', new Date(clock.now).toUTCString()) + res.setHeader('Cache-Control', 'public, max-age=10, stale-while-revalidate=3600') + res.setHeader('ETag', '"test-etag"') + res.setHeader('Vary', 'X-Custom-Header, X-Another-Header') + + if (req.headers['if-none-match']) { + revalidationHeaders = { ...req.headers } + res.statusCode = 304 + res.end() + } else { + res.end('hello world') + } + }) + + server.listen(0) + await once(server, 'listening') + + const store = new AsyncCacheStore() + const dispatcher = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache({ store })) + + after(async () => { + server.close() + await dispatcher.close() + }) + + const url = `http://localhost:${server.address().port}` + + // First request without X-Custom-Header or X-Another-Header + // These will be stored as null in the vary record + { + const res = await request(url, { dispatcher }) + strictEqual(await res.body.text(), 'hello world') + } + + // Advance past max-age + clock.tick(12000) + + // Trigger stale-while-revalidate + { + const res = await request(url, { dispatcher }) + strictEqual(res.headers.warning, '110 - "response is stale"') + await setTimeout(100) + } + + // Verify the revalidation request did NOT include null vary headers + notStrictEqual(revalidationHeaders, null) + strictEqual(revalidationHeaders['x-custom-header'], undefined) + strictEqual(revalidationHeaders['x-another-header'], undefined) + strictEqual(revalidationHeaders['if-none-match'], '"test-etag"') + }) +})