overhaul to use node-fetch internals

This commit is contained in:
Randall Schmidt 2021-07-05 18:14:42 -04:00
parent 45ca35f057
commit e8ad8da0bb
6 changed files with 87 additions and 68 deletions

@ -1,5 +1,15 @@
import { Readable } from 'stream';
import { KeyTimeout } from './key_timeout.js';
function streamToBuffer(stream) {
const chunks = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
stream.on('error', (err) => reject(err));
stream.on('end', () => resolve(Buffer.concat(chunks)));
});
}
export class MemoryCache {
constructor(options = {}) {
this.ttl = options.ttl;
@ -8,7 +18,15 @@ export class MemoryCache {
}
get(key) {
return this.cache[key];
const cachedValue = this.cache[key];
if (cachedValue) {
return {
bodyStream: Readable.from(cachedValue.bodyBuffer),
metaData: cachedValue.metaData,
};
}
return undefined;
}
remove(key) {
@ -16,8 +34,9 @@ export class MemoryCache {
delete this.cache[key];
}
set(key, value) {
this.cache[key] = value;
async set(key, bodyStream, metaData) {
const bodyBuffer = await streamToBuffer(bodyStream);
this.cache[key] = { bodyBuffer, metaData };
if (typeof this.ttl === 'number') {
this.keyTimeout.updateTimeout(key, this.ttl, () => this.remove(key));

@ -1,42 +1,54 @@
import stream from 'stream';
import { Headers } from './headers.js';
import { Response } from 'node-fetch';
import { PassThrough } from 'stream';
export class Response {
constructor(raw, ejectSelfFromCache, fromCache) {
Object.assign(this, raw);
this.ejectSelfFromCache = ejectSelfFromCache;
this.headers = new Headers(raw.headers);
const responseInternalSymbol = Object.getOwnPropertySymbols(new Response())[1];
export class NFCResponse extends Response {
constructor(bodyStream, metaData, ejectFromCache, fromCache) {
const stream1 = new PassThrough();
const stream2 = new PassThrough();
bodyStream.pipe(stream1);
bodyStream.pipe(stream2);
super(stream1, metaData);
this.ejectFromCache = ejectFromCache;
this.fromCache = fromCache;
this.bodyUsed = false;
if (this.bodyBuffer.type === 'Buffer') {
this.bodyBuffer = Buffer.from(this.bodyBuffer);
}
this.serializationStream = stream2;
}
get body() {
return stream.Readable.from(this.bodyBuffer);
static fromNodeFetchResponse(res, ejectFromCache) {
const bodyStream = res.body;
const metaData = {
url: res.url,
status: res.status,
statusText: res.statusText,
headers: res.headers.raw(),
size: res.size,
timeout: res.timeout,
counter: res[responseInternalSymbol].counter,
};
return new NFCResponse(bodyStream, metaData, ejectFromCache, false);
}
consumeBody() {
if (this.bodyUsed) {
throw new Error('Error: body used already');
}
this.bodyUsed = true;
return this.bodyBuffer;
static fromCachedResponse(bodyStream, rawMetaData, ejectSelfFromCache) {
return new NFCResponse(bodyStream, rawMetaData, ejectSelfFromCache, true);
}
async text() {
return this.consumeBody().toString();
}
async json() {
return JSON.parse(this.consumeBody().toString());
}
async buffer() {
return this.consumeBody();
serialize() {
return {
bodyStream: this.serializationStream,
metaData: {
url: this.url,
status: this.status,
statusText: this.statusText,
headers: this.headers.raw(),
size: this.size,
timeout: this.timeout,
counter: this[responseInternalSymbol].counter,
},
};
}
ejectFromCache() {

@ -2,10 +2,10 @@ import fetch from 'node-fetch';
import fs from 'fs';
import { URLSearchParams } from 'url';
import crypto from 'crypto';
import { Response } from './classes/response.js';
import { NFCResponse } from './classes/response.js';
import { MemoryCache } from './classes/caching/memory_cache.js';
const CACHE_VERSION = 2;
const CACHE_VERSION = 3;
function md5(str) {
return crypto.createHash('md5').update(str).digest('hex');
@ -71,21 +71,6 @@ function getCacheKey(requestArguments) {
return md5(JSON.stringify([resourceCacheKeyJson, initCacheKeyJson, CACHE_VERSION]));
}
async function createRawResponse(fetchRes) {
const buffer = await fetchRes.buffer();
return {
status: fetchRes.status,
statusText: fetchRes.statusText,
type: fetchRes.type,
url: fetchRes.url,
ok: fetchRes.ok,
headers: fetchRes.headers.raw(),
redirected: fetchRes.redirected,
bodyBuffer: buffer,
};
}
async function getResponse(cache, requestArguments) {
const cacheKey = getCacheKey(requestArguments);
const cachedValue = await cache.get(cacheKey);
@ -93,13 +78,22 @@ async function getResponse(cache, requestArguments) {
const ejectSelfFromCache = () => cache.remove(cacheKey);
if (cachedValue) {
return new Response(cachedValue, ejectSelfFromCache, true);
if (cachedValue.bodyStream.readableEnded) {
throw new Error('Cache returned a body stream that has already been read to end.');
}
return NFCResponse.fromCachedResponse(
cachedValue.bodyStream,
cachedValue.metaData,
ejectSelfFromCache,
);
}
const fetchResponse = await fetch(...requestArguments);
const rawResponse = await createRawResponse(fetchResponse);
await cache.set(cacheKey, rawResponse);
return new Response(rawResponse, ejectSelfFromCache, false);
const nfcResponse = NFCResponse.fromNodeFetchResponse(fetchResponse, ejectSelfFromCache);
const nfcResponseSerialized = nfcResponse.serialize();
await cache.set(cacheKey, nfcResponseSerialized.bodyStream, nfcResponseSerialized.metaData);
return nfcResponse;
}
function createFetchWithCache(cache) {

5
package-lock.json generated

@ -1508,11 +1508,6 @@
"mime-types": "^2.1.12"
}
},
"fpersist": {
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/fpersist/-/fpersist-1.0.5.tgz",
"integrity": "sha512-WXY+zZXlOo1dU+wS8rqigz5PFu7WHBDd0vcaaWcnu319bPJi/IeWipOmi1PNaHAUqFVSzp1mLpNkgX6g2uLGbQ=="
},
"fromentries": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/fromentries/-/fromentries-1.3.2.tgz",

@ -38,7 +38,6 @@
"rimraf": "^3.0.2"
},
"dependencies": {
"fpersist": "^1.0.5",
"node-fetch": "2.6.1"
},
"husky": {

@ -130,22 +130,22 @@ describe('Header tests', function() {
it('Gets correct header keys', async function() {
let { cachedFetchResponse, standardFetchResponse } = await dualFetch(TWO_HUNDRED_URL);
assert.deepStrictEqual(cachedFetchResponse.headers.keys(), [...standardFetchResponse.headers.keys()]);
assert.deepStrictEqual([...cachedFetchResponse.headers.keys()], [...standardFetchResponse.headers.keys()]);
cachedFetchResponse = await cachedFetch(TWO_HUNDRED_URL);
assert.deepStrictEqual(cachedFetchResponse.headers.keys(), [...standardFetchResponse.headers.keys()]);
assert.deepStrictEqual([...cachedFetchResponse.headers.keys()], [...standardFetchResponse.headers.keys()]);
});
it('Gets correct header values', async function() {
let { cachedFetchResponse, standardFetchResponse } = await dualFetch(TWO_HUNDRED_URL);
assert.deepStrictEqual(
removeDates(cachedFetchResponse.headers.values()),
removeDates([...cachedFetchResponse.headers.values()]),
removeDates([...standardFetchResponse.headers.values()]),
);
cachedFetchResponse = await cachedFetch(TWO_HUNDRED_URL);
assert.deepStrictEqual(
removeDates(cachedFetchResponse.headers.values()),
removeDates([...cachedFetchResponse.headers.values()]),
removeDates([...standardFetchResponse.headers.values()]),
);
});
@ -153,13 +153,13 @@ describe('Header tests', function() {
it('Gets correct header entries', async function() {
let { cachedFetchResponse, standardFetchResponse } = await dualFetch(TWO_HUNDRED_URL);
assert.deepStrictEqual(
removeDates(cachedFetchResponse.headers.entries()),
removeDates([...cachedFetchResponse.headers.entries()]),
removeDates([...standardFetchResponse.headers.entries()]),
);
cachedFetchResponse = await cachedFetch(TWO_HUNDRED_URL);
assert.deepStrictEqual(
removeDates(cachedFetchResponse.headers.entries()),
removeDates([...cachedFetchResponse.headers.entries()]),
removeDates([...standardFetchResponse.headers.entries()]),
);
});
@ -334,7 +334,7 @@ describe('Data tests', function() {
await res.text();
throw new Error('The above line should have thrown.');
} catch (err) {
assert(err.message.includes('Error: body used already'));
assert(err.message.includes('body used already for:'));
}
});