From 57ca01f9764919f032def7028fd4de9c51641ea6 Mon Sep 17 00:00:00 2001 From: larabr <7375870+larabr@users.noreply.github.com> Date: Mon, 17 Jun 2024 15:47:02 +0200 Subject: [PATCH 1/2] transformWithCancel: fix race condition in cancellation --- lib/streams.js | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/lib/streams.js b/lib/streams.js index 4671312..4314497 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -218,9 +218,9 @@ function transformRaw(input, options) { * @param {Function} cancel * @returns {TransformStream} */ -function transformWithCancel(cancel) { +function transformWithCancel(customCancel) { let pulled = false; - let backpressureChangePromiseResolve; + let backpressureChangePromiseResolve, backpressureChangePromiseReject; let outputController; return { readable: new ReadableStream({ @@ -234,16 +234,26 @@ function transformWithCancel(cancel) { pulled = true; } }, - cancel + async cancel(reason) { + if (customCancel) { + await customCancel(reason); + } + if (backpressureChangePromiseReject) { + backpressureChangePromiseReject(reason); + } + outputController.error(reason); + } }, {highWaterMark: 0}), writable: new WritableStream({ write: async function(chunk) { outputController.enqueue(chunk); if (!pulled) { - await new Promise(resolve => { + await new Promise((resolve, reject) => { backpressureChangePromiseResolve = resolve; + backpressureChangePromiseReject = reject; }); backpressureChangePromiseResolve = null; + backpressureChangePromiseReject = null; } else { pulled = false; } From 7e5c5a0ff4700861b992e23e24ce6b71b30bbd48 Mon Sep 17 00:00:00 2001 From: larabr <7375870+larabr@users.noreply.github.com> Date: Tue, 18 Jun 2024 09:42:44 +0200 Subject: [PATCH 2/2] Only error writable side in case of cancellation --- lib/streams.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/streams.js b/lib/streams.js index 4314497..308b792 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -220,6 +220,7 @@ function transformRaw(input, options) { */ function transformWithCancel(customCancel) { let pulled = false; + let cancelled = false; let backpressureChangePromiseResolve, backpressureChangePromiseReject; let outputController; return { @@ -235,17 +236,20 @@ function transformWithCancel(customCancel) { } }, async cancel(reason) { + cancelled = true; if (customCancel) { await customCancel(reason); } if (backpressureChangePromiseReject) { backpressureChangePromiseReject(reason); } - outputController.error(reason); } }, {highWaterMark: 0}), writable: new WritableStream({ write: async function(chunk) { + if (cancelled) { + throw new Error('Stream is cancelled'); + } outputController.enqueue(chunk); if (!pulled) { await new Promise((resolve, reject) => {