diff --git a/classes/caching/memory_cache.js b/classes/caching/memory_cache.js index 1f69a85..dc328be 100644 --- a/classes/caching/memory_cache.js +++ b/classes/caching/memory_cache.js @@ -1,5 +1,15 @@ +import { Readable } from 'stream'; import { KeyTimeout } from './key_timeout.js'; +function streamToBuffer(stream) { + const chunks = []; + return new Promise((resolve, reject) => { + stream.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + stream.on('error', (err) => reject(err)); + stream.on('end', () => resolve(Buffer.concat(chunks))); + }); +} + export class MemoryCache { constructor(options = {}) { this.ttl = options.ttl; @@ -8,7 +18,15 @@ export class MemoryCache { } get(key) { - return this.cache[key]; + const cachedValue = this.cache[key]; + if (cachedValue) { + return { + bodyStream: Readable.from(cachedValue.bodyBuffer), + metaData: cachedValue.metaData, + }; + } + + return undefined; } remove(key) { @@ -16,8 +34,9 @@ export class MemoryCache { delete this.cache[key]; } - set(key, value) { - this.cache[key] = value; + async set(key, bodyStream, metaData) { + const bodyBuffer = await streamToBuffer(bodyStream); + this.cache[key] = { bodyBuffer, metaData }; if (typeof this.ttl === 'number') { this.keyTimeout.updateTimeout(key, this.ttl, () => this.remove(key)); diff --git a/classes/response.js b/classes/response.js index 47f123e..a6c9341 100644 --- a/classes/response.js +++ b/classes/response.js @@ -1,42 +1,54 @@ -import stream from 'stream'; -import { Headers } from './headers.js'; +import { Response } from 'node-fetch'; +import { PassThrough } from 'stream'; -export class Response { - constructor(raw, ejectSelfFromCache, fromCache) { - Object.assign(this, raw); - this.ejectSelfFromCache = ejectSelfFromCache; - this.headers = new Headers(raw.headers); +const responseInternalSymbol = Object.getOwnPropertySymbols(new Response())[1]; + +export class NFCResponse extends Response { + constructor(bodyStream, metaData, ejectFromCache, fromCache) { + const stream1 = new PassThrough(); + const stream2 = new PassThrough(); + + bodyStream.pipe(stream1); + bodyStream.pipe(stream2); + + super(stream1, metaData); + this.ejectFromCache = ejectFromCache; this.fromCache = fromCache; - this.bodyUsed = false; - - if (this.bodyBuffer.type === 'Buffer') { - this.bodyBuffer = Buffer.from(this.bodyBuffer); - } + this.serializationStream = stream2; } - get body() { - return stream.Readable.from(this.bodyBuffer); + static fromNodeFetchResponse(res, ejectFromCache) { + const bodyStream = res.body; + const metaData = { + url: res.url, + status: res.status, + statusText: res.statusText, + headers: res.headers.raw(), + size: res.size, + timeout: res.timeout, + counter: res[responseInternalSymbol].counter, + }; + + return new NFCResponse(bodyStream, metaData, ejectFromCache, false); } - consumeBody() { - if (this.bodyUsed) { - throw new Error('Error: body used already'); - } - - this.bodyUsed = true; - return this.bodyBuffer; + static fromCachedResponse(bodyStream, rawMetaData, ejectSelfFromCache) { + return new NFCResponse(bodyStream, rawMetaData, ejectSelfFromCache, true); } - async text() { - return this.consumeBody().toString(); - } - - async json() { - return JSON.parse(this.consumeBody().toString()); - } - - async buffer() { - return this.consumeBody(); + serialize() { + return { + bodyStream: this.serializationStream, + metaData: { + url: this.url, + status: this.status, + statusText: this.statusText, + headers: this.headers.raw(), + size: this.size, + timeout: this.timeout, + counter: this[responseInternalSymbol].counter, + }, + }; } ejectFromCache() { diff --git a/index.js b/index.js index fe6a83f..9eb450f 100644 --- a/index.js +++ b/index.js @@ -2,10 +2,10 @@ import fetch from 'node-fetch'; import fs from 'fs'; import { URLSearchParams } from 'url'; import crypto from 'crypto'; -import { Response } from './classes/response.js'; +import { NFCResponse } from './classes/response.js'; import { MemoryCache } from './classes/caching/memory_cache.js'; -const CACHE_VERSION = 2; +const CACHE_VERSION = 3; function md5(str) { return crypto.createHash('md5').update(str).digest('hex'); @@ -71,21 +71,6 @@ function getCacheKey(requestArguments) { return md5(JSON.stringify([resourceCacheKeyJson, initCacheKeyJson, CACHE_VERSION])); } -async function createRawResponse(fetchRes) { - const buffer = await fetchRes.buffer(); - - return { - status: fetchRes.status, - statusText: fetchRes.statusText, - type: fetchRes.type, - url: fetchRes.url, - ok: fetchRes.ok, - headers: fetchRes.headers.raw(), - redirected: fetchRes.redirected, - bodyBuffer: buffer, - }; -} - async function getResponse(cache, requestArguments) { const cacheKey = getCacheKey(requestArguments); const cachedValue = await cache.get(cacheKey); @@ -93,13 +78,22 @@ async function getResponse(cache, requestArguments) { const ejectSelfFromCache = () => cache.remove(cacheKey); if (cachedValue) { - return new Response(cachedValue, ejectSelfFromCache, true); + if (cachedValue.bodyStream.readableEnded) { + throw new Error('Cache returned a body stream that has already been read to end.'); + } + + return NFCResponse.fromCachedResponse( + cachedValue.bodyStream, + cachedValue.metaData, + ejectSelfFromCache, + ); } const fetchResponse = await fetch(...requestArguments); - const rawResponse = await createRawResponse(fetchResponse); - await cache.set(cacheKey, rawResponse); - return new Response(rawResponse, ejectSelfFromCache, false); + const nfcResponse = NFCResponse.fromNodeFetchResponse(fetchResponse, ejectSelfFromCache); + const nfcResponseSerialized = nfcResponse.serialize(); + await cache.set(cacheKey, nfcResponseSerialized.bodyStream, nfcResponseSerialized.metaData); + return nfcResponse; } function createFetchWithCache(cache) { diff --git a/package-lock.json b/package-lock.json index 7756126..ec8aefb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1508,11 +1508,6 @@ "mime-types": "^2.1.12" } }, - "fpersist": { - "version": "1.0.5", - "resolved": "https://registry.npmjs.org/fpersist/-/fpersist-1.0.5.tgz", - "integrity": "sha512-WXY+zZXlOo1dU+wS8rqigz5PFu7WHBDd0vcaaWcnu319bPJi/IeWipOmi1PNaHAUqFVSzp1mLpNkgX6g2uLGbQ==" - }, "fromentries": { "version": "1.3.2", "resolved": "https://registry.npmjs.org/fromentries/-/fromentries-1.3.2.tgz", diff --git a/package.json b/package.json index 38c65ee..7c6e5c5 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,6 @@ "rimraf": "^3.0.2" }, "dependencies": { - "fpersist": "^1.0.5", "node-fetch": "2.6.1" }, "husky": { diff --git a/test/tests.js b/test/tests.js index 5a824b6..27cc0f5 100644 --- a/test/tests.js +++ b/test/tests.js @@ -130,22 +130,22 @@ describe('Header tests', function() { it('Gets correct header keys', async function() { let { cachedFetchResponse, standardFetchResponse } = await dualFetch(TWO_HUNDRED_URL); - assert.deepStrictEqual(cachedFetchResponse.headers.keys(), [...standardFetchResponse.headers.keys()]); + assert.deepStrictEqual([...cachedFetchResponse.headers.keys()], [...standardFetchResponse.headers.keys()]); cachedFetchResponse = await cachedFetch(TWO_HUNDRED_URL); - assert.deepStrictEqual(cachedFetchResponse.headers.keys(), [...standardFetchResponse.headers.keys()]); + assert.deepStrictEqual([...cachedFetchResponse.headers.keys()], [...standardFetchResponse.headers.keys()]); }); it('Gets correct header values', async function() { let { cachedFetchResponse, standardFetchResponse } = await dualFetch(TWO_HUNDRED_URL); assert.deepStrictEqual( - removeDates(cachedFetchResponse.headers.values()), + removeDates([...cachedFetchResponse.headers.values()]), removeDates([...standardFetchResponse.headers.values()]), ); cachedFetchResponse = await cachedFetch(TWO_HUNDRED_URL); assert.deepStrictEqual( - removeDates(cachedFetchResponse.headers.values()), + removeDates([...cachedFetchResponse.headers.values()]), removeDates([...standardFetchResponse.headers.values()]), ); }); @@ -153,13 +153,13 @@ describe('Header tests', function() { it('Gets correct header entries', async function() { let { cachedFetchResponse, standardFetchResponse } = await dualFetch(TWO_HUNDRED_URL); assert.deepStrictEqual( - removeDates(cachedFetchResponse.headers.entries()), + removeDates([...cachedFetchResponse.headers.entries()]), removeDates([...standardFetchResponse.headers.entries()]), ); cachedFetchResponse = await cachedFetch(TWO_HUNDRED_URL); assert.deepStrictEqual( - removeDates(cachedFetchResponse.headers.entries()), + removeDates([...cachedFetchResponse.headers.entries()]), removeDates([...standardFetchResponse.headers.entries()]), ); }); @@ -334,7 +334,7 @@ describe('Data tests', function() { await res.text(); throw new Error('The above line should have thrown.'); } catch (err) { - assert(err.message.includes('Error: body used already')); + assert(err.message.includes('body used already for:')); } });