From 92f34fd5e36bf254e796f0268b2de1603f0dda15 Mon Sep 17 00:00:00 2001 From: larabr Date: Tue, 18 Jun 2024 10:03:01 +0200 Subject: [PATCH] transformWithCancel: fix race condition in cancellation (#43) Relevant for e.g.passiveClones. --- lib/streams.js | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/streams.js b/lib/streams.js index 215d56b..591eee2 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -165,9 +165,10 @@ function transformRaw(input, options) { * @param {Function} cancel * @returns {TransformStream} */ -function transformWithCancel(cancel) { +function transformWithCancel(customCancel) { let pulled = false; - let backpressureChangePromiseResolve; + let cancelled = false; + let backpressureChangePromiseResolve, backpressureChangePromiseReject; let outputController; return { readable: new ReadableStream({ @@ -181,16 +182,29 @@ function transformWithCancel(cancel) { pulled = true; } }, - cancel + async cancel(reason) { + cancelled = true; + if (customCancel) { + await customCancel(reason); + } + if (backpressureChangePromiseReject) { + backpressureChangePromiseReject(reason); + } + } }, {highWaterMark: 0}), writable: new WritableStream({ write: async function(chunk) { + if (cancelled) { + throw new Error('Stream is cancelled'); + } outputController.enqueue(chunk); if (!pulled) { - await new Promise(resolve => { + await new Promise((resolve, reject) => { backpressureChangePromiseResolve = resolve; + backpressureChangePromiseReject = reject; }); backpressureChangePromiseResolve = null; + backpressureChangePromiseReject = null; } else { pulled = false; }