Skip to content

Concurrency and Parallelism for BucketDataAccessor

Hunter Lee edited this page Oct 10, 2019 · 2 revisions

Overview

BucketDataAccessor is a Helix's data accessor interface designed to write large amounts of data to ZooKeeper. This wiki identifies the limitations/gaps of BucketDataAccessor and suggests ways to address them.

Background

See Storing Metadata for Helix Constraint-based Rebalancer for more context.

Problem Statement

Low Parallelism

Currently, ZkBasedBucketDataAccessor uses a locking mechanism implemented using ZooKeeper's ephemeral nodes. This is similar to a mutex, and allows for one entity to either read/write. Since there could only be one ongoing operation, the users of this data accessor such as WAGED rebalancer suffers from low parallelism. More specifically, it is not possible to have verifiers read from the metadata written to ZK while some RW is happening for testing/verification purposes.

Latency

Overall latency of the rebalance pipeline (BestPossibleCalcStage) could be reduced if we could parallelize read and write operations. The current implementation, due to locking, has to linearize all read and write operations.

Added Complexity from Locking

Inevitably, there will be unexpected failures. When we use locks in a distributed manner, we need to account for all failure scenarios related to connection issues. This makes coding a little more difficult because of many try-catch statements.

Requirements

Readers

Reads shouldn't be blocked by writes. Reads shouldn't block each other. Reads should read the latest data if possible. Reads should only read successfully-written data (no corrupt data).

Writers

Writes shouldn't block other writes. Writes shouldn't be blocked by reads. Writes shouldn't overwrite each other. Older writes should never overwrite the lastSuccessfulWriteVersion of newer writes.

Solution

Metadata ZNodes

LastSuccessfulWriteVersion for Reads There will be a ZNode that records the version that was written successfully (so safe to be read) so that readers know which version to read.

LastWriteVersion for Writes

There will be a ZNode that records the version that the last writer used to write. A writer would come in and check this ZNode and increment it, and use that version to write.

For these two ZNodes, synchronization now becomes a problem. For this, we'll use optimistic concurrency control so that there aren't any conflicts among different versions.

Optimistic Concurrency Control

Helix's data accessor supports a ZkClient interface called DataUpdater that we can instrument to implement a ZNode version-based optimistic concurrency control. It goes like this:

  1. Read old stats
  2. Write with the expected version from the stats that was read in Step 1
  3. If the versions don't match, retry the write

Monotonically Increasing Version Number

Another property to add to the solution is that it will essentially adopt the idea of MVCC and have versions increase monotonically. This ensures that there will never be version conflicts.

To prevent an integer overflow, we will reset the number at a high-enough version number (Integer.MAX_VALUE for example). Or another easier option is using long.

Garbage Collection of Stale Metadata

Because of the monotonically increasing version number property, we will end up with many versions of the metadata. This is a problem that needs to be dealt with because it might cause ZK's disk to run out of space. To garbage collect, we will append an asynchronous operation at the very end of a successful write to delete all versions before the new last successful write version.

Retrying Reads upon Failure

The garbage collection described above may potentially disrupt an ongoing read. In that case, we fail the read. If the read gets re-tried, then it will pull the last successful write version afresh and read the latest version.

Pseudocode

Reader

boolean read() { read last successful write version number from metadata znode read last successful write version if empty/failure, terminate }

Writer

boolean write() { increment lastWriteVersion using updater write to the incremented lastWriteVersion if the write finishes, update lastSuccessfulWriteVersion using updater asynchronously trigger garbage collection }

Clone this wiki locally