Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: cleanup KStream JavaDocs (5/N) - stream-globalTable-inner-join #18747

Merged
merged 3 commits into from
Feb 7, 2025

Conversation

mjsax
Copy link
Member

@mjsax mjsax commented Jan 30, 2025

No description provided.

@@ -1043,33 +1043,96 @@ public <VO, VR> KStream<K, VR> leftJoin(final KTable<K, VO> table,
}
}

@SuppressWarnings("unchecked")
private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just move this method from further below up here, where it belongs.

* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* The key of the result record is the same as the stream record's key.
* If you need read access to the {@code KStream} key, use {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just describe what the code currently does. But it seems it's not correct. We should not pass in the KStream record key, but the join key (ie, whatever keySelector returns).

The original KIP says (https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner)

it seems like extending the interface to pass the join key along as well would be helpful

The question now is, can we change this directly? Even if it would be a bug-fix, is could be considered a "breaking change" (should we try to sneak a fix into 4.0 which would allows us to ship a breaking change)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit border-line, since it's just the motivation section that's promising something different. Have you checked the discussion if this was done on purpose without updating the motivation section?

Sneaking a quick fix into 4.0 seems a bit sneaky for this one

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did skim over the discuss thread and could not find anything. And that's not really surprising. The main discussion focused on more fundamental question, and the KStreamKTableJoinProcessor is shared between both stream-table and stream-globalTable join -- and it does the right thing for stream-table case. So I am very confident it was just an oversight, and nobody realized that both case should be handled differently.

Sneaking a quick fix into 4.0 seems a bit sneaky for this one

That's more than fair. File a Jira for it: https://issues.apache.org/jira/browse/KAFKA-18731. Nobody did complain so far, so it seems not to be a big issue. For this PR, we can just document what to code does right now and move forward.

@mjsax mjsax force-pushed the minor-update-kstreams-javadocs-5 branch from 103e097 to 4e504ef Compare January 31, 2025 04:18
@mjsax
Copy link
Member Author

mjsax commented Feb 1, 2025

Java 17 passed.

Jave 23:

Found 1 test failures:
FAILED ❌ SmokeTestDriverIntegrationTest > "shouldWorkWithRebalance(boolean, boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true"

@mjsax
Copy link
Member Author

mjsax commented Feb 1, 2025

Looking into test failure history (especially for PRs), SmokeTestDriverIntegrationTest has a high failure rate with processingThreadsEnabled=true and 100% pass rate w/o processing threads.

-> #18773

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @mjsax ! I left some comments.

* If a {@code KStream} input value is {@code null} the record will not be included in the join operation
* and thus no output record will be added to the resulting {@code KStream}.
* The key of the result record is the same as the stream record's key.
* If you need read access to the {@code KStream} key, use {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit border-line, since it's just the motivation section that's promising something different. Have you checked the discussion if this was done on purpose without updating the motivation section?

Sneaking a quick fix into 4.0 seems a bit sneaky for this one

*
* In contrast to {@link #join(KTable, ValueJoiner)}, there is no co-partitioning requirement between this
* {@code KStream} and the {@link GlobalKTable}.
* Also note, that the {@link GlobalKTable} is updated "asynchronously", and thus this operation is inherently
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if you can make this more precise (I cannot, btw). Essentially, there are no guarantees on the ordering of the join w.r.t. stream time, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, there is not really a concept of "stream time" for the global store to begin with. The global store runs in it's own thread, we do bootstrap it at the beginning, and also read from all partitions (w/o any time ordering guarantees between partitions), and update the store on it's own "pace", totally independent from StreamThread for which we do "time-ordered processing" between partitions etc.

Thus, re-processing the same input data, can give a quite different result, as the global-state is updated "randomly" and it's more or less impossible to force deterministic re-processing. Guess the only way it's determinists is, if the global store data does not change at all while processing the stream side.

Of course, we can say all this in the JavaDocs, but I am wondering if it becomes too much (ie, explaining the KS architecture)? To me, that's something for the actual docs, not JavaDocs.

Happy to add more color if you have a good proposal. I tried to give some "hint", staying a little vague on purpose to avoid the need to add a one-pager to the JavaDocs.

Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe just expand on it a tiny bit. But I think you are right, maybe that is best explained in the actual doc. I wasn't aware that there is a section on this in the documentation.

Maybe I was just triggered by the quotation marks. You could consider something like this: Also note that there are no ordering guarantees between the updates on the left and the right side of this join, since updates to the {@link GlobalKTable} are in no way synchronized to this join. Therefore, the result of the join is inherently non-deterministic.

But if you don't like the proposal, feel free to just keep it as is.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't aware that there is a section on this in the documentation.

To be fair, not sure to what extend we actually cover this in the docs. But if we want to document it better, let's do it in the docs, not JavaDocs.

Maybe I was just triggered by the quotation marks.

Yeah. In the end, it's hard to describe and I was trying to use single word, but asynchronously does not fully fit (it's much more nuanced) so I used quotation marks to indicate it's not a totally accurate description.

I like your suggestions.

@mjsax mjsax force-pushed the minor-update-kstreams-javadocs-5 branch from 4e504ef to e1b274a Compare February 5, 2025 05:06
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, check my comment on the discussion but no blocking issue for me.

@mjsax
Copy link
Member Author

mjsax commented Feb 7, 2025

Java 17:

Found 2 flaky test failures:
FLAKY ⚠️  ReplicationQuotasTest > initializationError
FLAKY ⚠️  RequestQuotaTest > initializationError

Java 23:

Found 2 flaky test failures:
FLAKY ⚠️  ReplicationQuotasTest > initializationError
FLAKY ⚠️  RequestQuotaTest > initializationError

Only flaky tests. Merging.

@mjsax mjsax merged commit 326ee36 into apache:trunk Feb 7, 2025
7 of 9 checks passed
@mjsax mjsax deleted the minor-update-kstreams-javadocs-5 branch February 7, 2025 21:44
pdruley pushed a commit to pdruley/kafka that referenced this pull request Feb 12, 2025
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants