Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stream] uncatchable error thrown during piping if source and dest don't have same objectMode setting #54945

Open
joelrbrandt opened this issue Sep 14, 2024 · 12 comments
Labels
confirmed-bug Issues with confirmed bugs. stream Issues and PRs related to the stream subsystem.

Comments

@joelrbrandt
Copy link
Contributor

Version

22.8.0

Platform

Darwin [hostname] 23.6.0 Darwin Kernel Version 23.6.0: Mon Jul 29 21:14:46 PDT 2024; root:xnu-10063.141.2~1/RELEASE_ARM64_T6031 arm64

Subsystem

stream

What steps will reproduce the bug?

Run the following code:

import { Readable, Transform } from "node:stream";

async function main() {
  const objectReadable = Readable.from([
    { hello: "world" },
    { goodbye: "world" }
  ]);

  objectReadable.on("error", err => {
    console.log("objectReadable error", err);
  });

  const passThrough = new Transform({
    transform(chunk, _encoding, cb) {
      this.push(chunk);
      cb(null);
    },
    objectMode: false // code works if set to true
  });

  passThrough.on("error", err => {
    console.log("passThrough error", err);
  });

  try {
    console.assert(objectReadable.readableObjectMode, "objectReadable is not in object mode");
    console.assert(!passThrough.writableObjectMode, "passThrough is not in byte mode write side");

    console.log("beginning pipe");
    objectReadable.pipe(passThrough);
  } catch (e) {
    console.error("caught error when calling pipe", e);
    return;
  }

  try {
    console.log("beginning consume of passThrough");
    const output = [];
    for await (const v of passThrough) {
      output.push(v);
    }
    console.log("output", output);
  } catch (e) {
    console.error("caught error while consuming output", e);
    return;
  }

  console.log("done");
}

process.setUncaughtExceptionCaptureCallback(err => {
  console.error("uncaught exception", err);
});

main();

How often does it reproduce? Is there a required condition?

Always

What is the expected behavior? Why is that the expected behavior?

Probably Readable.pipe() should compare the objectMode state of the source and the write side of the destination, and if they don't match, it should synchronously throw.

So, in the code above, I would expect to catch an error on the line that reads console.error("caught error when calling pipe", e);

Alternatively, the writable (or transform) stream could emit an error. In that case, in the code above, I would expect to see an error logged by the passThrough.on("error") callback.

What do you see instead?

There is an uncaught exception. It is caught by the callback to process.setUncaughtExceptionCaptureCallback. That exception is:

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer, TypedArray, or DataView. Received an instance of Object
    at _write (node:internal/streams/writable:480:13)
    at Writable.write (node:internal/streams/writable:508:10)
    at Readable.ondata (node:internal/streams/readable:1007:22)
    at Readable.emit (node:events:520:28)
    at Readable.read (node:internal/streams/readable:780:10)
    at flow (node:internal/streams/readable:1281:53)
    at emitReadable_ (node:internal/streams/readable:845:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:89:21) {
  code: 'ERR_INVALID_ARG_TYPE'
}

I don't believe there is any way to catch this exception in the code that has the streams in context. (But if there is, then that'd be swell! And there's no issue here! And in that case, apologies for not understanding the right part of the streams API to handle these errors correctly.)

Additional information

Broadly, the stream module is great ❤️ ! I love building stuff with it. Thanks for your hard work.

@joelrbrandt joelrbrandt changed the title [stream] uncatchable error thrown in piping if source and dest don't have same objectMode setting [stream] uncatchable error thrown during piping if source and dest don't have same objectMode setting Sep 14, 2024
@RedYetiDev RedYetiDev added the stream Issues and PRs related to the stream subsystem. label Sep 14, 2024
@RedYetiDev
Copy link
Member

RedYetiDev commented Sep 14, 2024

For reference, a smaller reproduction is:

import { Readable, Transform } from 'node:stream';

Readable.from([{}]).pipe(new Transform({
    transform(chunk, _, cb) {
        this.push(chunk);
    }
}));

AFAICT this is working as intended. Per the docs, if object mode is false, "streams created by Node.js APIs operate exclusively on strings, <Buffer>, <TypedArray> and <DataView> objects".

CC @nodejs/streams

@joelrbrandt
Copy link
Contributor Author

@RedYetiDev Thanks for taking a look!

AFAICT this is working as intended.

The part that I feel is not working as intended is that there is no way to catch the error that is produced when the pipe attempts to start flowing.

It should be possible (and in my opinion, would be preferrable) to throw synchronously in the Readable.pipe() call when the modes don't match between source and destination.

Individual streams cannot change their object mode once constructed. From the documentation on object mode:

Stream instances are switched into object mode using the objectMode option when the stream is created. Attempting to switch an existing stream into object mode is not safe.

So, it should be sound to throw in Readable.pipe(). (I.e., there shouldn't be a case where the check would fail in Readable.pipe() but things would have actually worked out right when data eventually started flowing.)

The reason I provided a longer repro step was to demonstrate different places in the control flow where an error could be caught.

@ronag
Copy link
Member

ronag commented Sep 14, 2024

I agree we are missing a try/catch in pipe.

@ronag ronag added the confirmed-bug Issues with confirmed bugs. label Sep 14, 2024
@joelrbrandt
Copy link
Contributor Author

@ronag

I agree we are missing a try/catch in pipe.

Thanks for taking a look!

I assume you mean "we're missing a check and a throw in Readable.pipe()"? There's no exception propagating through .pipe, so a catch won't address the issue. The exception happens when the pipe is flowing and the Readable attempts to write to the Writable. That happens here, and there's no way to catch that exception (that I am aware of).

Anyway, if folks feel this is indeed a bug, and that the right fix is to add a check and throw in Readable.prototype.pipe(), I'm happy to put up a PR by EOD tomorrow. This'll be my first time contributing to node, though, so I might need some guidance. 😄

@ronag
Copy link
Member

ronag commented Sep 14, 2024

  function ondata(chunk) {
    debug('ondata');
    try {
      const ret = dest.write(chunk);
      debug('dest.write', ret);
      if (ret === false) {
        pause();
      }
    } catch (err) {
      dest.destroy(err);
    }
  }

@joelrbrandt
Copy link
Contributor Author

@ronag got it. Thanks.

Should there also be a check and throw at Readable.prototype.pipe()?

I'll add some tests and put up a PR.

@lpinca
Copy link
Member

lpinca commented Sep 15, 2024

I think it will add a noticeable overhead if that is done for every data chunk. Also, I think this is an unrecoverable programmer error so a crash is expected. Can't we just add a check for the source and destination streams in Readable.prototype.pipe()? I think this is also not necessary, but at least it is less invasive.

I mean something like this:

diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index 17f8e53ad5..75dc350b59 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -910,6 +910,17 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
   const src = this;
   const state = this._readableState;
 
+  if (
+    (state[kState] & kObjectMode &&
+      !dest._writableState[kState] & kObjectMode) ||
+    ((state[kState] & kObjectMode === 0) &&
+      dest._writableState[kState] & kObjectMode)
+  ) {
+    throw new Error(
+      'The piped streams do not have the same objectMode stetting'
+    );
+  }
+
   if (state.pipes.length === 1) {
     if ((state[kState] & kMultiAwaitDrain) === 0) {
       state[kState] |= kMultiAwaitDrain;

@ronag
Copy link
Member

ronag commented Sep 15, 2024

Just note that you can pipe non object mode into object mode but not the other way around.

@joelrbrandt
Copy link
Contributor Author

@lpinca @ronag in my incredibly limited (only one machine / architecture) and not statistically valid (only run a few times, manually) benchmarking, I don't see a performance difference due to the addition of the try/catch in ondata. (See some manual runs below.)

Assuming there isn't performance overhead, is there a preferred approach (try/catch in onData vs check in .pipe)?

Happy to put up a PR for either.

Also, are these instructions still current for running benchmarks on CI? (I see the repo they are in is archived.) Also, will I have permissions to trigger such a run on CI? Or does a maintainer need to do that?

manual runs of benchmark

Note: I built node with ./configure --node-builtin-modules-path "$(pwd)". Then, I added a console.log("[local build]...") to Readable.prototype.pipe to ensure I was actually running the modified code in the benchmark.

local build, unmodified ondata

[~/devel/node] (main) $ ./node --version
v23.0.0-pre
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js                               
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,249,159.338211797
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,197,094.442971323
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,029,514.30719613
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 28,273,879.594224814
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,225,075.268889103
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 27,992,464.42857583
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 28,337,883.925193656
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with unmodified ondata
streams/pipe.js n=5000000: 29,188,259.898103785

local build, modified ondata with try/catch

[~/devel/node] (main) $ ./node --version
v23.0.0-pre
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,868,575.624709625
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 29,417,366.983859424
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,880,720.458452556
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,618,137.460068755
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 28,907,380.567654
[~/devel/node] (main) $ ./node benchmark/streams/pipe.js
[local build] .pipe with modified ondata
streams/pipe.js n=5000000: 29,311,473.430494733

v22.8.0 installed through nvm

[~/devel/node] (main) $ node --version
v22.8.0
[~/devel/node] (main) $ node benchmark/streams/pipe.js  
streams/pipe.js n=5000000: 28,344,122.315635882
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,975,323.889792357
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,917,007.4656484
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,974,708.162682924
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,985,794.415783517
[~/devel/node] (main) $ node benchmark/streams/pipe.js
streams/pipe.js n=5000000: 28,888,451.761676144

@ronag
Copy link
Member

ronag commented Sep 15, 2024

If there is no performance overhead I would argue that both suggestions should be applied.

@lpinca
Copy link
Member

lpinca commented Sep 15, 2024

With the check in place, under what circumstances might writable.write() throw an error?

Also, are these instructions still current for running benchmarks on CI? (I see the repo they are in is archived.) Also, will I have permissions to trigger such a run on CI? Or does a maintainer need to do that?

See https://github.com/nodejs/node/blob/main/doc/contributing/writing-and-running-benchmarks.md. A collaborator needs to start the benchmark CI.

@ronag
Copy link
Member

ronag commented Sep 15, 2024

With the check in place, under what circumstances might writable.write() throw an error?

"Streamlike" objects.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
confirmed-bug Issues with confirmed bugs. stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

No branches or pull requests

4 participants