Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37521][runtime] Introduce KeyedCoProcessOperator with async state API #26328

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

fredia
Copy link
Contributor

@fredia fredia commented Mar 20, 2025

What is the purpose of the change

This PR introduces AsyncKeyedCoProcessOperator

Brief change log

  • Add AsyncKeyedCoProcessOperator
  • Add AsyncKeyedCoProcessOperatorWithWatermarkDelay
  • Add DeclaringAsyncKeyedCoProcessFunction
  • Move AsyncIntervalJoinOperator to o.a.f.runtime.asyncprocessing.operators.co package.

Verifying this change

  • Add AsyncKeyedCoProcessOperatorTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 20, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

import org.apache.flink.util.function.ThrowingConsumer;

/**
* A function that processes elements of two keyed streams and produces a single output one.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: one. -> stream.

* <p>The function will be called for every element in the input streams and can produce zero or
* more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also query the
* time (both event and processing) and set timers, through the provided {@link Context}. When
* reacting to the firing of set timers the function can emit yet more elements.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. in the previous sentence I assume that this function can set timers. In this sentence set timers is not so clear. Maybe "When reacting to the firing of timers," or "When reacting to the firing of timers that have been set,"

* time (both event and processing) and set timers, through the provided {@link Context}. When
* reacting to the firing of set timers the function can emit yet more elements.
*
* <p>An example use-case for connected streams would be the application of a set of rules that
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:
use-case -> use case
would be -> is

* change over time ({@code stream A}) to the elements contained in another stream (stream {@code
* B}). The rules contained in {@code stream A} can be stored in the state and wait for new elements
* to arrive on {@code stream B}. Upon reception of a new element on {@code stream B}, the function
* can now apply the previously stored rules to the element and directly emit a result, and/or
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:
remove the word now
remove the word directly

* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/
public void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: processElement1 -> processInputStream1Element or the like

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't rename it, because processElement1 is inherited from KeyedCoProcessFunction. I added override here.

* and go into recovery.
*/
public void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception {
throw new IllegalAccessException("This method is replaced by declareProcess1.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this exception message mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is inherited from KeyedCoProcessFunction, but we want to use declareProcess1 to replace it.
This method should not be called as expected, so an exception thrown to prevent misuse.

* <p>This function can output zero or more elements using the {@link Collector} parameter and
* also update internal state or set timers using the {@link Context} parameter.
*
* @param value The stream element
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this parameter called value? Could it be something more specific like streamElement

Copy link
Contributor Author

@fredia fredia Mar 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the question above, this method is inherited from KeyedCoProcessFunction, the naming is consistent with KeyedCoProcessFunction. I prefer keeping it.

* also update internal state or set timers using the {@link Context} parameter.
*
* @param value The stream element
* @param ctx A {@link Context} that allows querying the timestamp of the element, querying the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe a list would be easier to read in the javadoc. A {@link Context} that allows querying of

    ...

When you say querying the time is this event or processing?

* timers and querying the time. The context is only valid during the invocation of this
* method, do not store it.
* @param out The collector to emit resulting elements to
* @throws Exception The function may throw exceptions which cause the streaming program to fail
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it throw a retriable exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a retriable exception?

* @throws Exception The function may throw exceptions which cause the streaming program to fail
* and go into recovery.
*/
public void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception {
Copy link
Contributor

@davidradl davidradl Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should label all the method as @public or @experimental or the like.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary because it is inherited from KeyedCoProcessFunction . I add override here.

throws DeclarationException;

/**
* Declare a procedure which is called when a timer set using {@link TimerService} fires.
Copy link
Contributor

@davidradl davidradl Mar 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the words say it is declaring a procedure but the output parameter says it is a processor. I would expect these words to be the same. The method name should align with what we are declaring. maybe declareStream2Processor

* of this method, do not store it.
* @param out The processor for processing timestamps.
*/
public ThrowingConsumer<Long, Exception> declareOnTimer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: declareOnTimer -> declareOnTimerProcessor

private static final long serialVersionUID = 1L;

// Shared timestamp variable for collector, context and onTimerContext.
private transient DeclaredVariable<Long> sharedTimestamp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do we mean by the sharedTimestamp? Is this event time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used to store one Timestamp. It can be event time or processing time.

collector = new TimestampedCollectorWithDeclaredVariable<>(output, sharedTimestamp);

InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe change to a constant and defined in the same place as other named InternalTimerServices. I am not sure what does user mean this context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming "user-timers" in AsyncKeyedCoProcessOperator is consistent with KeyedCoProcessOperator#internalTimerService. It represents a timer-service created by operator.

if (userFunction instanceof DeclaringAsyncKeyedCoProcessFunction) {
DeclaringAsyncKeyedCoProcessFunction declaringFunction =
(DeclaringAsyncKeyedCoProcessFunction) userFunction;
declaringFunction.declareVariables(declarationContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious, is there a description of what these declarations are and why we are declaring things for async?
I assume we are describing the variables, functions and timers that will be realised when they run asynchronously.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import java.util.function.Consumer;

/** A {@link KeyedCoProcessOperator} that supports holding back watermarks with a static delay. */
public class AsyncKeyedCoProcessOperatorWithWatermarkDelay<K, IN1, IN2, OUT>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cant see this class called anywhere. Even from test code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, It will be called at StreamExecIntervalJoin in the future after we implement async version rowJoinFunc.

@fredia fredia force-pushed the twoinds branch 2 times, most recently from 3640756 to 7ddeb5d Compare March 31, 2025 06:15
@@ -387,8 +387,11 @@ public Watermark preProcessWatermark(Watermark watermark) throws Exception {
* perform async state here. Only some synchronous logic is suggested.
*
* @param watermark the advanced watermark.
* @return the watermark that should be emitted to downstream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should allow returning null if there is no need emitting any watermark downstream.

And please update the javadoc of this method explaining this.

rightCache.asyncRemove(key);
}
});
return emitResult.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We better not blocking get() here. It will block the task thread as well as those callbacks for async state requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍I changed the return value to StateFuture<Boolean>.

@fredia fredia force-pushed the twoinds branch 2 times, most recently from 3351542 to 1c0dca7 Compare April 1, 2025 06:55
Comment on lines 123 to 124
joinFunction.setJoinKey(ctx.getCurrentKey());
joinCollector.setInnerCollector(out);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the internal state of joinFunction and joinCollector should be maintained by DeclaredVariable since it belongs to the context for async processing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Good suggestion, I rewrote AsyncTimeIntervalJoin with declareChain.

throw new IllegalAccessException("This method is replaced by declareProcess1.");
}

/** Override this method or use {@link #declareProcess2} instead. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we dont recommend to use this method. How about rewrite this javadoc?

Suggested change
/** Override this method or use {@link #declareProcess2} instead. */
/** Override and finalize this method. Please use {@link #declareProcess2} instead. */

*/
public void postProcessWatermark(Watermark watermark) throws Exception {}
public Watermark postProcessWatermark(Watermark watermark) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about change the javadoc of this method to :

A hook that will be invoked after finishing advancing the watermark and right before the watermark being emitting downstream. Here is a chance for customization of the emitting watermark. ....(and following description)

@@ -417,12 +419,12 @@ public <X> void output(OutputTag<X> outputTag, X value) {
}

@VisibleForTesting
MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() {
public MapState<Long, List<IntervalJoinOperator.BufferEntry<T1>>> getLeftBuffer() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems unnecessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because AsyncIntervalJoinOperator is moved into asyncprocessing package, and AsyncIntervalJoinOperatorTest still needs these methods.

Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update! LGTM. Would you please update the description of this PR and remove the unrelated change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants