Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions handwritten/storage/src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,17 @@ class File extends ServiceObject<File, FileMetadata> {

const shouldRunValidation = !rangeRequest && (crc32c || md5);

const cleanupRequest = () => {
if (request?.agent) {
request.agent.destroy();
}
};

const cleanupRawResponse = (rawResponseStream: Readable) => {
rawResponseStream.destroy();
cleanupRequest();
};

if (rangeRequest) {
if (
typeof options.validation === 'string' ||
Expand All @@ -1590,9 +1601,7 @@ class File extends ServiceObject<File, FileMetadata> {
if (err) {
// There is an issue with node-fetch 2.x that if the stream errors the underlying socket connection is not closed.
// This causes a memory leak, so cleanup the sockets manually here by destroying the agent.
if (request?.agent) {
request.agent.destroy();
}
cleanupRequest();
throughStream.destroy(err);
}
};
Expand Down Expand Up @@ -1622,6 +1631,11 @@ class File extends ServiceObject<File, FileMetadata> {
}

request = (rawResponseStream as r.Response).request;
if (throughStream.destroyed) {
cleanupRawResponse(rawResponseStream as Readable);
return;
}

const headers = (rawResponseStream as ResponseBody).toJSON().headers;
const isCompressed = headers['content-encoding'] === 'gzip';
const hashes: {crc32c?: string; md5?: string} = {};
Expand Down Expand Up @@ -2178,12 +2192,36 @@ class File extends ServiceObject<File, FileMetadata> {
});

writeStream.once('writing', () => {
const onPrePipelineError = (error: Error) => {
pipelineCallback(error);
};
fileWriteStream.once('error', onPrePipelineError);

if (options.resumable === false) {
this.startSimpleUpload_(fileWriteStream, options);
} else {
this.startResumableUpload_(fileWriteStream, options);
}

if (
fileWriteStream.destroyed ||
writeStream.destroyed ||
emitStream.destroyed
) {
// Destroying an upload stream can queue its terminal error before
// close, so keep the temporary listener until teardown completes.
fileWriteStream.once('close', () => {
fileWriteStream.removeListener('error', onPrePipelineError);
});
if (!fileWriteStream.destroyed) {
fileWriteStream.destroy();
}
emitStream.destroy();
return;
}
Comment thread
rockwotj marked this conversation as resolved.

fileWriteStream.removeListener('error', onPrePipelineError);

// remove temporary noop listener as we now create a pipeline that handles the errors
emitStream.removeListener('error', noop);

Expand Down
64 changes: 64 additions & 0 deletions handwritten/storage/test/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,49 @@ describe('File', () => {
file.createReadStream().resume();
});

it('should clean up if the returned stream is destroyed before the response is piped', done => {
const rawResponseStream = new PassThrough();
const agentDestroy = sinon.spy();
Object.assign(rawResponseStream, {
request: {
agent: {
destroy: agentDestroy,
},
},
toJSON() {
return {headers: {}};
},
});

handleRespOverride = (
_err: Error,
_res: {},
_body: {},
callback: Function
) => {
callback(null, null, rawResponseStream);
};

file.requestStream = () => {
const requestStream = new PassThrough();
setImmediate(() => {
requestStream.emit('response', rawResponseStream);
});
return requestStream;
};

const readStream = file.createReadStream({validation: false});
readStream.once('response', () => {
readStream.destroy();
});
readStream.once('close', () => {
assert.strictEqual(rawResponseStream.destroyed, true);
assert.strictEqual(agentDestroy.calledOnce, true);
done();
});
readStream.resume();
});

describe('errors', () => {
const ERROR = new Error('Error.');

Expand Down Expand Up @@ -2106,6 +2149,27 @@ describe('File', () => {
writable.write('data');
});

it('should clean up if the returned stream is destroyed during upload startup', done => {
const writable = file.createWriteStream();
let fileWriteStream: duplexify.Duplexify | undefined;

file.startResumableUpload_ = (stream: duplexify.Duplexify) => {
fileWriteStream = stream;
writable.destroy();
};

writable.on('close', () => {
setImmediate(() => {
assert(fileWriteStream);
assert.strictEqual(fileWriteStream.destroyed, true);
assert.strictEqual(fileWriteStream.listenerCount('error'), 0);
done();
});
});

writable.write('data');
});

it('should alias contentType to metadata object', done => {
const contentType = 'text/html';
const writable = file.createWriteStream({contentType});
Expand Down
Loading