
WAGED Rebalance Pipeline Redesign


Introduction

After customers onboarded the WAGED rebalancer, it became apparent that large clusters cause performance issues for the current implementation. As outlined in recent investigations, large clusters with 25k+ partitions suffer from slow controller pipelines due to WAGED calculation, and performance must be improved before wider adoption.

Background

There are currently several places where the WAGED algorithm is executed:

| Rebalance Type | Purpose | Scope | Async? | Produces |
|---|---|---|---|---|
| Global Rebalance | Creates the Baseline Assignment. The Baseline is used as an anchor/reference for Partial Rebalance. | Partitions selected based on cluster changes: either all resources of the cluster or a subset of resources. (The cluster model for the calculation is based on the previous Baseline Assignment.) | Yes | Baseline Assignment |
| Partial Rebalance | Creates the Best Possible Assignment. This assignment is the ideal placement that is persisted to the cluster. | Partitions selected based on current states and their difference from the Baseline Assignment. (The cluster model for the calculation is based on the previous Best Possible Assignment.) | No | Best Possible Assignment |
| Rebalance Overwrite | Runs only when nodes are down and delayed rebalance is enabled: creates an assignment based on immediately available nodes (excluding all downed nodes regardless of delayed rebalance settings). This assignment is combined with the Best Possible Assignment if the Best Possible does not meet minActiveReplicas. | Partitions selected based on downed nodes. (The cluster model for the calculation is based on the previous Baseline Assignment.) | No | An assignment of a temporary state, which is not persisted but is the final product applied to the IdealState |

During each pipeline iteration, the following steps happen during Partial Rebalance:

  1. Based on Baseline divergence, find all replicas whose placements need to be recalculated (Candidate Replicas);
  2. Sort the Candidate Replicas by size (largest replicas are placed first);
  3. For each Candidate Replica, find the best-fitting node and assign it.

During each pipeline iteration, the following steps happen during Rebalance Overwrite:

  1. Find all replicas that are not on immediately available nodes (Candidate Replicas);
  2. Sort the Candidate Replicas by size (largest replicas are placed first);
  3. For each Candidate Replica, find the best-fitting node and assign it.

(The steps only feature work done in the main thread; work done in the asynchronous thread does not significantly impact performance and is not listed.)
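
Both step lists follow the same greedy pattern: collect Candidate Replicas, sort them by size, then place each one on the best-fitting node. The sketch below illustrates that pattern in plain Java; the `Replica` and `Node` types and the `findBestFitNode` helper are hypothetical stand-ins, not the actual Helix classes.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the greedy sort-and-assign loop shared by
// Partial Rebalance and Rebalance Overwrite; not the actual Helix code.
class GreedyAssignmentSketch {
  record Replica(String partition, int size) {}
  record Node(String name) {}

  Map<Replica, Node> assign(List<Replica> candidateReplicas, List<Node> availableNodes) {
    // Step 2: sort Candidate Replicas so the largest are placed first.
    candidateReplicas.sort(Comparator.comparingInt(Replica::size).reversed());

    Map<Replica, Node> assignment = new HashMap<>();
    for (Replica replica : candidateReplicas) {
      // Step 3: deterministically pick the best-fitting node for this replica,
      // e.g. the node with the most remaining weight capacity.
      Node best = findBestFitNode(replica, availableNodes);
      assignment.put(replica, best);
    }
    return assignment;
  }

  // Placeholder for the constraint-based node evaluation.
  Node findBestFitNode(Replica replica, List<Node> nodes) {
    return nodes.get(0);
  }
}
```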

When WAGED calculation is slow, the bottlenecks lie in the above steps, so that is where improvements and optimizations are needed.

Problem Statement

After reports of large clusters converging slowly, it was observed that the partial rebalance latency (roughly the summed latency of the 3 Partial Rebalance steps above) is as high as 30 to 40 seconds, while the cluster has 60% baseline divergence and rising. The main thread workload can be derived from baseline divergence: for a cluster with 25k replicas, 60% baseline divergence means there are 15k Candidate Replicas. In other words, for each pipeline iteration, the controller works through 15k replicas in each of the 3 steps.

Because of the massive cluster size and Candidate Replica load, 3 problems are revealed:

  1. The sorting algorithm cannot finish within a reasonable timeframe;
  2. Generating a cluster model for a large cluster is expensive;
  3. The find-best-node algorithm is doing mostly wasted work, since the algorithm is deterministic, and the cluster state remains the same. 

Whether it is 60% baseline divergence on a 25k-replica cluster or a lower divergence on a larger cluster, these 3 problems hurt rebalancer performance whenever the workload is large. Therefore, all 3 must be addressed.

For problem 1, Helix developers have already sped up the sorting algorithm.

For problems 2 and 3, a new solution is required; it is documented here.

Proposed Design

Overview

The rebalancer's functionality can be divided into several categories:

  1. Improve evenness;
  2. Assign new partitions;
  3. Assign node-less partitions;
  4. Ensure minActiveReplica.

These categories can be sorted by priority:

| Rebalancer Functionality | Description | Priority | Who does this? |
|---|---|---|---|
| Ensure minActiveReplica | Exclude downed or disabled nodes regardless of the delayed rebalance window, then ensure minActiveReplica. | Immediately Required | Rebalance Overwrite (if nodes are down but within the delayed rebalance window, and partitions are missing minActiveReplica, Rebalance Overwrite adds assignments to fulfill minActiveReplica) |
| Assign node-less partitions | If partitions were assigned to nodes that are permanently down, assign the partitions somewhere else. | Preferably Immediately Required | Partial Rebalance (if the node space permanently changes, Partial Rebalance reassigns partitions that no longer have a valid node assigned) |
| Ensure evenness | Move existing partitions so that better weight evenness is achieved. | Eventually Required | Partial Rebalance (scope based on Baseline)<br>Global Rebalance (scope based on cluster changes) |
| Assign new partitions | Assign partitions of newly added Helix resources. | Eventually Required | Partial Rebalance<br>Global Rebalance |

The proposed design moves (or keeps) every immediately required action to the synchronous thread, and every eventually required action to the asynchronous thread.

After the change, the following steps happen in the main thread:

1) If necessary, calculate a new partition assignment based on live and enabled nodes:

  1. Find all replicas that are assigned to permanently downed nodes; their placements need to be recalculated (Candidate Replicas);
  2. Sort the Candidate Replicas by size;
  3. For each Candidate Replica, find the best-fitting node and assign it.

2) If necessary, start Rebalance Overwrite. 

Main thread step 1 addresses "Assign node-less partitions"; main thread step 2 addresses "Ensure minActiveReplica". The ordering matters because the first step may already satisfy minActiveReplica.

Instead of running Partial Rebalance in the main thread on every pipeline iteration, which means recreating the cluster model and running the find-best-node algorithm every time, the main thread work is now conditional ("if necessary") with stricter conditions. If the work is not necessary (which is most of the time), no cluster model is created and no wasted work is done; if it is necessary, the rebalance scope is much smaller.
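
As a rough illustration of the "if necessary" gate, the sketch below shows the kind of check the main thread could perform. All names and structures here are hypothetical and only illustrate the two triggering conditions (permanently downed nodes and minActiveReplica violations); this is not the actual Helix implementation.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the stricter "if necessary" check on the main thread;
// names are illustrative, not the actual Helix implementation.
class EmergencyConditionSketch {
  boolean emergencyRebalanceNeeded(Map<String, Set<String>> bestPossible,   // partition -> assigned nodes
                                   Set<String> permanentlyDownNodes,
                                   Map<String, Integer> liveReplicaCount,   // partition -> live replicas
                                   int minActiveReplicas) {
    for (Map.Entry<String, Set<String>> entry : bestPossible.entrySet()) {
      // Condition 1: a replica is placed on a permanently downed node ("node-less" partition).
      for (String node : entry.getValue()) {
        if (permanentlyDownNodes.contains(node)) {
          return true;
        }
      }
      // Condition 2: the partition has fewer live replicas than minActiveReplicas.
      if (liveReplicaCount.getOrDefault(entry.getKey(), 0) < minActiveReplicas) {
        return true;
      }
    }
    // Neither condition holds: skip building the cluster model and all placement work.
    return false;
  }
}
```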

The proposed design adds a new set of steps to handle "Assign node-less partitions"; combined with the existing Rebalance Overwrite, all of this main-thread logic is named Emergency Rebalance.

New and Old Pipelines

| Rebalance Type | Description | Scope | Async? | Produces |
|---|---|---|---|---|
| Emergency Rebalance (New) | 1. If there are node-less partitions, kill the async Partial Rebalance thread if it exists, then assign the node-less partitions based on the previous Best Possible;<br>2. If step 1 happened, persist the result as the Best Possible Assignment;<br>3. Trigger async Partial Rebalance;<br>4. If minActiveReplica is violated, assign partitions that are on any downed/disabled nodes to immediately available nodes based on the previous Baseline (this is Rebalance Overwrite, but the condition to trigger it is new);<br>5. Return the final result, which is applied to the IdealState. | Step 1: partitions assigned to permanently downed nodes.<br>Step 4: partitions assigned to nodes that are not immediately available. | No | Step 1 optionally produces the Best Possible Assignment.<br>Step 4 creates a temporary state that is not persisted (like the Rebalance Overwrite behavior). |
| Global Rebalance (Unchanged) | (Unchanged) | (Unchanged) | Yes | Baseline Assignment |
| Partial Rebalance (Changed) | 1. Calculate the partition assignment based on the previous Best Possible (this is the old Partial Rebalance behavior);<br>2. If the result differs from the persisted result, persist it, then trigger the pipeline so that the main thread logic can run again. (New) | (Unchanged) | Yes (Changed) | Best Possible Assignment, only if the new result differs from the old persisted result. (Changed) |
| Rebalance Overwrite (Combined) | (Now combined with Emergency Rebalance) | N/A | N/A | N/A |
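
Putting the Emergency Rebalance steps from the table together, the main-thread flow can be outlined roughly as below. Every class, method, and helper in this sketch is an illustrative stub for the ordering described above, not the actual pipeline code.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical outline of the Emergency Rebalance steps in the table above;
// every type and helper is an illustrative stub, not the actual pipeline code.
class EmergencyRebalanceSketch {
  record Assignment(Map<String, Set<String>> placements) {} // partition -> nodes

  Assignment run(Assignment previousBestPossible, Assignment baseline,
                 Set<String> permanentlyDownNodes, int minActiveReplicas) {
    Assignment result = previousBestPossible;

    // Step 1: reassign partitions stranded on permanently downed nodes,
    // cancelling any in-flight async Partial Rebalance first.
    if (hasNodelessPartitions(result, permanentlyDownNodes)) {
      cancelAsyncPartialRebalance();
      result = assignNodelessPartitions(result, permanentlyDownNodes);
      // Step 2: persist the repaired assignment as the new Best Possible.
      persistBestPossible(result);
    }

    // Step 3: start the asynchronous Partial Rebalance (evenness, new partitions).
    triggerAsyncPartialRebalance();

    // Step 4: if minActiveReplica is still violated, apply the Rebalance Overwrite
    // logic against the Baseline; this temporary result is not persisted.
    if (violatesMinActiveReplicas(result, minActiveReplicas)) {
      result = overwriteWithImmediatelyAvailableNodes(result, baseline);
    }

    // Step 5: the returned assignment is applied to the IdealState.
    return result;
  }

  // Illustrative stubs standing in for the real logic.
  boolean hasNodelessPartitions(Assignment a, Set<String> downNodes) { return false; }
  Assignment assignNodelessPartitions(Assignment a, Set<String> downNodes) { return a; }
  boolean violatesMinActiveReplicas(Assignment a, int minActiveReplicas) { return false; }
  Assignment overwriteWithImmediatelyAvailableNodes(Assignment a, Assignment baseline) { return a; }
  void cancelAsyncPartialRebalance() {}
  void triggerAsyncPartialRebalance() {}
  void persistBestPossible(Assignment a) {}
}
```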

Architecture

Main thread before this change:

(Note that Rebalance Overwrite is omitted from the graph for the sake of simplicity. It could alter the Best Possible assignment before the assignment is applied to IdealStates. )

[Diagram: current main-thread structure]

Assume that at every unit of time there is a cluster event, which triggers Partial Rebalance.

Assume that BestPossibleAssignment takes 2 units of time to compute. The t unit is arbitrary and does not scale with real latency. 

| Thread | Pipeline | t=n | t=n+1 | t=n+2 | t=n+3 | t=n+4 |
|---|---|---|---|---|---|---|
| Cluster event | | A cluster event happens and triggers Partial Rebalance. | A cluster event is queued because the main thread is blocked. | A cluster event is dequeued and triggers Partial Rebalance.<br>A cluster event is queued because the main thread is blocked. | A cluster event is queued because the main thread is blocked. | A cluster event is dequeued and triggers Partial Rebalance.<br>A cluster event is queued because the main thread is blocked. |
| Main thread | Partial Rebalance | Pipeline triggered by cluster event. With reference to the previous Baseline and BestPossible, the BestPossible (t=n) computation starts. | BestPossible (t=n) still computing. | BestPossible (t=n) finishes computing; it is stored to ZooKeeper and also written to the IdealState.<br>Pipeline triggered by cluster event. With reference to the previous Baseline and BestPossible (t=n), the BestPossible (t=n+2) computation starts. | BestPossible (t=n+2) still computing. | BestPossible (t=n+2) finishes computing; it is stored to ZooKeeper and also written to the IdealState.<br>Pipeline triggered by cluster event. With reference to the previous Baseline and BestPossible (t=n+2), the BestPossible (t=n+4) computation starts. |

The purpose of this illustration is to show the current thread structure, and that slow BestPossibleAssignment computation leads to a slow-responding pipeline.


WAGED thread structure after this change:

[Diagram: WAGED thread structure after this change]

Assume there are cluster events at t=n, t=n+1, and t=n+2; the cluster events at t=n and t=n+1 are permanent node down events, so Emergency Rebalance kicks in.

Assume that Partial Rebalance takes 2 units of time to compute BestPossible, and Emergency Rebalance takes 1 unit of time to compute due to smaller scope. The t unit is arbitrary and does not scale with real latency. 


| Thread | Pipeline | t=n | t=n+1 | t=n+2 | t=n+3 | t=n+4 |
|---|---|---|---|---|---|---|
| Cluster event | | A cluster event happens and triggers Emergency Rebalance. It's a permanent node down event, so Emergency Rebalance computes. | A cluster event happens and triggers Emergency Rebalance. It's a permanent node down event, so Emergency Rebalance computes. | A cluster event happens and triggers Emergency Rebalance. | Nothing happens. | The pipeline is triggered by the asynchronous thread because Partial Rebalance has finished computing. |
| Main thread | Emergency Rebalance | Pipeline triggered by cluster event. With reference to the previous Baseline and BestPossible, the BestPossible (t=n) computation starts (limited scope).<br>BestPossible (t=n) finishes computing; it is stored to ZooKeeper and also written to the IdealState.<br>A new Partial Rebalance thread is started. | Pipeline triggered by cluster event. With reference to the previous Baseline and BestPossible (t=n), the BestPossible (t=n+1) computation starts (limited scope).<br>Since Emergency Rebalance is starting again, the existing Partial Rebalance thread is killed.<br>BestPossible (t=n+1) finishes computing; it is stored to ZooKeeper and also written to the IdealState.<br>A new Partial Rebalance thread is started. | Pipeline triggered by cluster event. However, no emergency rebalance is necessary; the previous BestPossible (t=n+1) is returned. | Nothing happens. | Pipeline triggered by the asynchronous thread. However, no emergency rebalance is necessary; the previous BestPossible (t=n+2) is returned.<br>A new Partial Rebalance thread is started. |
| Asynchronous thread | Partial Rebalance (#1) | | With reference to the previous Baseline and BestPossible (t=n), the BestPossible (t=n+1) computation starts.<br>Thread is killed. | | | |
| | Partial Rebalance (#2) | | | With reference to the previous Baseline and BestPossible (t=n+1), the BestPossible (t=n+2) computation starts. | BestPossible (t=n+2) still computing. | BestPossible (t=n+2) finishes computing; it is stored to ZooKeeper.<br>A cluster event is triggered. |

With the new thread structure, the main pipeline does only the minimal necessary computation. That work is fast and optional: unless invalid placements occur, the computation is skipped entirely.

Thread Sequencing

As mentioned in previous sections, both the main thread and the asynchronous thread compute BestPossibleAssignments. While conceptually different, both are stored in the same place in ZooKeeper and both are written to the IdealState. To avoid race conditions, priority is given to the main thread.

If the main thread does not compute an assignment:

Since Emergency Rebalance only computes when nodes are permanently down or minActiveReplica is violated, it usually does nothing.

In this case, the main thread attempts to create a Partial Rebalance thread; if one already exists, it does not create another.

If the main thread does compute an assignment:

If Emergency Rebalance is triggered, it immediately cancels any existing Partial Rebalance thread. Only after it finishes computing and persisting does it start a new Partial Rebalance thread.
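
One way to realize this sequencing, assuming a single asynchronous worker, is to track the in-flight Partial Rebalance as a Future that the main thread cancels before computing and resubmits only after persisting. The sketch below uses only standard java.util.concurrent primitives and hypothetical method names; it is not the actual Helix scheduler.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of main-thread priority over the async Partial Rebalance.
class RebalanceSchedulerSketch {
  private final ExecutorService asyncPool = Executors.newSingleThreadExecutor();
  private Future<?> partialRebalanceFuture;

  // Called on the main thread when Emergency Rebalance must compute.
  synchronized void onEmergencyRebalance(Runnable emergencyWork, Runnable partialRebalanceWork) {
    // Cancel any in-flight Partial Rebalance so it cannot persist a stale result.
    if (partialRebalanceFuture != null) {
      partialRebalanceFuture.cancel(true);
    }
    // Compute and persist the emergency result synchronously...
    emergencyWork.run();
    // ...and only then start a fresh async Partial Rebalance.
    partialRebalanceFuture = asyncPool.submit(partialRebalanceWork);
  }

  // Called on the main thread when no emergency computation is needed.
  synchronized void ensurePartialRebalanceRunning(Runnable partialRebalanceWork) {
    // Spawn a Partial Rebalance thread only if one is not already running.
    if (partialRebalanceFuture == null || partialRebalanceFuture.isDone()) {
      partialRebalanceFuture = asyncPool.submit(partialRebalanceWork);
    }
  }
}
```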

Thus there will be the following cases:

Case 1: Partial Rebalance → Partial Rebalance

This is the expected case during normal operations. Cluster events happen, and none of them is a permanent node down event or a minActiveReplica violation. Emergency Rebalance does not compute; the main thread attempts to spawn a Partial Rebalance thread before applying the previously persisted BestPossibleAssignment to the cluster.

Existing Partial Rebalance threads compute in peace until finishing. Each Partial Rebalance calculation will be based on the previous Partial Rebalance calculation. 

Once a Partial Rebalance thread finishes computing, it persists the result and triggers the main thread which applies the new result to the cluster. 

Note that there's no possibility that a thread-spawning cycle starts: Partial Rebalance only persists and triggers the main thread if the computed result is different from the previous result; otherwise, the thread quietly dies and the controller awaits the next real cluster event.

This case is similar to the current WAGED pipelines, except that only different results are persisted. 
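
A minimal sketch of that guard, assuming the persisted and newly computed assignments can be compared for equality (all names are hypothetical, not the actual Helix code):

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "persist only if different" guard that prevents a spawn cycle.
class PartialRebalanceResultHandlerSketch {
  void onPartialRebalanceFinished(Map<String, Set<String>> newBestPossible,
                                  Map<String, Set<String>> persistedBestPossible) {
    if (newBestPossible.equals(persistedBestPossible)) {
      // Identical result: the thread quietly exits and waits for the next real cluster event.
      return;
    }
    persistBestPossible(newBestPossible); // write the new Best Possible to ZooKeeper
    triggerPipeline();                    // wake the main thread so it applies the new assignment
  }

  void persistBestPossible(Map<String, Set<String>> assignment) { /* ZooKeeper write, stubbed */ }
  void triggerPipeline() { /* enqueue a cluster event, stubbed */ }
}
```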

Case 2: Partial Rebalance → Emergency Rebalance

A cluster event of permanent node down or minActiveReplica violation happens. Emergency Rebalance kills the existing (singular) Partial Rebalance thread and starts computing. A new BestPossibleAssignment with limited scope is computed, persisted, and returned. Then a new Partial Rebalance thread is spawned.

The Emergency Rebalance calculation will be based on the previous Partial Rebalance calculation (minimal movement). The future Partial Rebalance calculation will be based on the current Emergency Rebalance calculation (minimal movement). 

Case 3: Emergency Rebalance → Emergency Rebalance

If cluster events such as permanent node downs and minActiveReplica violations keep happening, every event causes the main thread to compute. Therefore: 1) a new BestPossibleAssignment with limited scope is computed (fast) in the main thread on every pipeline iteration, 2) each Emergency Rebalance calculation is based on the previous Emergency Rebalance calculation, and 3) Partial Rebalance threads are created and then killed immediately.

In this scenario, every calculation has the minimal scope, and the main thread is busy putting out fires. Note that evenness is not reinforced, but when every cluster event is catastrophic, the priority should be resolving the problem rather than evenness.

Case 4: Emergency Rebalance → Partial Rebalance

When the cluster goes back to normal, Emergency Rebalance does not compute and the existing Partial Rebalance finishes in peace. This returns us to case 1.

Note

In the unlikely event that the asynchronous Partial Rebalance finishes very fast, one of two situations happens:

  1. The main thread has started computation. If so, the main thread already tried to kill the asynchronous thread before it could persist.
  2. The main thread has not started computation. If so, the asynchronous thread will successfully write its result to ZooKeeper. This is okay: the result is based on a previous BestPossible, so movements are minimized, and the next computation will be based on this persisted result, again minimizing movements.

Flowchart

Dotted lines indicate asynchronous work. 

[Flowchart: proposed rebalance pipeline]


Performance

The main thread performance is expected to improve drastically for 2 reasons:

  1. Without invalid placements, the main thread does nothing. The WAGED pipeline will have 0 calculation latency in this case. 
  2. With invalid placements, the main thread only recalculates placements for partitions on permanently downed nodes or partitions needed to fulfill minActiveReplica, which should be a relatively small number of partitions compared to the total in the cluster. The pipeline will have the minimally required calculation latency in this case.

Under usual circumstances, we anticipate the main pipeline latency to be on the order of 100 milliseconds.

Backward compatibility

N/A. This is a controller change with no impact on customers. 

Risks

As the proposed design concerns race conditions and multithread operations, the development team must carefully verify the logic to avoid possible deadlocks or data-corrupting race conditions. 

Evenness must also be ensured: test clusters must be put under prolonged cluster events to show how their evenness changes over time.

