diff --git a/index.bs b/index.bs index 692c85fcc..471d7d772 100644 --- a/index.bs +++ b/index.bs @@ -1040,7 +1040,7 @@ default-reader-asynciterator-prototype-internal-slots">Asynchronous iteration 0, 1. Set |controller|.[=ReadableByteStreamController/[[closeRequested]]=] to true. 1. Return. - 1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not empty, + 1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not + [=list/is empty|empty=], 1. Let |firstPendingPullInto| be |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=][0]. 1. If |firstPendingPullInto|'s [=pull-into descriptor/bytes filled=] > 0, @@ -3225,8 +3226,7 @@ The following abstract operations support the implementation of the 1. If ! [$ReadableStreamHasDefaultReader$](|stream|) is true, 1. Perform ! [$ReadableByteStreamControllerProcessReadRequestsUsingQueue$](|controller|). 1. If ! [$ReadableStreamGetNumReadRequests$](|stream|) is 0, - 1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is - [=list/is empty|empty=]. + 1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] [=list/is empty=]. 1. Perform ! [$ReadableByteStreamControllerEnqueueChunkToQueue$](|controller|, |transferredBuffer|, |byteOffset|, |byteLength|). 1. Otherwise, @@ -3504,7 +3504,8 @@ The following abstract operations support the implementation of the |byteLength|, [=pull-into descriptor/bytes filled=] 0, [=pull-into descriptor/element size=] |elementSize|, [=pull-into descriptor/view constructor=] |ctor|, and [=pull-into descriptor/reader type=] "`byob`". - 1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not empty, + 1. If |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not + [=list/is empty|empty=], 1. [=list/Append=] |pullIntoDescriptor| to |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=]. 1. Perform ! [$ReadableStreamAddReadIntoRequest$](|stream|, |readIntoRequest|). @@ -3538,7 +3539,8 @@ The following abstract operations support the implementation of the id="readable-byte-stream-controller-respond">ReadableByteStreamControllerRespond(|controller|, |bytesWritten|) performs the following steps: - 1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not empty. + 1. Assert: |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=] is not + [=list/is empty|empty=]. 1. Let |firstDescriptor| be |controller|.[=ReadableByteStreamController/[[pendingPullIntos]]=][0]. 1. Let |state| be |controller|.[=ReadableByteStreamController/[[stream]]=].[=ReadableStream/[[state]]=]. @@ -4293,11 +4295,12 @@ following table:
await writer.{{WritableStreamDefaultWriter/ready}}
-

