Skip to content

Storing Assignment Metadata in ZooKeeper

Hunter Lee edited this page Aug 15, 2019 · 3 revisions
Note: the actual implementation may differ from what's outlined in this document.

Overview

This design wiki contains information about how to store cluster metadata (more specifically, the "best possible" mapping compute result) efficiently to ZooKeeper given the constraints around it as a distributed coordination service.

Introduction

What

Helix is a generic cluster management framework upon which many stateful applications are built. One of its core functionalities is to automate the computation of partition mappings, namely, Helix FULL-AUTO rebalancer. The full-auto rebalancer, at its core, uses CRUSH algorithm (a variant of consistent hashing) with a twist that improves even distribution. However, this has proved problematic because it computes an entirely new set of partition mappings upon cluster topology change.

Why

This type of computation done in a stateless manner is problematic for stateful applications. Stateful applications define state transitions that are often both compute and I/O heavy, which means that these state transitions contribute to increased periods of unavailability. Therefore, it has become necessary for Helix to come up with the next-generation full-auto rebalancer, namely, Constraint-based rebalancer.

How

[The new full-auto rebalancer with globally even distribution aims to make stateful assignments by taking two things into account: 1) an ideal set of partition mappings that satisfy application-defined constraints and 2) the previous assignment (previous state). The second input will serve as the previous state of the cluster, and the new rebalancer will reconcile the delta between the two input variables, thereby outputting a set of partition mappings that can reduce unnecessary state transitions (partition movements). This, however, presents a new challenge because this approach requires Helix to persist the previous assignment in the metadata store. The challenges around this will be further expounded in the Problem Statement section.

Background

ZooKeeper

This section presents relevant information about ZooKeeper, which is currently the sole metadata store for Helix.

The New Full-auto Constraint-based Rebalancer

The following wiki goes further in depth about the new full-auto rebalancer: The New Helix Rebalancer: Weight-Aware Globally-Even Distribute Rebalancer

Similar Studies

Uber Engineering has published a blog post on compressing their trip data. The following link provides relevant statistics on combining serialization and compression to greatly reduce the amount of data sent over the wire. This study will be referenced in the later section.

Problem Statement

Limitations Imposed by ZooKeeper

ZooKeeper is a metadata store that employs a set of filesystem APIs. The challenge of using ZooKeeper for storing Helix's assignment metadata is that ZooKeeper doesn't deal well with larger ZNode sizes. To be more precise:

  • By default, the maximum allowed size for a single ZNode is 1MB. This limit could be raised by changing deployment configs, but this option is risky because it could affect other operations and not recommended in general.
  • A single ZNode should ideally be less than 100KB, smaller the better as recommended by ZooKeeper PMC. ZNodes larger than this could cause latency spikes, which are what we want to avoid in production. Then the next question is: exactly how much data are we dealing with? This profiling would be the work left for individual application.

Architecture/Implementation

Objectives

The objectives of this project are threefold:

  1. Come up with a more compact representation of data to minimize the amount of bytes sent over the wire
  2. Devise a strategy to intelligently split up a large amount of data (>1MB) into multiple ZNodes to optimize RW latency
  3. Add support for transactions in the ZkClient interface used in Helix These are absolute requirements. Another theme of this project will be to have a more generic way to store a large amount of data in ZooKeeper in a way that is faster and does not stress the system. A discussion around tradeoffs between achieving generality vs maximizing the efficiency will be introduced in later sections.

The following sections describe how each objective is related to another and the work that needs to be done for each.

Compact Representation of Assignment Data

The new constraint-based rebalancer requires two versions of assignment metadata to be stored: 1) the result of global baseline calculation and 2) previous assignment. For more details on the nature of these assignments, see Cluster Change Detector for Helix Rebalancer#GlobalBaselineCalculationandPartialRebalance. Assuming no change in cluster topology, these two sets of assignment metadata should be similar in size.

Helix represents an assignment in a form of a nested map called ResourceStateMap. ResourceStateMap is a map of PartitionStateMap, which is in turn a map of Partition objects to String maps. The problem with this is that this is a pretty loose representation of data, so we could use a fast serialization protocol and a compression scheme to make the representation more compact. There are many options available when it comes to serialization and compression.

