Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17109: Move lock backoff retry to streams TaskManager #17209

Merged
merged 11 commits into from
Sep 30, 2024

Conversation

aliehsaeedii
Copy link
Contributor

This PR aims at resolving the issue made by #17116

@mumrah
Copy link
Contributor

mumrah commented Sep 16, 2024

@aliehsaeedii please update the PR title to have a description of the patch. Thanks!

@aliehsaeedii aliehsaeedii changed the title KAFKA-17109 KAFKA-17109: Move lock backoff retry to streams TaskManager Sep 16, 2024
Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, @aliehsaeedii !

Here my feedback.

I am missing unit tests.

}

public boolean canAttempt(final long nowMs) {
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);

Comment on lines 1016 to 1017
stateUpdater.add(task);
taskIdToBackoffRecord.remove(task.id());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor:
I would swap those two lines. Once the task is initialized, the backoff can be removed.

taskIdToBackoffRecord.remove(task.id());
} else {
log.trace("Task {} is still not allowed to retry acquiring the state directory lock", task.id());
handleUnsuccessfulLockAcquiring(task, nowMs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct?
Every time initialization is attempted before the back-off, the time of the last attempt is updated to the current time. If we assume an attempt every poll interval and the poll interval is less than the back-off time, the task will never be initialized.
Assume the last unsuccessful attempt occurred at time 200 and now the current call to canTryLock() is 100ms later at time 300. Furthermore, assume the current back-off is 250. That is, canTryLock() should return false because 300 - 200 >= 250 is not true. The last attempt is updated to 300 and the backoff is exponentially updated with the increased number of attempt (let's say 500). If you try again in 100ms at 400 canTryLock() will again return false, because 400 - 300 >= 500 is still not true and it will also not be true next time. You should only update the back-off record if you actually have attempted to initialize the task and it was unsuccessful and not when you skipped the attempt due to the back-off.

public static class BackoffRecord {
private long attempts;
private long lastAttemptMs;
private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 10000, 0.5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the exponential back-off be specified in terms of poll time? Something like

new ExponentialBackoff(pollTime, 2, 10000, 0.5);

If it is to much trouble getting that config into the task manager, just choose something larger than 1ms. 1 ms sounds really small. The sequence of the back-offs would be 1ms, 2ms, 4ms, 8ms, 16ms, 32ms, 64, 128. At the same time, with default configs, the task initialization is attempted every 100ms. So, it seems there will not be much improvement to the current situation because the first 7 poll iterations you attempt to initialize the task.

@aliehsaeedii
Copy link
Contributor Author

aliehsaeedii commented Sep 16, 2024

Thanks for the PR, @aliehsaeedii !

Here my feedback.

I am missing unit tests.

Thanks @cadonna. Utest is added + review is addressed

@@ -2116,4 +2132,37 @@ boolean needsInitializationOrRestoration() {
void addTask(final Task task) {
tasks.addTask(task);
}

private boolean canTryLock(final TaskId taskId, final long nowMs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I forgot to add this comment before in my review. Could you please rename this method to canTryInitializeTask()? I think that makes more sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna makes sense!

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates!

Here my comments.

@@ -1006,14 +1014,22 @@ private void addTasksToStateUpdater() {
}

private void addTaskToStateUpdater(final Task task) {
final long nowMs = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, you need to use

Suggested change
final long nowMs = System.currentTimeMillis();
final long nowMs = time.milliseconds();

We inject the time object at creation, so that we can control time for example in tests.

Comment on lines 156 to 159
/* For testing */
void setTaskIdToBackoffRecord(final Map<TaskId, BackoffRecord> taskIdToBackoffRecord) {
this.taskIdToBackoffRecord = taskIdToBackoffRecord;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think you need this method if you can control time as I describe on line 1017.

.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.drainPendingTasksToInit()).thenReturn(mkSet(task00, task01));
final TaskManager.BackoffRecord backoffRecord = mock(TaskManager.BackoffRecord.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do not need this mock. You can advance time with the time object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Advancing time alone wont help since the backoff record corresponding to task00 is not existing in the map.

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @aliehsaeedii !

The test looks great now! That is how I envisioned the test!

I had some minor formatting comments. Sorry for all the comments, but since this is a rather complicated test, I think it makes sense structure it well to make it better readable.

@@ -1243,6 +1245,50 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
verify(stateUpdater).add(task01);
}

@Test
public void shouldRetryInitializationWhenCanNotInitializeTask() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is very good!
I just have some formatting comments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please rename the test to shouldRetryInitializationWithBackoffWhenInitializationFails?

Comment on lines 1251 to 1252
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use 4 spaces and not 8 for indentation.

Suggested change
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();
.withInputPartitions(taskId00Partitions)
.inState(State.RESTORING).build();

Comment on lines 1254 to 1255
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the indentation also here.

verify(task00).initializeIfNeeded();
verify(task01).initializeIfNeeded();
verify(tasks).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the indentation.

// initializeIfNeeded() has NOT been called this time
verify(task00, Mockito.times(1)).initializeIfNeeded();
verify(tasks, Mockito.times(2)).addPendingTasksToInit(
argThat(tasksToInit -> tasksToInit.contains(task00))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the indentation


taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

verify(task00).initializeIfNeeded();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add an inline comment here stating:

// task00 should not be initialized due to LockException, task01 should be initialized


taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);

// initializeIfNeeded() has NOT been called this time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This inline comment is not really clear. Could you please change it to something like:

// task00 should not be initialized since the backoff period has not passed.

verify(stateUpdater, never()).add(task00);
verify(stateUpdater).add(task01);

taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a time.sleep(5000) before this call, please?
Please add a new line between time.sleep() and checkStateUpdater().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the 2nd try 5000 does not work but anything less than 1000 is good!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, you are right! My bad!

verify(stateUpdater, never()).add(task00);

time.sleep(10000);
// do not throw lock exception this time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you change this comment to something like:

// task00 should call initialize since the backoff period has passed

);
verify(stateUpdater, never()).add(task00);

time.sleep(10000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a new line after this line to highlight that time passed between the two initialization attempts.

Copy link
Contributor

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aliehsaeedii Thanks for the updates!

LGTM!

@cadonna
Copy link
Contributor

cadonna commented Sep 26, 2024

@aliehsaeedii the following test fails consistently with a NPE with this PR:
StreamThreadTest.shouldRecordCommitLatency().
See https://github.com/apache/kafka/actions/runs/11030675352?pr=17209
Could you have a look?

@mumrah
Copy link
Contributor

mumrah commented Sep 29, 2024

@aliehsaeedii can you merge in latest trunk to pick up the fix for the failing FeatureCommandTest?

@cadonna cadonna merged commit bb11257 into apache:trunk Sep 30, 2024
9 checks passed
@mumrah
Copy link
Contributor

mumrah commented Sep 30, 2024

@cadonna I was a little puzzled by the FeatureCommandTest failure, so I dug into this a bit. A few things I learned:

  1. By default, the checkout action will checkout the PRs merge commit rather than it's head commit (similar to what Jenkins did)
  2. Re-running a workflow does not "pull in" new changes from trunk for the merge commit.

The commit which fixed the test failure on trunk was

commit cd4d6ce9d576b170c10f81f3081529885abb933c
Author: PoAn Yang <[email protected]>
Date:   Thu Sep 26 23:24:09 2024 +0800

    MINOR: fix failed cases in FeatureCommandTest (#17287)
    
    Reviewers: David Arthur <[email protected]>

Which was Thu Sep 26 15:24:09 2024 UTC.

The first workflow run for this PR (prior to merging in trunk) was at 2024-09-26T11:45:55Z, so a few hours before the fix was committed. The merge commit was up to bd94a73 at that point (logs). The subsequent re-runs of the workflow did not advance the merge commit since a re-run of a workflow is meant to be deterministic.

@cadonna
Copy link
Contributor

cadonna commented Oct 1, 2024

@mumrah Thanks for the explanation!

@lucasbru
Copy link
Member

lucasbru commented Nov 5, 2024

@cadonna @aliehsaeedii Should we not port back these fixes to 3.9, 3.8 ?

@cadonna
Copy link
Contributor

cadonna commented Nov 6, 2024

Yeah, that is a good idea! As far as I understand, the change seems to be well tested, right? @aliehsaeedii

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants