fix weird stream behavior in some cases

This commit is contained in:
Randall Schmidt 2021-07-09 15:21:11 -04:00
parent 675ed9bbf2
commit 6023b56164
4 changed files with 36 additions and 53 deletions

@ -24,13 +24,13 @@ export class FileSystemCache {
const metaBuffer = await cacache.get.byDigest(this.cacheDirectory, metaInfo.integrity);
const metaData = JSON.parse(metaBuffer);
const { bodyStreamIntegrity, bodyStreamLength } = metaData;
const { bodyStreamIntegrity, empty } = metaData;
delete metaData.bodyStreamIntegrity;
delete metaData.bodyStreamLength;
delete metaData.empty;
const bodyStream = bodyStreamLength > 0
? cacache.get.stream.byDigest(this.cacheDirectory, bodyStreamIntegrity)
: Readable.from(Buffer.alloc(0));
const bodyStream = empty
? Readable.from(Buffer.alloc(0))
: cacache.get.stream.byDigest(this.cacheDirectory, bodyStreamIntegrity);
return {
bodyStream,
@ -49,13 +49,13 @@ export class FileSystemCache {
]);
}
async set(key, bodyStream, metaData, bodyStreamLength) {
async set(key, bodyStream, metaData) {
const [bodyKey, metaKey] = getBodyAndMetaKeys(key);
const metaCopy = { ...metaData, bodyStreamLength };
const metaCopy = { ...metaData };
this.keyTimeout.clearTimeout(key);
if (bodyStreamLength > 0) {
try {
metaCopy.bodyStreamIntegrity = await new Promise((fulfill, reject) => {
bodyStream.pipe(cacache.put.stream(this.cacheDirectory, bodyKey))
.on('integrity', (i) => fulfill(i))
@ -63,13 +63,22 @@ export class FileSystemCache {
reject(e);
});
});
} catch (err) {
if (err.code !== 'ENODATA') {
throw err;
}
metaCopy.empty = true;
}
const metaBuffer = Buffer.from(JSON.stringify(metaCopy));
await cacache.put(this.cacheDirectory, metaKey, metaBuffer);
const cachedData = await this.get(key);
if (typeof this.ttl === 'number') {
this.keyTimeout.updateTimeout(key, this.ttl, () => this.remove(key));
}
return cachedData;
}
}

@ -41,5 +41,7 @@ export class MemoryCache {
if (typeof this.ttl === 'number') {
this.keyTimeout.updateTimeout(key, this.ttl, () => this.remove(key));
}
return this.get(key);
}
}

@ -1,24 +1,15 @@
import { Response } from 'node-fetch';
import { PassThrough } from 'stream';
const responseInternalSymbol = Object.getOwnPropertySymbols(new Response())[1];
export class NFCResponse extends Response {
constructor(bodyStream, metaData, ejectFromCache, fromCache) {
const stream1 = new PassThrough();
const stream2 = new PassThrough();
bodyStream.pipe(stream1);
bodyStream.pipe(stream2);
super(stream1, metaData);
super(bodyStream, metaData);
this.ejectFromCache = ejectFromCache;
this.fromCache = fromCache;
this.serializationStream = stream2;
}
static fromNodeFetchResponse(res, ejectFromCache) {
const bodyStream = res.body;
static serializeMetaFromNodeFetchResponse(res) {
const metaData = {
url: res.url,
status: res.status,
@ -29,30 +20,7 @@ export class NFCResponse extends Response {
counter: res[responseInternalSymbol].counter,
};
return new NFCResponse(bodyStream, metaData, ejectFromCache, false);
}
static fromCachedResponse(bodyStream, rawMetaData, ejectSelfFromCache) {
if (bodyStream.readableEnded) {
throw new Error('Cache returned a body stream that has already been read to end.');
}
return new NFCResponse(bodyStream, rawMetaData, ejectSelfFromCache, true);
}
serialize() {
return {
bodyStream: this.serializationStream,
metaData: {
url: this.url,
status: this.status,
statusText: this.statusText,
headers: this.headers.raw(),
size: this.size,
timeout: this.timeout,
counter: this[responseInternalSymbol].counter,
},
};
return metaData;
}
ejectFromCache() {

@ -95,10 +95,11 @@ async function getResponse(cache, requestArguments) {
const ejectSelfFromCache = () => cache.remove(cacheKey);
if (cachedValue) {
return NFCResponse.fromCachedResponse(
return new NFCResponse(
cachedValue.bodyStream,
cachedValue.metaData,
ejectSelfFromCache,
true,
);
}
@ -106,26 +107,29 @@ async function getResponse(cache, requestArguments) {
try {
cachedValue = await cache.get(cacheKey);
if (cachedValue) {
return NFCResponse.fromCachedResponse(
return new NFCResponse(
cachedValue.bodyStream,
cachedValue.metaData,
ejectSelfFromCache,
true,
);
}
const fetchResponse = await fetch(...requestArguments);
const nfcResponse = NFCResponse.fromNodeFetchResponse(fetchResponse, ejectSelfFromCache);
const contentLength = Number.parseInt(nfcResponse.headers.get('content-length'), 10) || 0;
const nfcResponseSerialized = nfcResponse.serialize();
const serializedMeta = NFCResponse.serializeMetaFromNodeFetchResponse(fetchResponse);
await cache.set(
const newlyCachedData = await cache.set(
cacheKey,
nfcResponseSerialized.bodyStream,
nfcResponseSerialized.metaData,
contentLength,
fetchResponse.body,
serializedMeta,
);
return nfcResponse;
return new NFCResponse(
newlyCachedData.bodyStream,
newlyCachedData.metaData,
ejectSelfFromCache,
false,
);
} finally {
locko.unlock(cacheKey);
}