Returns a promise that will be fulfilled when the [=desired size to fill a stream's internal - queue|desired size to fill the stream's internal queue=] transitions from non-positive to - positive, signaling that it is no longer applying [=backpressure=]. Once the [=desired size to - fill a stream's internal queue|desired size=] dips back to zero or below, the getter will return - a new promise that stays pending until the next transition. +

Returns a promise that will be fulfilled when either the [=desired size to fill a stream's + internal queue|desired size to fill the stream's internal queue=] transitions from non-positive to + positive or when the stream calls {{WritableStreamDefaultController/releaseBackpressure()}}, + signaling that it is no longer applying [=backpressure=]. + Once the [=desired size to fill a stream's internal queue|desired size=] dips back to zero or below, + the getter will return a new promise that stays pending until the next transition.

If the stream becomes errored or aborted, or the writer's lock is [=release a write lock|released=], the returned promise will become rejected. @@ -4423,6 +4426,7 @@ The Web IDL definition for the {{WritableStreamDefaultController}} class is give interface WritableStreamDefaultController { readonly attribute AbortSignal signal; undefined error(optional any e); + undefined releaseBackpressure(); }; @@ -4452,6 +4456,10 @@ the following table: \[[queueTotalSize]] The total size of all the chunks stored in [=WritableStreamDefaultController/[[queue]]=] (see [[#queue-with-sizes]]) + + \[[releaseBackpressure]] + A boolean flag indicating whether to release backpressure until the + next chunk is written \[[signal]] An {{AbortSignal}} that can be used to abort the pending write or @@ -4498,6 +4506,17 @@ closed. It is only used internally, and is never exposed to web developers. the [=underlying sink=]'s methods. However, it can be useful for suddenly shutting down a stream in response to an event outside the normal lifecycle of interactions with the [=underlying sink=]. +

controller.{{WritableStreamDefaultController/releaseBackpressure()|releaseBackpressure}}() +
+

Releases [=backpressure=] until the next chunk is written. If there are still chunks in the queue, + this does nothing. + +

Usually, backpressure is automatically released when the [=desired size to fill a stream's + internal queue|desired size to fill the stream's internal queue=] becomes positive. However, + if the stream is created with a [=high water mark=] of zero, then the desired size is always at or + below zero, and the stream would always apply backpressure. + This method allows the [=underlying sink=] to manually release backpressure, for example when it + knows that now is a good time for the [=producer=] to write a new chunk.

@@ -4516,6 +4535,15 @@ closed. It is only used internally, and is never exposed to web developers. 1. Perform ! [$WritableStreamDefaultControllerError$]([=this=], |e|).
+
+ The releaseBackpressure() method steps are: + + 1. Let |state| be [=this=].[=WritableStreamDefaultController/[[stream]]=].[=WritableStream/[[state]]=]. + 1. If |state| is not "`writable`", return. + 1. Perform ! [$WritableStreamDefaultControllerReleaseBackpressure$]([=this=]). +
+

Internal methods

The following are internal methods implemented by each {{WritableStreamDefaultController}} instance. @@ -4881,7 +4909,7 @@ the {{WritableStream}}'s public API. performs the following steps: 1. Assert: |stream|.[=WritableStream/[[inFlightWriteRequest]]=] is undefined. - 1. Assert: |stream|.[=WritableStream/[[writeRequests]]=] is not empty. + 1. Assert: |stream|.[=WritableStream/[[writeRequests]]=] is not [=list/is empty|empty=]. 1. Let |writeRequest| be |stream|.[=WritableStream/[[writeRequests]]=][0]. 1. [=list/Remove=] |writeRequest| from |stream|.[=WritableStream/[[writeRequests]]=]. 1. Set |stream|.[=WritableStream/[[inFlightWriteRequest]]=] to |writeRequest|. @@ -5080,6 +5108,7 @@ The following abstract operations support the implementation of the 1. Set |controller|.[=WritableStreamDefaultController/[[stream]]=] to |stream|. 1. Set |stream|.[=WritableStream/[[controller]]=] to |controller|. 1. Perform ! [$ResetQueue$](|controller|). + 1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to false. 1. Set |controller|.[=WritableStreamDefaultController/[[signal]]=] to a new {{AbortSignal}}. 1. Set |controller|.[=WritableStreamDefaultController/[[started]]=] to false. 1. Set |controller|.[=WritableStreamDefaultController/[[strategySizeAlgorithm]]=] to @@ -5146,7 +5175,7 @@ The following abstract operations support the implementation of the 1. If |state| is "`erroring`", 1. Perform ! [$WritableStreamFinishErroring$](|stream|). 1. Return. - 1. If |controller|.[=WritableStreamDefaultController/[[queue]]=] is empty, return. + 1. If |controller|.[=WritableStreamDefaultController/[[queue]]=] [=list/is empty=], return. 1. Let |value| be ! [$PeekQueueValue$](|controller|). 1. If |value| is the [=close sentinel=], perform ! [$WritableStreamDefaultControllerProcessClose$](|controller|). @@ -5211,6 +5240,8 @@ The following abstract operations support the implementation of the id="writable-stream-default-controller-get-backpressure">WritableStreamDefaultControllerGetBackpressure(|controller|) performs the following steps: + 1. If |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] is true and + |controller|.[=WritableStreamDefaultController/[[queue]]=] [=list/is empty=], return false. 1. Let |desiredSize| be ! [$WritableStreamDefaultControllerGetDesiredSize$](|controller|). 1. Return true if |desiredSize| ≤ 0, or false otherwise. @@ -5247,7 +5278,7 @@ The following abstract operations support the implementation of the 1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=]. 1. Perform ! [$WritableStreamMarkCloseRequestInFlight$](|stream|). 1. Perform ! [$DequeueValue$](|controller|). - 1. Assert: |controller|.[=WritableStreamDefaultController/[[queue]]=] is empty. + 1. Assert: |controller|.[=WritableStreamDefaultController/[[queue]]=] [=list/is empty=]. 1. Let |sinkClosePromise| be the result of performing |controller|.[=WritableStreamDefaultController/[[closeAlgorithm]]=]. 1. Perform ! [$WritableStreamDefaultControllerClearAlgorithms$](|controller|). @@ -5264,6 +5295,7 @@ The following abstract operations support the implementation of the 1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=]. 1. Perform ! [$WritableStreamMarkFirstWriteRequestInFlight$](|stream|). + 1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to false. 1. Let |sinkWritePromise| be the result of performing |controller|.[=WritableStreamDefaultController/[[writeAlgorithm]]=], passing in |chunk|. 1. [=Upon fulfillment=] of |sinkWritePromise|, @@ -5281,6 +5313,19 @@ The following abstract operations support the implementation of the 1. Perform ! [$WritableStreamFinishInFlightWriteWithError$](|stream|, |reason|). +
+ WritableStreamDefaultControllerReleaseBackpressure(|controller|) + performs the following steps: + + 1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=]. + 1. Assert: |stream|.[=WritableStream/[[state]]=] is "`writable`". + 1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to true. + 1. If ! [$WritableStreamHasOperationMarkedInFlight$](|stream|) is false and + ! [$WritableStreamCloseQueuedOrInFlight$](|stream|) is false, + 1. Let |backpressure| be ! [$WritableStreamDefaultControllerGetBackpressure$](|controller|). + 1. Perform ! [$WritableStreamUpdateBackpressure$](|stream|, |backpressure|). +
+
WritableStreamDefaultControllerWrite(|controller|, @@ -5291,6 +5336,7 @@ The following abstract operations support the implementation of the 1. Perform ! [$WritableStreamDefaultControllerErrorIfNeeded$](|controller|, |enqueueResult|.\[[Value]]). 1. Return. + 1. Set |controller|.[=WritableStreamDefaultController/[[releaseBackpressure]]=] to false. 1. Let |stream| be |controller|.[=WritableStreamDefaultController/[[stream]]=]. 1. If ! [$WritableStreamCloseQueuedOrInFlight$](|stream|) is false and |stream|.[=WritableStream/[[state]]=] is "`writable`", @@ -5803,6 +5849,9 @@ The following abstract operations operate on {{TransformStream}} instances at a stream.[=TransformStream/[[backpressureChangePromise]]=] with undefined. 1. Set |stream|.[=TransformStream/[[backpressureChangePromise]]=] to [=a new promise=]. 1. Set |stream|.[=TransformStream/[[backpressure]]=] to |backpressure|. + 1. If |backpressure| is false and |stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[state]]=] + is "`writable`", perform ! + [$WritableStreamDefaultControllerReleaseBackpressure$](|stream|.[=TransformStream/[[writable]]=].[=WritableStream/[[controller]]=]).

