'use strict'; import stream from 'stream'; import utils from '../utils.js'; import throttle from './throttle.js'; import speedometer from './speedometer.js'; const kInternals = Symbol('internals'); class AxiosTransformStream extends stream.Transform{ constructor(options) { options = utils.toFlatObject(options, { maxRate: 0, chunkSize: 64 * 1024, minChunkSize: 100, timeWindow: 500, ticksRate: 2, samplesCount: 15 }, null, (prop, source) => { return !utils.isUndefined(source[prop]); }); super({ readableHighWaterMark: options.chunkSize }); const self = this; const internals = this[kInternals] = { length: options.length, timeWindow: options.timeWindow, ticksRate: options.ticksRate, chunkSize: options.chunkSize, maxRate: options.maxRate, minChunkSize: options.minChunkSize, bytesSeen: 0, isCaptured: false, notifiedBytesLoaded: 0, ts: Date.now(), bytes: 0, onReadCallback: null }; const _speedometer = speedometer(internals.ticksRate * options.samplesCount, internals.timeWindow); this.on('newListener', event => { if (event === 'progress') { if (!internals.isCaptured) { internals.isCaptured = true; } } }); let bytesNotified = 0; internals.updateProgress = throttle(function throttledHandler() { const totalBytes = internals.length; const bytesTransferred = internals.bytesSeen; const progressBytes = bytesTransferred - bytesNotified; if (!progressBytes || self.destroyed) return; const rate = _speedometer(progressBytes); bytesNotified = bytesTransferred; process.nextTick(() => { self.emit('progress', { 'loaded': bytesTransferred, 'total': totalBytes, 'progress': totalBytes ? (bytesTransferred / totalBytes) : undefined, 'bytes': progressBytes, 'rate': rate ? rate : undefined, 'estimated': rate && totalBytes && bytesTransferred <= totalBytes ? (totalBytes - bytesTransferred) / rate : undefined }); }); }, internals.ticksRate); const onFinish = () => { internals.updateProgress(true); }; this.once('end', onFinish); this.once('error', onFinish); } _read(size) { const internals = this[kInternals]; if (internals.onReadCallback) { internals.onReadCallback(); } return super._read(size); } _transform(chunk, encoding, callback) { const self = this; const internals = this[kInternals]; const maxRate = internals.maxRate; const readableHighWaterMark = this.readableHighWaterMark; const timeWindow = internals.timeWindow; const divider = 1000 / timeWindow; const bytesThreshold = (maxRate / divider); const minChunkSize = internals.minChunkSize !== false ? Math.max(internals.minChunkSize, bytesThreshold * 0.01) : 0; function pushChunk(_chunk, _callback) { const bytes = Buffer.byteLength(_chunk); internals.bytesSeen += bytes; internals.bytes += bytes; if (internals.isCaptured) { internals.updateProgress(); } if (self.push(_chunk)) { process.nextTick(_callback); } else { internals.onReadCallback = () => { internals.onReadCallback = null; process.nextTick(_callback); }; } } const transformChunk = (_chunk, _callback) => { const chunkSize = Buffer.byteLength(_chunk); let chunkRemainder = null; let maxChunkSize = readableHighWaterMark; let bytesLeft; let passed = 0; if (maxRate) { const now = Date.now(); if (!internals.ts || (passed = (now - internals.ts)) >= timeWindow) { internals.ts = now; bytesLeft = bytesThreshold - internals.bytes; internals.bytes = bytesLeft < 0 ? -bytesLeft : 0; passed = 0; } bytesLeft = bytesThreshold - internals.bytes; } if (maxRate) { if (bytesLeft <= 0) { // next time window return setTimeout(() => { _callback(null, _chunk); }, timeWindow - passed); } if (bytesLeft < maxChunkSize) { maxChunkSize = bytesLeft; } } if (maxChunkSize && chunkSize > maxChunkSize && (chunkSize - maxChunkSize) > minChunkSize) { chunkRemainder = _chunk.subarray(maxChunkSize); _chunk = _chunk.subarray(0, maxChunkSize); } pushChunk(_chunk, chunkRemainder ? () => { process.nextTick(_callback, null, chunkRemainder); } : _callback); }; transformChunk(chunk, function transformNextChunk(err, _chunk) { if (err) { return callback(err); } if (_chunk) { transformChunk(_chunk, transformNextChunk); } else { callback(null); } }); } setLength(length) { this[kInternals].length = +length; return this; } } export default AxiosTransformStream;