Encoding (Serialization)

As for serialization, the metric we want to optimize is the decode time. This is because encoding is not in the critical path of the rebalancer pipeline, but decoding is because reading in the baseline and the previous assignment is necessary. According to the table above, Protocol Buffer and MessagePack seem to be displaying best decoding characteristics. This project will explore actual latency characteristics using Helix's assignment data. Another thing of note is that MessagePack is schemaless. Again, the following table provides a summary:

The actual serialization protocol will be chosen at implementation time. However, MessagePack might be more convenient because it is schemaless, which allows this to be used for other types of Helix metadata. Also, it helps us avoid the effort of having to define protobuf schemas.

Compression

When it comes to compression, zlib and bz2 seem to be leading compression schemes. Both zlib (GZip) and bz2 are supported in Java either natively or by using an open-source library. Either works, but we note here that Helix already has GZip support (GZipCompressionUtil is available in the codebase).

Strategy for efficient ZooKeeper Reads and Writes

Let us reiterate the requirements discussed in the Problem Statement section:

A single ZNode size should be capped at 50-100KB to avoid unwarranted latency spikes This is because ZooKeeper tries to replicate every write using its consensus-based replication protocol called Zab. This requirement lends us to the following approach.

Bucketing the payload into N-KB ZNode Buckets

This idea has been explored before in Helix in the name of "bucketizing". However, it is no longer in use because read and write failures could leave the Helix Controller in a bad state. Setting the upper limit for the ZNode bucket size as 50KB, we have a tradeoff between the two goals: the number of writes and the size of each ZNode. At implementation time, a quick experiment will be done to find the largest bucket size that does not hurt the RW performance.

Alternative Design: Per-resource or Per-partition ZNodes

Alternatively, instead of defining a static bucket size, we could break up the assignment metadata into each ZNode representing either a resource or a partition. The advantage of this strategy is that it is easier to come up with a partial update scheme where only affected resource and partitions are updated, reducing the required amount of data sent over the wire. The drawback for this strategy is that this does not scale with the size of a single resource or a partition. In other words, if there were a god-resource with an excessive amount of partitions, this approach may fail.

Support for Transactions in Helix

ZooKeeper began its support for multi() API that is the foundation for transactions in ZooKeeper. Since the previous section requires that a series of reads or writes be performed successfully in an atomic manner, we will use ZooKeeper's multi() to enable transactions for such reads and writes. For ease of use and to minimize confusion, the transaction APIs will look like a filesystem APIs. The following are the example read and write APIs:

The APIs listed here are loosely defined; that is, they are subject to change during implementation.

`public class LargeDataAccessor {

public LargeDataAccessor(String path, int bucketSize) {}

public void bucketWrite(byte[] data);

public void bucketRead(byte[] data);

}`

Q: Why don't you just use ZooKeeper's Transaction class?

A: This was actually the plan, but ZooKeeper PMC/API documentation has it that the total data size in a single multi() call cannot exceed the default size limit (1MB) even though you might be writing to multiple ZNodes. This means that we need to build an abstraction layer using multi() to enable reads and writes for large data as discussed above.

Reads and Writes

Asynchronous Read and Writes with Callbacks

The actual write underneath the API calls will happen by way of asynchronous reads and writes. ZooKeeper provides these APIs, and failure scenarios will be dealt with by using callbacks to retry failed reads and writes.

Alternative Design: Chained multi() reads and writes

Since we cannot fit in all writes in a single multi() call, another approach is to chain multiple multi() calls in series. The advantage of this design is that we would know exactly which read/write failed, and it would be easy to deal with failures. The disadvantage of this design is that reads or writes do not happen in parallel, so the performance might be inferior to submitting all reads and writes at once using the asynchronous APIs.

(Optional) Managing Historical Metadata

We may need to consider persisting some historical metadata for debugging purposes. This requires deleting old metadata and updating another ZNode that keeps track of, for example, a sliding window of event ID-based metadata pair. multi() is useful in that delete operations and simple writes could be bundled into a transaction. This idea will be explored at implementation time.

Clone this wiki locally