Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull various ReadableStream fixes from WebKit #13840

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/bun.js/bindings/ErrorCode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export default [
["ERR_ILLEGAL_CONSTRUCTOR", TypeError, "TypeError"],
["ERR_INVALID_URL", TypeError, "TypeError"],
["ERR_BUFFER_TOO_LARGE", RangeError, "RangeError"],
["ERR_STREAM_RELEASE_LOCK", Error, "AbortError"],

// Bun-specific
["ERR_FORMDATA_PARSE_ERROR", TypeError, "TypeError"],
Expand Down
10 changes: 9 additions & 1 deletion src/js/builtins/ReadableStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,15 @@ export function pipeThrough(this, streams, options) {

if ($isWritableStreamLocked(internalWritable)) throw $makeTypeError("WritableStream is locked");

$readableStreamPipeToWritableStream(this, internalWritable, preventClose, preventAbort, preventCancel, signal);
const promise = $readableStreamPipeToWritableStream(
this,
internalWritable,
preventClose,
preventAbort,
preventCancel,
signal,
);
$markPromiseAsHandled(promise);

return readable;
}
Expand Down
5 changes: 1 addition & 4 deletions src/js/builtins/ReadableStreamDefaultReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,7 @@ export function releaseLock(this) {

if (!$getByIdDirectPrivate(this, "ownerReadableStream")) return;

if ($getByIdDirectPrivate(this, "readRequests")?.isNotEmpty())
throw new TypeError("There are still pending read requests, cannot release the lock");

$readableStreamReaderGenericRelease(this);
$readableStreamDefaultReaderRelease(this);
}

$getter;
Expand Down
38 changes: 27 additions & 11 deletions src/js/builtins/ReadableStreamInternals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,10 @@ export function pipeToDoReadWrite(pipeState) {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, canWrite);
if (!canWrite) return;

pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value);
pipeState.pendingWritePromise = $writableStreamDefaultWriterWrite(pipeState.writer, result.value).$then(
undefined,
() => {},
);
},
e => {
pipeState.pendingReadPromiseCapability.resolve.$call(undefined, false);
Expand Down Expand Up @@ -396,7 +399,7 @@ export function pipeToClosingMustBePropagatedForward(pipeState) {
action();
return;
}
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, undefined);
$getByIdDirectPrivate(pipeState.reader, "closedPromiseCapability").promise.$then(action, () => {});
}

