Skip to content

Remove requested state in Task Framework

xyuanlu edited this page Sep 30, 2022 · 5 revisions

Introduction

Helix Task Framework is a widely used feature in Helix. This document explains how CurrentState ZNode update in task framework can be simplified to reduce Zookeeper IO load. It is achieved by removing requested state update by participants so that some redundant messages and CurrentState ZNode write requests can be reduced.

Background

Some of task framework's customer has large clusters with thousands of job scheduled at the same time. As workload increases, Zookeeper IO could be a performance bottleneck. Optimize Task framework pipeline, especially optimize redundant IO is a key part in performance and scalability improvement. This design is the first step for simplification TaskFramework.

Problem Statement

In the current design, although Task's CurrentState update requests are sent be participant, they are all instructed by controller's message. In TaskFramework, all task's current state update is instructed by messages sent by controller. When task finish execution or failed with error, participant will set requested_state field in CurrentState ZNode requesting a state transition message. The following graph illustrates how state transition is done when task finish executing. It shows all ZK writes including messages and CurrentState ZNode update. Since the task thread is actually running in participant side, participant should update task's CurrentState directly. There are numbers of redundant ZNode updates to be optimized.

Apart from the redundant IO, when task is actually finished running on participant's task runner thread, one more round of pipeline run → sending message → state transition → ZK update is needed before the actual CurrentState is Completed or TaskError. It will introduce some ambiguous for users when task is actually not running on task running thread but CurrentState is Running.

This document will focus on the state transitions below the dotted line after T0.  

Remove requested state (3)

Proposed Design

Controller side change:

Controller gets CurrentState ZNode change callbacks and start a pipeline. Originally when requested state in not empty, controller sends a state transition message instructing participant to update ZNode. Controller will get another callback for the actual CurrentState change and send message for the next step state transition. Removing requested state will remove one round of pipeline invocation on controller side. When the CurrentState is updated to next state, pipeline is triggered and run with all existing logic. No controller side change is needed.

Participant side change:

The benefit of having requested state in current design is to have one central controller initiating all CurrentState update and one thread requesting ZK write for one partition. Originally Task's CurrentState update requests are sent be participant StateTransitionHandler.postHandlingMessage() and they are all instructed by controller's message. 

When there are two thread doing ZK write requests or two threads scheduling ZK request by sending messages, they need to coordinate to prevent overwrite. We need to design carefully when handling cases like task finishes (w/o error) and wants to request state change while there is pending or on going state transition. 

Preferable Design  –  Participant Update CurrentState directly instead of updating requested state

The basic idea is for participant node to update CurrentState in ZNode directly without waiting for state transition message from controller. Ideally, the behavior should be as follows:

Pros: 

Minimum IO load to Zookeeper. 2 CurrentState Updates and 1 message is reduced. Also removes the duplicated StateTransition cycle to handle the CurrentState change. More efficient time wise.  

Cons:

Need to handle write conflict since 2 thread may update CurrentState at the same time. More detail can be found in the next section – update conflict.  

Update conflict

In the origin design, all CurrentState updates are scheduled by controller. When task finishes executing or ends with error state, task runner thread uses Requested thread to communicate with controller and controller will only send one state transition message at a time.

If Task runner thread update CurrentState when task finishes or ends with error state, it is possible that the state transition handler is also trying to update CurrentState. It could be an INIT→ RUNNING, RUNNING→CANCEL, or any other state transition message message.  

We may run into the following write conflict.  Screen Shot 2021-03-17 at 6 28 37 PM

We can prevent over write by checking the in memory state _stateModel and update ZK conditionally in message handling. Also the in memory state _stateModel update, comparing  and ZK update need to be protected by a lock. The following 2 graph illustrate how locking and comparing can protect this writ after read situation. We could modify the current synchronization on _state in stateTransitionHandler or introduce a new lock.

On thing to note in current message handling design, 'HelixTaskExecutor' (a message lister that creates and schedules all message handler) guarantees that only one on going state transition is at a time. In StateTransitionHandler, the whole message processing also synchronize on the _state object. Because one state transition per each partition/task at a time is already when creating StateTransitionHandler in HelixTaskExecutor, reducing the scope for the synchronization shouldn't cause any problem. We could reuse this lock to guard _state update in different threads.

Remove requested state (1) Remove requested state (2)

Alternative Design  –  Participant send a message (or mimic a dummy message) to itself setting to next stage

When task finishes in task runner thread, it sends a state transition message to itself.  Screen Shot 2021-03-11 at 4 42 58 PM

Participant send message to it self

Pros:

This design did not change a lot existing logic of current design. All CurrentState updates are requested by participant state transition handing process. Conflicts (Task finishes while handling a StateTransition) are handles by current logic. Current code garnets only one message is handled at a tome, thus only one ST happens at a time. 

Cons:

Current design only allows one message at a time per partition/task. If there are existing pending message for this participant, this new message will be dropped and won't be sent again by task runner thread. In order to solve this, we need to differentiate messages sent by controller or task runner and always drop controller sent message since controller can recalculate and resend message. This design still have one extra message sent.  It also requires an extra round of state transition handling.

Participant mimic sending send message to it self by calling HelixTaskExecutor.scheduleTask() with a dummy message object.

Pros:

Compared to the above approach, saved one message generation. Also reuses existing StateTransition logic to handle concurrent CurrentState update requests.

Cons:

HelixTaskExecutor maintains a in memory messageMap for all the ongoing messages. Same as the previous approach, if partition/task already has one ongoing or pending messages, the dummy message will be dropped. Complex code need to be added to work around with this. 

Dependencies and Backward compatibility

The code change is all on participant side. We should still keeps the logic that handles requested state on controller side before all participants are on the newer version. If some participants in a cluster is on a newer version with this same while others do not, controller should be able to schedule and get callbacks for all tasks throughout task's lifecycle. 

User's won't find any changes on Helix controller behavior. He or she may notice some CurrentState ZNodes do not has Requested_State in map filed anymore but it won't change any task state update logic. Also the state in PropertyStore/TaskRebalancer won't have any difference.

Performance, Scalability, Provisioning and Risk

We already see some Task Framework clusters have intensive ZK write requests results in increased pending requests queue size. We expect to see Zookeeper IO, especially write IO load decrease after the change is deployed to participants in Task Framework clusters. This change could relief the current ZK IO stress and make the Helix service more scalable.

Related PR

Clone this wiki locally