Pull request #10: Object Store High Availability (open; 11 commits into base: main)

Changed file: reps/2022-04-21-object_store_HA.md (132 additions, 0 deletions)
## Summary - Object Store High Availability

### General Motivation

Ray is a general-purpose and powerful computing framework. With the Object Store, it can easily be extended into a data service that provides data for various distributed tasks. Ray uses decentralized object ownership to avoid centralized bottlenecks in managing an object's metadata (mainly its reference count and object directory entries), but this makes failover difficult to handle.

For now, users can only rely on lineage to recover unavailable objects. But lineage has several restrictions:
- It cannot recover objects put into the Object Store via `ray.put`.
- It cannot recover objects returned by actor tasks.
- It requires tasks to be idempotent.

> **Contributor:** Actually this is not exactly true (actor tasks can also be retried, but the user has to guarantee that they're idempotent).

#### Goal

1. Objects can be marked for high-availability mode; other objects behave the same as before.
2. Any high-availability object should still be accessible if we encounter a single-node failure.

### Should this change be within `ray` or outside?

Changes are within Ray core.

## Stewardship

### Required Reviewers

@stephanie-wang, @ericl, @scv119, @kfstorm, @raulchen

### Shepherd of the Proposal (should be a senior committer)

@ericl, @raulchen, @stephanie-wang

## Design and Architecture

### Problem statement

#### Problem 1: Object Owner Failure

The owner of an object stores the object's metadata, such as its reference count and locations. If the owner dies, other workers that hold the object ref can no longer access the object's data, because all copies of the object will be deleted from the Object Store.
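
To make this failure mode concrete, here is a minimal sketch using today's Ray API. This is an illustration only, not part of the proposal; the exact exception type observed may vary by Ray version.

``` python
import time
import ray
from ray.exceptions import RayError

ray.init()

@ray.remote
class Owner:
    def create(self):
        # The object created by ray.put here is owned by this actor process.
        ref = ray.put("some data")
        # Return the ref wrapped in a list so the caller borrows it
        # instead of receiving a copy of the value.
        return [ref]

owner = Owner.remote()
[borrowed_ref] = ray.get(owner.create.remote())
print(ray.get(borrowed_ref))  # works while the owner is alive

# Kill the owner process; the object's metadata is lost and its copies
# are garbage collected.
ray.kill(owner, no_restart=True)
time.sleep(2)  # give the cluster time to propagate the failure

try:
    ray.get(borrowed_ref)
except RayError as e:
    # Typically surfaces as an ObjectLostError/OwnerDiedError.
    print("object is unavailable after its owner died:", type(e).__name__)
```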

#### Problem 2: Object Borrower Failure

In the chained object borrowing case, the owner of an object is not aware of the indirect borrowers. If all direct borrowers fail, the owner considers the object out of scope and GCs it. Accessing the object on indirect borrowers then fails with an `ObjectLostError`.

More details: [issue 18456](https://github.com/ray-project/ray/issues/18456)
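
The chained-borrowing scenario can be sketched roughly as follows. This is an illustration of the scenario only; whether the failure actually reproduces depends on timing and Ray version.

``` python
import ray

ray.init()

@ray.remote
class Borrower:
    def __init__(self):
        self.held = None

    def borrow(self, wrapped):
        # Holding the inner ref makes this actor a borrower of the object.
        self.held = wrapped[0]

    def forward_to(self, other):
        # Pass the borrowed ref on to another actor. `other` becomes an
        # indirect borrower that the owner does not hear about directly.
        ray.get(other.borrow.remote([self.held]))

    def read(self):
        return ray.get(self.held)

obj_ref = ray.put("payload")  # owned by the driver
direct = Borrower.remote()
indirect = Borrower.remote()

ray.get(direct.borrow.remote([obj_ref]))        # direct borrower
ray.get(direct.forward_to.remote(indirect))     # chained (indirect) borrower

# If the direct borrower dies and the driver drops `obj_ref`, the owner may
# GC the object even though `indirect` still references it; `indirect.read`
# would then fail with an ObjectLostError.
```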

#### Problem 3: Loss of All Copies

Object data is stored in the plasma store. Currently, the plasma store is a thread of the raylet process, so a raylet process failure loses all data stored in its plasma store. Objects whose only copy was in the failed plasma store become unavailable.

### Proposed Design

#### Options to implement object HA with checkpoints

We implement object HA based on checkpoints, which lets us work around **Problem 3: Loss of All Copies**.
Previously discussed options: https://github.com/ray-project/enhancements/pull/10#issuecomment-1127719640

We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
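
To make the idea concrete, here is a purely illustrative sketch of what encoding the global-owner identity into the owner address could look like. The dataclass fields and lookup helpers below are assumptions for illustration, not Ray's actual internal data structures.

``` python
from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass
class OwnerAddress:
    # Mutable location info: becomes stale if the owner process restarts.
    ip: str
    port: int
    # Immutable identity info for global owners, encoded once at assignment time.
    owner_is_gcs: bool = False
    global_owner_actor_name: Optional[str] = None

def lookup_gcs_address() -> Tuple[str, int]:
    # Placeholder: would query the cluster's bootstrap/service discovery info.
    return ("gcs-host", 6379)

def lookup_named_actor_address(name: str) -> Tuple[str, int]:
    # Placeholder: would ask the GCS where the named actor currently lives.
    return ("some-node", 10001)

def resolve_owner_endpoint(addr: OwnerAddress) -> Tuple[str, int]:
    """Return an up-to-date IP:port for the owner.

    Only the immutable part of the address is trusted for global owners, so
    the current endpoint is re-resolved instead of using the stored ip/port.
    """
    if addr.owner_is_gcs:
        return lookup_gcs_address()
    if addr.global_owner_actor_name is not None:
        return lookup_named_actor_address(addr.global_owner_actor_name)
    return (addr.ip, addr.port)  # ordinary (non-global) owner: use as-is
```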
> **Contributor:** We also need to use highly available storage instead of local disk?
>
> **Author:** Object Store HA aims to ensure that data is not lost if a single node fails, so it still needs to rely on external storage, like S3.


Note that we don't need to restore the reference table in global owners by pulling info from the cluster, because objects are already checkpointed. Checkpoint info is stored in the reference table and is encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detect owner failure, they will try to reconnect to the owner, and the recovered owner will recover the reference count and borrower list via these new RPC connections.
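
The borrower-side recovery described above could look roughly like the following sketch. All names here are hypothetical and only illustrate the reconnect-and-re-report step; they are not actual Ray internals.

``` python
import time

class OwnerClient:
    """Placeholder RPC client to a (possibly restarted) global owner."""
    def report_borrow(self, object_id: str, checkpoint_uri: str) -> None:
        # Placeholder: tells the owner "I am borrowing this object", letting a
        # restarted owner rebuild its reference count and borrower list.
        pass

def connect_to_owner(owner_address) -> OwnerClient:
    # Placeholder: resolves the up-to-date endpoint from the immutable part of
    # the owner address (GCS flag or named-actor name) and opens a connection.
    return OwnerClient()

def on_owner_connection_lost(object_id: str, owner_address, checkpoint_uri: str) -> OwnerClient:
    """Reconnect to the (restarted) owner and re-report the borrow."""
    while True:
        try:
            client = connect_to_owner(owner_address)
            client.report_borrow(object_id, checkpoint_uri)
            return client
        except ConnectionError:
            time.sleep(1)  # owner not back yet; keep retrying
```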
> **Contributor:** Does this mean that the checkpoint location will be stored with the serialized ObjectRef? It makes sense, but if that is the idea, could you make it more explicit somewhere in the design? For example, how does the protocol for retrieving the object value change when using HA?
>
> I'm also wondering if we can try a version where the application has to manually free the object. Then we don't need to worry about keeping track of the ref count at all. Would this meet your use cases?

> **Author:** Yes, we will store the `checkpoint_uri` in the ObjectRef; the protocol is described here: #10 (comment). I will add it to the REP later.
>
> Yes. In our prototype test, we do not recover the reference count after the owner restarts, because an HA object doesn't have a primary copy, and users need to release the checkpoint by themselves.

> **Member:** In the current prototype, refs caused by borrowers are still recorded; we didn't simplify this. But if the global owner is recovered from failure, the reference count will become zero and direct borrowers will not re-report borrow info to the new owner process. In this case, the owner will not GC the object because it knows that the object is HA.
>
> We did think about bypassing reference recording for HA objects, but for the simplicity of the prototype, we didn't implement it.

> **Contributor:** Yeah, it's also not clear to me how exactly the recovery protocol works. It would be nice to have an explicit section explaining the recovery protocol.

> **Author:** The current design is quite simple:
>
> 1. Try to get the object locations from the owner, and then pull data directly from one of those locations.
> 2. If step 1 fails, try to restore the data from the checkpoint. The current prototype implementation directly uses an io_worker to read data from the local disk, the same mechanism used to restore spilled data.

> **Member:** Regarding "use an io_worker to read data from the local disk": to be clear, the test cases we added in the prototype use local disks. In real environments, we should use remote distributed storage.

> **Contributor:** How does checkpointed object storage get GCed, both in the normal case and under failure?

> **Author:** GC of the checkpoint is handed over to the user.
>
> We can add some strategies to help users manage this, such as deleting all checkpoints at the end of the job. If the cluster fails unexpectedly, we can only rely on the external storage's own life-cycle management.
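
To summarize the two-step read path described in the discussion above, here is a hedged sketch. The names below are illustrative, not actual Ray internals.

``` python
class OwnerUnavailableError(Exception):
    """Raised when the owner cannot be reached or reports no live copies (illustrative)."""

def restore_from_checkpoint(checkpoint_uri: str) -> bytes:
    # Placeholder for the io_worker-based restore path. The prototype's tests
    # read from local disk; real deployments would use remote storage such as S3.
    with open(checkpoint_uri, "rb") as f:
        return f.read()

def get_ha_object(object_ref, owner_client, plasma_client, checkpoint_uri: str) -> bytes:
    """Step 1: ask the owner for locations and pull; step 2: fall back to the checkpoint."""
    try:
        for node in owner_client.get_object_locations(object_ref):
            data = plasma_client.pull(object_ref, from_node=node)
            if data is not None:
                return data
    except OwnerUnavailableError:
        pass  # owner is down; fall through to the checkpoint path
    return restore_from_checkpoint(checkpoint_uri)
```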


- Pros
  - No major protocol changes compared to the existing ownership assignment protocol.
    - Low dev cost.
  - No owner address updates, because the `owner_is_gcs` flag or the actor name is encoded in the address.
    - Low dev cost.
- Cons
  - Centralized/semi-centralized architecture.
    - Potential bottleneck.
    - Degraded performance.
  - Corner-case handling such as RPC failures.
    - Potentially high dev cost.

We prefer named actors rather than GCS as global owners.

- The number of global owners is configurable, hence scalable.
- No need to embed (part of) core worker code into GCS.
- No increased complexity for GCS.
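
As a rough illustration of "a group of named actors with `max_restarts=-1`" acting as global owners, something like the following could be started per job. The `GlobalOwner` actor body is a placeholder; the real ownership bookkeeping would live in core worker code.

``` python
import ray

ray.init()

@ray.remote(max_restarts=-1, max_task_retries=-1)
class GlobalOwner:
    """Placeholder owner actor; it only needs to stay alive and be addressable by name."""
    def ping(self) -> str:
        return "ok"

def start_global_owners(num_owners: int):
    # Detached, named actors survive the driver and can be looked up by name
    # after a restart, which is what the owner-address encoding relies on.
    return [
        GlobalOwner.options(name=f"global_owner_{i}", lifetime="detached").remote()
        for i in range(num_owners)
    ]

owners = start_global_owners(16)
ray.get([o.ping.remote() for o in owners])  # make sure they are all up
```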

#### API:

``` python
# Set the number of global owners (default is 0) and the number of an HA
# object's primary copies (default is 0).
ray.init(
    job_config=ray.job_config.JobConfig(
        num_global_owners=16,
        num_primary_copies=3,
    )
)

# Put an HA object. The default value of `enable_ha` is False.
ray.put(value, enable_ha=True)
```

> **Contributor:** I think there are use cases for specifying the exact actor that should own the object. In workflows, for example, this could be the `WorkflowManagementActor`.
>
> **Contributor:** We also have an `_owner=` parameter; maybe we can reuse that one.
>
> **Author:** This part is not very clear in the REP. The previous consideration was to keep this interface as simple as possible: the user only needs to configure global owners in the `JobConfig` (e.g., the number of global owners):
>
> ``` python
> ray.init(job_config=JobConfig(global_owner={"number": 16}))
> ray.put("test", enable_ha=True)
> ```
>
> Maybe we can provide an interface to initialize global owners:
>
> ``` python
> ray.init()
> # create owners [o1, o2, o3]
> ...
> # or: ray.init_global_owner(number=16)
> ray.init_global_owner(owners=[o1, o2, o3])
> ray.put("test", enable_ha=True)
> ```
>
> **Member:** Is there any reason that dummy actors as global owners don't work in workflows?

``` python
# Normal task: returns HA objects.
# The default value of `enable_ha_for_return_objects` is False.
@ray.remote(enable_ha_for_return_objects=True)
def fun(*args, **kwargs):
    ...


# Actor task: returns HA objects.
# The default value of `enable_ha_for_return_objects` is False.
@ray.remote(enable_ha_for_return_objects=True)
class Actor:
    def func(self, *args, **kwargs):
        ...

```


## Compatibility, Deprecation, and Migration Plan

All the features in this REP are optional. The default behavior is exactly the same as before. Users need to explicitly configure new options to enable these features.

## Test Plan and Acceptance Criteria

We plan to use a Ray job to test the HA feature of the Object Store; a sketch of such a job follows the list below.

1. In a multi-node cluster, each node runs two types of actors: producers and consumers.
    - Each **producer** actor produces data and stores the object refs locally. It adds or deletes objects according to a certain strategy to exercise object GC.
    - Each **consumer** actor gets an actor handle of a producer actor via the actor name and randomly borrows objects from the producer actor through `ray.get`.
2. Adjust data scale according to parameters:
- The size of an object.
- The number of objects.
- The capacity of the Object Store.
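
A minimal sketch of such a producer/consumer test job, assuming the proposed `enable_ha`, `num_global_owners`, and `num_primary_copies` API from this REP (which does not exist in current Ray releases):

``` python
import random
import ray

# Proposed API from this REP; not available in current Ray releases.
ray.init(
    job_config=ray.job_config.JobConfig(
        num_global_owners=16,
        num_primary_copies=3,
    )
)

@ray.remote
class Producer:
    def __init__(self):
        self.refs = []

    def produce(self, size_bytes: int) -> None:
        # Store an HA object and keep its ref locally; occasionally drop old
        # refs to exercise object GC.
        self.refs.append(ray.put(b"x" * size_bytes, enable_ha=True))
        if len(self.refs) > 100 and random.random() < 0.5:
            self.refs.pop(0)

    def sample_refs(self, k: int):
        return random.sample(self.refs, min(k, len(self.refs)))

@ray.remote
class Consumer:
    def __init__(self, producer_name: str):
        self.producer = ray.get_actor(producer_name)

    def consume(self) -> None:
        # Borrow random objects from the producer and read them via ray.get.
        for ref in ray.get(self.producer.sample_refs.remote(10)):
            ray.get(ref)

# One producer/consumer pair per node in the real test; a single pair here.
producer = Producer.options(name="producer_0").remote()
consumer = Consumer.remote("producer_0")
ray.get(producer.produce.remote(1024 * 1024))
ray.get(consumer.consume.remote())
```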

Acceptance criteria:

1. Performance degradation is acceptable when no process or node failures happen.
2. When a single worker process or Raylet fails, the test job can finish eventually.

## (Optional) Follow-on Work

- **Prototype test**