Default controllers

@@ -6903,7 +6952,7 @@ for="ReadableStream">locked if ! [$IsReadableStreamLocked$](|stream|) retu up">writeAlgorithm, an optional algorithm closeAlgorithm, an optional algorithm abortAlgorithm, an optional number highWaterMark (default 1), an optional algorithm highWaterMark (default 1), and an optional algorithm sizeAlgorithm, perform the following steps. |writeAlgorithm| must be an algorithm that accepts a [=chunk=] object and returns a promise. If given, |closeAlgorithm| and |abortAlgorithm| may return a promise. If given, |sizeAlgorithm| must @@ -6984,14 +7033,22 @@ reason.
To set up a newly-[=new|created-via-Web IDL=] {{TransformStream}} |stream| given an algorithm transformAlgorithm and an optional algorithm flushAlgorithm, perform the following steps. + for="TransformStream/set up">transformAlgorithm, an optional algorithm flushAlgorithm, an optional number writableHighWaterMark (default 1), an optional + algorithm writableSizeAlgorithm, an + optional number readableHighWaterMark + (default 0), and an optional algorithm readableSizeAlgorithm, perform the following steps. |transformAlgorithm| and, if given, |flushAlgorithm|, may return a promise. + If given, |writableSizeAlgorithm| and |readableSizeAlgorithm| must be algorithms accepting + [=chunk=] objects and returning a number; and if given, |writableHighWaterMark| and + |readableHighWaterMark| must be non-negative, non-NaN numbers. - 1. Let |writableHighWaterMark| be 1. - 1. Let |writableSizeAlgorithm| be an algorithm that returns 1. - 1. Let |readableHighWaterMark| be 0. - 1. Let |readableSizeAlgorithm| be an algorithm that returns 1. + 1. If |writableSizeAlgorithm| was not given, let |writableSizeAlgorithm| be an algorithm that + returns 1. + 1. If |readableSizeAlgorithm| was not given, let |readableSizeAlgorithm| be an algorithm that + returns 1. 1. Let |transformAlgorithmWrapper| be an algorithm that runs these steps given a value |chunk|: 1. Let |result| be the result of running |transformAlgorithm| given |chunk|. If this throws an exception |e|, return [=a promise rejected with=] |e|. @@ -7025,9 +7082,11 @@ reason. an identity {{TransformStream}}: 1. Let |transformStream| be a [=new=] {{TransformStream}}. + 1. Let |transformAlgorithm| be an algorithm which, given |chunk|, [=TransformStream/enqueues=] + |chunk| in |transformStream|. 1. [=TransformStream/Set up=] |transformStream| with [=TransformStream/set up/transformAlgorithm=] set to an algorithm which, given - |chunk|, [=TransformStream/enqueues=] |chunk| in |transformStream|. + ignore>[=TransformStream/set up/transformAlgorithm=] set to |transformAlgorithm| and [=TransformStream/set up/writableHighWaterMark=] set to 0. 1. Return |transformStream|.
@@ -7036,7 +7095,7 @@ reason. The following algorithms must only be used on {{TransformStream}} instances initialized via the above [=TransformStream/set up=] algorithm. Usually they are called as part of [=TransformStream/set up/transformAlgorithm=] or -[=TransformStream/set up/flushAlgorithm=]. +[=TransformStream/set up/flushAlgorithm=].