export function pipeToClosingMustBePropagatedBackward(pipeState) {
Expand Down Expand Up @@ -1367,20 +1370,18 @@ export function readableStreamError(stream, error) {

if (!reader) return;

$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);

if ($isReadableStreamDefaultReader(reader)) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
$readableStreamDefaultReaderErrorReadRequests(reader, error);
} else {
$assert($isReadableStreamBYOBReader(reader));
const requests = $getByIdDirectPrivate(reader, "readIntoRequests");
$putByIdDirectPrivate(reader, "readIntoRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}

$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(undefined, error);
const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
$markPromiseAsHandled(promise);
}

export function readableStreamDefaultControllerShouldCallPull(controller) {
Expand Down Expand Up @@ -1608,6 +1609,15 @@ export function isReadableStreamDisturbed(stream) {
return stream.$disturbed;
}

$visibility = "Private";
export function readableStreamDefaultReaderRelease(reader) {
$readableStreamReaderGenericRelease(reader);
$readableStreamDefaultReaderErrorReadRequests(
reader,
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
}

$visibility = "Private";
export function readableStreamReaderGenericRelease(reader) {
$assert(!!$getByIdDirectPrivate(reader, "ownerReadableStream"));
Expand All @@ -1616,11 +1626,11 @@ export function readableStreamReaderGenericRelease(reader) {
if ($getByIdDirectPrivate($getByIdDirectPrivate(reader, "ownerReadableStream"), "state") === $streamReadable)
$getByIdDirectPrivate(reader, "closedPromiseCapability").reject.$call(
undefined,
$makeTypeError("releasing lock of reader whose stream is still in readable state"),
$ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()"),
);
else
$putByIdDirectPrivate(reader, "closedPromiseCapability", {
promise: $newHandledRejectedPromise($makeTypeError("reader released lock")),
promise: $newHandledRejectedPromise($ERR_STREAM_RELEASE_LOCK("Stream reader cancelled via releaseLock()")),
});

const promise = $getByIdDirectPrivate(reader, "closedPromiseCapability").promise;
Expand All @@ -1636,6 +1646,12 @@ export function readableStreamReaderGenericRelease(reader) {
$putByIdDirectPrivate(reader, "ownerReadableStream", undefined);
}

export function readableStreamDefaultReaderErrorReadRequests(reader, error) {
const requests = $getByIdDirectPrivate(reader, "readRequests");
$putByIdDirectPrivate(reader, "readRequests", $createFIFO());
for (var request = requests.shift(); request; request = requests.shift()) $rejectPromise(request, error);
}

export function readableStreamDefaultControllerCanCloseOrEnqueue(controller) {
if ($getByIdDirectPrivate(controller, "closeRequested")) {
return false;
Expand Down
88 changes: 88 additions & 0 deletions test/js/web/streams/streams.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,55 @@ it("ReadableStream for empty file closes immediately", async () => {
expect(chunks.length).toBe(0);
});

it("ReadableStream errors the stream on pull rejection", async () => {
let stream = new ReadableStream({
pull(controller) {
return Promise.reject("pull rejected");
},
});

let reader = stream.getReader();
let closed = reader.closed.catch(err => `closed: ${err}`);
let read = reader.read().catch(err => `read: ${err}`);
expect(await Promise.race([closed, read])).toBe("closed: pull rejected");
expect(await read).toBe("read: pull rejected");
});

it("ReadableStream rejects pending reads when the lock is released", async () => {
let { resolve, promise } = Promise.withResolvers();
let stream = new ReadableStream({
async pull(controller) {
controller.enqueue("123");
await promise;
controller.enqueue("456");
controller.close();
},
});

let reader = stream.getReader();
expect((await reader.read()).value).toBe("123");

let read = reader.read();
reader.releaseLock();
expect(read).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);
expect(reader.closed).rejects.toThrow(
expect.objectContaining({
name: "AbortError",
code: "ERR_STREAM_RELEASE_LOCK",
}),
);

resolve();

reader = stream.getReader();
expect((await reader.read()).value).toBe("456");
});

it("new Response(stream).arrayBuffer() (bytes)", async () => {
var queue = [Buffer.from("abdefgh")];
var stream = new ReadableStream({
Expand Down Expand Up @@ -1055,3 +1104,42 @@ it("fs.createReadStream(filename) should be able to break inside async loop", as
expect(true).toBe(true);
}
});

it("pipeTo doesn't cause unhandled rejections on readable errors", async () => {
// https://github.com/WebKit/WebKit/blob/3a75b5d2de94aa396a99b454ac47f3be9e0dc726/LayoutTests/streams/pipeTo-unhandled-promise.html
let unhandledRejectionCaught = false;

const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);

const writable = new WritableStream();
const readable = new ReadableStream({ start: c => c.error("error") });
readable.pipeTo(writable).catch(() => {});

await Bun.sleep(15);

process.off("unhandledRejection", catchUnhandledRejection);

expect(unhandledRejectionCaught).toBe(false);
});

it("pipeThrough doesn't cause unhandled rejections on readable errors", async () => {
let unhandledRejectionCaught = false;

const catchUnhandledRejection = () => {
unhandledRejectionCaught = true;
};
process.on("unhandledRejection", catchUnhandledRejection);

const readable = new ReadableStream({ start: c => c.error("error") });
const ts = new TransformStream();
readable.pipeThrough(ts);

await Bun.sleep(15);

process.off("unhandledRejection", catchUnhandledRejection);

expect(unhandledRejectionCaught).toBe(false);
});
Loading