To enqueue the JavaScript value |chunk| into a {{TransformStream}} |stream|, perform ! @@ -7079,8 +7138,7 @@ Including the {{GenericTransformStream}} mixin will give an IDL interface the ap the behavior of the resulting interface, its constructor (or other initialization code) must set each instance's [=GenericTransformStream/transform=] to a [=new=] {{TransformStream}}, and then [=TransformStream/set up|set it up=] with appropriate customizations via the -[=TransformStream/set up/transformAlgorithm=] and optionally -[=TransformStream/set up/flushAlgorithm=] arguments. +[=TransformStream/set up/transformAlgorithm=] and any optional arguments. Note: Existing examples of this pattern on the web platform include {{CompressionStream}} and {{TextDecoderStream}}. [[COMPRESSION]] [[ENCODING]] diff --git a/reference-implementation/lib/WritableStreamDefaultController-impl.js b/reference-implementation/lib/WritableStreamDefaultController-impl.js index 1e4b58a51..d60e226e5 100644 --- a/reference-implementation/lib/WritableStreamDefaultController-impl.js +++ b/reference-implementation/lib/WritableStreamDefaultController-impl.js @@ -21,6 +21,14 @@ exports.implementation = class WritableStreamDefaultControllerImpl { aos.WritableStreamDefaultControllerError(this, e); } + releaseBackpressure() { + const state = this._stream._state; + if (state !== 'writable') { + return; + } + aos.WritableStreamDefaultControllerReleaseBackpressure(this); + } + [AbortSteps](reason) { const result = this._abortAlgorithm(reason); aos.WritableStreamDefaultControllerClearAlgorithms(this); diff --git a/reference-implementation/lib/WritableStreamDefaultController.webidl b/reference-implementation/lib/WritableStreamDefaultController.webidl index 25139e6ff..b08eb71ee 100644 --- a/reference-implementation/lib/WritableStreamDefaultController.webidl +++ b/reference-implementation/lib/WritableStreamDefaultController.webidl @@ -2,4 +2,5 @@ interface WritableStreamDefaultController { readonly attribute AbortSignal signal; undefined error(optional any e); + undefined releaseBackpressure(); }; diff --git a/reference-implementation/lib/abstract-ops/transform-streams.js b/reference-implementation/lib/abstract-ops/transform-streams.js index 8e3f5fcc3..eefa08a33 100644 --- a/reference-implementation/lib/abstract-ops/transform-streams.js +++ b/reference-implementation/lib/abstract-ops/transform-streams.js @@ -7,7 +7,8 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, tr const { CreateReadableStream, ReadableStreamDefaultControllerClose, ReadableStreamDefaultControllerEnqueue, ReadableStreamDefaultControllerError, ReadableStreamDefaultControllerHasBackpressure, ReadableStreamDefaultControllerCanCloseOrEnqueue } = require('./readable-streams.js'); -const { CreateWritableStream, WritableStreamDefaultControllerErrorIfNeeded } = require('./writable-streams.js'); +const { CreateWritableStream, WritableStreamDefaultControllerErrorIfNeeded, + WritableStreamDefaultControllerReleaseBackpressure } = require('./writable-streams.js'); const TransformStream = require('../../generated/TransformStream.js'); const TransformStreamDefaultController = require('../../generated/TransformStreamDefaultController.js'); @@ -98,6 +99,10 @@ function TransformStreamSetBackpressure(stream, backpressure) { stream._backpressureChangePromise = newPromise(); stream._backpressure = backpressure; + + if (backpressure === false && stream._writable._state === 'writable') { + WritableStreamDefaultControllerReleaseBackpressure(stream._writable._controller); + } } // Default controllers diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index cf303bfe7..7aeb78bc1 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -27,6 +27,7 @@ Object.assign(exports, { WritableStreamDefaultControllerClearAlgorithms, WritableStreamDefaultControllerError, WritableStreamDefaultControllerErrorIfNeeded, + WritableStreamDefaultControllerReleaseBackpressure, WritableStreamDefaultWriterAbort, WritableStreamDefaultWriterClose, WritableStreamDefaultWriterCloseWithErrorPropagation, @@ -541,6 +542,7 @@ function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm controller._queueTotalSize = undefined; ResetQueue(controller); + controller._releaseBackpressure = false; controller._abortController = new AbortController(); controller._started = false; @@ -657,6 +659,9 @@ function WritableStreamDefaultControllerErrorIfNeeded(controller, error) { } function WritableStreamDefaultControllerGetBackpressure(controller) { + if (controller._releaseBackpressure === true && controller._queue.length === 0) { + return false; + } const desiredSize = WritableStreamDefaultControllerGetDesiredSize(controller); return desiredSize <= 0; } @@ -699,6 +704,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { const stream = controller._stream; WritableStreamMarkFirstWriteRequestInFlight(stream); + controller._releaseBackpressure = false; const sinkWritePromise = controller._writeAlgorithm(chunk); uponPromise( @@ -727,6 +733,19 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { ); } +function WritableStreamDefaultControllerReleaseBackpressure(controller) { + const stream = controller._stream; + assert(stream._state === 'writable'); + + controller._releaseBackpressure = true; + + if (WritableStreamHasOperationMarkedInFlight(stream) === false && + WritableStreamCloseQueuedOrInFlight(stream) === false) { + const backpressure = WritableStreamDefaultControllerGetBackpressure(controller); + WritableStreamUpdateBackpressure(stream, backpressure); + } +} + function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) { try { EnqueueValueWithSize(controller, chunk, chunkSize); @@ -735,6 +754,8 @@ function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) { return; } + controller._releaseBackpressure = false; + const stream = controller._stream; if (WritableStreamCloseQueuedOrInFlight(stream) === false && stream._state === 'writable') { const backpressure = WritableStreamDefaultControllerGetBackpressure(controller); diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 99d74f952..db995f6f8 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 99d74f9529e16ec0722ef11136ab29b9e80fff26 +Subproject commit db995f6f8b4d236444ad67fa525ce4237cdf3f2f