From f986ebcf89a281bb5ed0c65dae25a01028099c3d Mon Sep 17 00:00:00 2001 From: hejialing Date: Thu, 21 Apr 2022 20:22:01 +0800 Subject: [PATCH 01/11] save --- reps/2022-04-21-object_store_HA.md | 169 +++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 reps/2022-04-21-object_store_HA.md diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md new file mode 100644 index 0000000..8c7d118 --- /dev/null +++ b/reps/2022-04-21-object_store_HA.md @@ -0,0 +1,169 @@ +## Summary +### General Motivation + +Ray is a general-purpose and powerful computing framework, combined with the Object Store module, it can be easily extended into a data service to provide data for various machine learning tasks. Here are two more typical scenarios. + +#### Scenarios One: Data-Provider for Deep Learning Offline Training + +A data-provider can be implemented by Ray, which has the following functions: +1. Read data from various data sources and store them in the Object Store, which can be read by ray-client. +2. Has simple data processing capabilities, such as data augmentation, data distillation, etc. + +#### Scenarios Two: Mars On Ray + +[mars](https://github.com/mars-project/mars) is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and many other libraries. + +Mars on Ray uses Ray actors as execution backend, Ray remote function-based execution backend is in progress. Mars will store data in the Ray Object Store. + +#### Why does the Object Store need High availability? + +In those scenario, there are two requirements for the service: +1. **Stability**: As a long-running service, it needs to be automatically restored after some process failure. Currently, any worker who dies can result in objects lost. +2. **Auto-Scaling**: Primarily reducing cluster size when data requirements are reduced. Drop any node can cause many objects to lost primary-copy, and those objects will become unavailable + +### Goal + +Any alive worker can read any object in scope via `ray.get`, when any node `N_A` in the ray cluster is dead. + +### Should this change be within `ray` or outside? + +main `ray` project. Changes are made to Ray Core level. + +## Stewardship +### Required Reviewers + +@stephanie-wang, @ericl, @scv119 + +### Shepherd of the Proposal (should be a senior committer) + +@ericl + +## Design and Architecture + +### Problem statement + +#### Problem One: The Object Owner Failure + +The object owner stores the metadata of an object, it manages the reference and primary-copy of the object. + +For example: If the owner of the object `O_A` in node `N_A`, the object `O_A` will be out of scope when node A dead. The primary copy will be deleted, and any worker who wants to get `O_A` will be told the object is lost. + +#### Problem Two: The Object Borrower Failure + +Borrower failure causes object reference loss, and makes the owner release the object early. + +For example: If the owner of the object `O_A` borrows `O_A` to the worker `W_A` which in node `N_A` and `W_A` borrow `O_A` to a new worker `W_B`, the object `O_A` will be released, when the worker `W_A` is dead. When calling `ray.get` on `W_B` will get an exception: RayObjectLostError. 
+ +![image_1_the_object_borrower_failure](https://user-images.githubusercontent.com/11995469/164454286-6d7658c7-952b-4351-bd06-7404f49db919.png) + +#### Problem Three: The Primary Copy Lost + +Primary copy loss due to node failure, will cause the object which does not have a copy on another node’s plasma store not to be available. + +For example: If the primary copy of the object `O_A` in node `N_A` and `O_A` does not have any copy on another node’s plasma store, the object will be not available when the node `N_A` is dead. + +### Proposed Design + +#### How to solve owner failure? + +##### Option one: Highly Available Storage to Store Object Metadata + +Use high-availability storage to store original data to help owners restore.Store A in external storage to ensure data is not lost. When the owner restarts, the data in the `reference table` can be restored, and rebuild the RPC link. + +**Pros**: + +- Relatively simple. + +**Cons**: + +- Increased requirements for deployment environments. +- The modification of the `reference table` is a high frequency operation and may have performance issues. +- Potentially some trickier consistency issues when the owner fails. + +##### Option two: Global Owner + +Add a job level setting (default is disable) to enable this feature. Use ray-actor to be the global owner to own all objects created by interface `ray.put`. The global owner is a dummy actor with `max_restarts=-1`, and try to disperse on each node. + +There were two options active or passive to rebuild the `reference table` and the RPC link when `G_1` retry. + +**Active**: + +The global owner will actively collect information about the alive objects it owns to rebuild the `reference table`. +1. `G_1` begins to rebuild the `reference table`, setting status to be `REBUILDING`. +2. Send RPC to all Raylets, ask every raylet to traverse all workers on it, and reply with the information of all objects owned by `G_1`. Use those information to rebuild the `reference table` and the RPC link. +3. `G_1` sets the state to `READY`. +4. Some RPC's (WaitForObjectFree) reply-callback will not be called before the state of `G_1` not `READY`. + +**Passive**: + +As the illustration shows, every raylet will maintain a RPC to every global owner, to watch those global owners. When the global owner `G_1` died and the job was not finished, + +1. Raylet will find `G_1` dead through RPC disconnection, then traverse all the workers on current raylet, collect the information (reference, primary copy address) of all objects whose owner is `G_1`, and send it to `G_1` when rebuilding the RPC link. +2. `G_1` will reconnect to the raylet and the workers on it after receiving the RPC from this raylet. +3. `G_1` sets the state to `READY` after rebuilding with all raylets. +4. Some RPC's (WaitForObjectFree) reply-callback will not be called before the state of `G_1` not `READY`. + +**API**: +```python +# Set the number of global owners, the default number is zero. The object store behaves the same as before when the number of global owners is zero. +ray.init(global_owner_number=16) +``` +![image_2_global_owner](https://user-images.githubusercontent.com/11995469/164455556-b2f4101b-23f4-46db-808a-4407d48526a6.png) + +**Pros**: + +- No need to rely on external storage. +- Performance will not be affected. + +**Cons**: + +- Active: The rebuilding process may take a long time. +- **Passive**: When waiting for the RPC which is sent by raylets, the global owner needs to handle timeouts, which can be difficult. +- Relatively complex. 
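The text above does not fix how an object gets mapped to a particular global owner. One plausible scheme, sketched below, is to pre-create the owners as named detached actors and pick one deterministically from the object ID; the `global_owner_{i}` naming convention and the modulo mapping are assumptions for illustration only, not part of this proposal.

```python
import ray

NUM_GLOBAL_OWNERS = 16  # would come from the job-level setting described above

@ray.remote(max_restarts=-1)
class GlobalOwner:
    """Dummy actor whose only job is to own objects on behalf of the job."""
    def ping(self):
        return "ok"

def start_global_owners(num_owners=NUM_GLOBAL_OWNERS):
    # Named, detached actors so any worker can look an owner up later.
    # Spreading them across nodes is omitted from this sketch.
    return [
        GlobalOwner.options(name=f"global_owner_{i}", lifetime="detached").remote()
        for i in range(num_owners)
    ]

def owner_for(object_ref, num_owners=NUM_GLOBAL_OWNERS):
    # Deterministic object -> owner mapping based on the object ID.
    index = int(object_ref.hex(), 16) % num_owners
    return ray.get_actor(f"global_owner_{index}")
```

A worker that needs the owner of a ref would call `owner_for(ref)`; whether the mapping is done this way or inside the core worker is left as an implementation detail.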
+ +#### How to solve borrower failure? + +##### Star Topology + +Use Star Topology instead of Tree Topology, when the number of global owners is greater than zero, make the owner directly borrow the object to other workers. + +As the illustration shows, the worker `W_A` owns the object `O_A`, and the worker `W_B` already borrows `O_A` from `W_A`. Here is the process by which `W_B` borrows `O_A` to `W_C` + +1. The worker `W_B` sends the object `O_A` to the worker `W_C`, and it is stored in the `W_C` reference table. +2. The worker `W_B` increments the reference to object `O_A` by one. Avoid the object being freed before finish borrowed. +3. The worker `W_C` sends an async RPC to the worker `W_A`, making `W_A` add `W_C` in the borrowers list. +4. The worker `W_A` sends an async RPC(WaitForRefRemoved) to the worker `W_C`. +5. The worker `W_C` sends an async RPC to `W_A`. +6. The worker `W_B` reduces the reference to object `O_A` by one. When the RPC which sends in step 1 replies. + +![image_3_star_topology](https://user-images.githubusercontent.com/11995469/164456154-2163f505-d835-4d23-9901-fac6d867d368.png) + +#### How to solve primary copy loss? + +##### The Backup of Primary Copy + +After the primary copy is created, create a backup of it on another raylet. When the owner of object `O_A` finds its primary copy is unavailable, `G_1` will rebuild primary copy by fellow steps: +1. When RPC disconnects, it will turn backup to primary copy, and create a new backup. +2. Send a RPC request to the raylet which has `O_A` backup, turn it to primary copy. +3. `G_1` sends an async RPC request to another raylet to create a new backup. +4. The raylet which has the new backup will keep a RPC connected to watch `G_1`. + +![image_4_rebuild_primary_copy](https://user-images.githubusercontent.com/11995469/164456515-0f9e7d15-51be-4bb4-8852-ca5017a0411e.png) + +When `G_1` finds the backup of object `O_A` is unavailable, `G_1` will send a RPC request to the Node `N_B`, and make it create a new Backup. As the illustration shows: +1. When RPC disconnects, `G_1` finds the backup of `O_A` is unavailable. +2. `G_1` sends a RPC request to make `N_B` create a new backup. +3. `G_1` sends a RPC request to `N_C`. +4. `N_C` creates the backup of `O_A`. +5. `N_C` keeps a RPC connected to watch `G_1`. + +![image_4_rebuild_backup](https://user-images.githubusercontent.com/11995469/164456742-c585aea8-4df8-47a4-b632-e7b61c756536.png) + +## Compatibility, Deprecation, and Migration Plan + + +## Test Plan and Acceptance Criteria + + +## (Optional) Follow-on Work + From adb9df5eadd5f09c393fd0bb2c408c2586d26091 Mon Sep 17 00:00:00 2001 From: hejialing Date: Fri, 22 Apr 2022 17:12:25 +0800 Subject: [PATCH 02/11] save --- reps/2022-04-21-object_store_HA.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 8c7d118..3185382 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -161,9 +161,23 @@ When `G_1` finds the backup of object `O_A` is unavailable, `G_1` will send a RP ## Compatibility, Deprecation, and Migration Plan +Fully forward compatible, the behavior of the object store will same as usual when the high-available mode is disabled. ## Test Plan and Acceptance Criteria +We plan to use a ray job to test the object store HA. About this job: + +1. Multiple node, and each node has two type actors: producer and consumer. + - **producer**: Produce data and cache `ObjectRef`. 
Add or delete objects according to a certain strategy, for testing object gc. + - **consumer**: Get the `ActorHandle` for producer via actor name, and borrower object randomly through `ray.get`. +2. Adjust data scale according to parameters, include: + - Data size, proportion of plasma store. + - The number of objects. + +Acceptance criteria: + +1. Performance reduction is acceptable when no process or node failure. +2. Any one raylet process of any workers processes failure, and the job will finish in the end. ## (Optional) Follow-on Work From 444073cfb6e62cdd4da1205019c2823499252c18 Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Mon, 25 Apr 2022 20:36:57 +0800 Subject: [PATCH 03/11] Refine wording. Small adjustments. Add TODOs. --- reps/2022-04-21-object_store_HA.md | 207 ++++++++++++++++++----------- 1 file changed, 127 insertions(+), 80 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 3185382..7f3502a 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -1,35 +1,43 @@ -## Summary +## Summary - Object Store High Availability + ### General Motivation -Ray is a general-purpose and powerful computing framework, combined with the Object Store module, it can be easily extended into a data service to provide data for various machine learning tasks. Here are two more typical scenarios. +Ray is a general-purpose and powerful computing framework. With the Object Store, it can be easily extended into a data service to provide data for various machine learning tasks. Here are two typical scenarios. + +#### Scenarios 1: Data Provider for Deep Learning Offline Training + +We can implement a data provider on top of Ray with the following functions: + +1. Reading data from various data sources and storing them in the Object Store, which can be read by workers. +2. Simple data processing capability, such as data augmentation, data distillation, .etc. -#### Scenarios One: Data-Provider for Deep Learning Offline Training +#### Scenarios 2: Mars on Ray -A data-provider can be implemented by Ray, which has the following functions: -1. Read data from various data sources and store them in the Object Store, which can be read by ray-client. -2. Has simple data processing capabilities, such as data augmentation, data distillation, etc. +[Mars](https://github.com/mars-project/mars) is a tensor-based, unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and many other libraries. -#### Scenarios Two: Mars On Ray +Mars on Ray uses Ray actors as execution backend. Mars leverages the Object Store for data exchanges between Mars workers. -[mars](https://github.com/mars-project/mars) is a tensor-based unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and many other libraries. +#### Why do we need high availability for the Object Store? -Mars on Ray uses Ray actors as execution backend, Ray remote function-based execution backend is in progress. Mars will store data in the Ray Object Store. +There are two requirements from the above scenarios: -#### Why does the Object Store need High availability? +1. **Stability**: As a long-running service, workers need to be automatically restored after failures. Currently, a worker failure may result in object loss due to the worker is the owner of these objects. +2. **Auto-Scaling**: The cluster needs to be scaled down when the workload is reduced. 
Currently, dropping a node may result in object loss if the only copy of an object is on this node. -In those scenario, there are two requirements for the service: -1. **Stability**: As a long-running service, it needs to be automatically restored after some process failure. Currently, any worker who dies can result in objects lost. -2. **Auto-Scaling**: Primarily reducing cluster size when data requirements are reduced. Drop any node can cause many objects to lost primary-copy, and those objects will become unavailable +**TODO: Node failure is not mentioned in stability.** + +**TODO: Why these issues exist in these scenarios but not the others (e.g. Ray dataset)? Need explanation.** ### Goal -Any alive worker can read any object in scope via `ray.get`, when any node `N_A` in the ray cluster is dead. +All objects should still be accessible if we encounter a single-node failure. ### Should this change be within `ray` or outside? -main `ray` project. Changes are made to Ray Core level. +Changes are within Ray core. ## Stewardship + ### Required Reviewers @stephanie-wang, @ericl, @scv119 @@ -42,33 +50,41 @@ main `ray` project. Changes are made to Ray Core level. ### Problem statement -#### Problem One: The Object Owner Failure +**TODO: Is it better to have a separate section or separate document for the detailed design and let this REP mainly focus on motivation and high-level design?** + +#### Problem 1: Object Owner Failure -The object owner stores the metadata of an object, it manages the reference and primary-copy of the object. +The owner of an object stores the metadata of the object, such as reference count and locations of the object. If the owner dies, other workers which hold the object ref cannot access the data of the object anymore because all copies of the object will be deleted from the Object Store. -For example: If the owner of the object `O_A` in node `N_A`, the object `O_A` will be out of scope when node A dead. The primary copy will be deleted, and any worker who wants to get `O_A` will be told the object is lost. +For example, if the owner of the object `O_A` is on node `N_A`, the object `O_A` will be out-of-scope when node A dies. All copies of the object will be deleted, and any worker which wants to access `O_A` will fail with an `ObjectLostError`. -#### Problem Two: The Object Borrower Failure +**TODO: Naming refine. `O_A` -> `Object A`, `W_A` -> `Worker A`, `N_A` -> `Node A`, `G_1` -> `Global Owner 1`.** -Borrower failure causes object reference loss, and makes the owner release the object early. +#### Problem 2: Object Borrower Failure -For example: If the owner of the object `O_A` borrows `O_A` to the worker `W_A` which in node `N_A` and `W_A` borrow `O_A` to a new worker `W_B`, the object `O_A` will be released, when the worker `W_A` is dead. When calling `ray.get` on `W_B` will get an exception: RayObjectLostError. +In the chained object borrowing case, the owner of an object is not aware of the indirect borrowers. If all direct borrower fails, the owner will consider the object out-of-scope and GC the object. Accessing the object on indirect borrowers will fail with an `ObjectLostError`. + +**TODO: Add issue link.** + +For example, the owner of the object `O_A` borrowed `O_A` to the worker `W_A` which is on node `N_A`, and the worker `W_A` borrowed `O_A` to the worker `W_B`. If the worker `W_A` dies, the object `O_A` will be deleted. Calling `ray.get(O_A)` on the worker `W_B` will fail with an `ObjectLostError`. 
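The chained-borrowing failure described above can be reproduced with a small script along the lines of the sketch below. The `Borrower` actor, the explicit `del` of the driver-side reference, and the simulated crash via `ray.kill` are illustrative only, and whether the error actually surfaces can depend on timing.

```python
import ray

ray.init()

@ray.remote
class Borrower:
    def __init__(self):
        self.refs = None

    def borrow(self, refs):
        # Receiving the ref inside a list keeps it as a reference (a borrow)
        # instead of having the value inlined into the task argument.
        self.refs = refs

    def lend_to(self, other):
        # Chained borrowing: hand the borrowed ref on to another borrower.
        ray.get(other.borrow.remote(self.refs))

    def read(self):
        return ray.get(self.refs[0])

obj_ref = ray.put("payload")         # the driver is the owner of obj_ref
worker_a = Borrower.remote()         # direct borrower (worker A)
worker_b = Borrower.remote()         # indirect borrower (worker B)

ray.get(worker_a.borrow.remote([obj_ref]))
ray.get(worker_a.lend_to.remote(worker_b))
del obj_ref                          # the owner itself no longer holds the ref

ray.kill(worker_a, no_restart=True)  # simulate the failure of the direct borrower
print(ray.get(worker_b.read.remote()))  # may raise an ObjectLostError
```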
![image_1_the_object_borrower_failure](https://user-images.githubusercontent.com/11995469/164454286-6d7658c7-952b-4351-bd06-7404f49db919.png) -#### Problem Three: The Primary Copy Lost +#### Problem 3: Loss of All Copies -Primary copy loss due to node failure, will cause the object which does not have a copy on another node’s plasma store not to be available. +If all copies of an object are lost due to node failures, trying to access the object on any other node will fail with an `ObjectLostError` because there's no way to recover the data of the object. If there's only one copy of the object, the failure of the node on which the only copy is stored will result in inaccessibility of the object in the cluster. -For example: If the primary copy of the object `O_A` in node `N_A` and `O_A` does not have any copy on another node’s plasma store, the object will be not available when the node `N_A` is dead. +For example, if the only copy of the object `O_A` is on node `N_A`, the object will be unavailable if the node `N_A` dies. ### Proposed Design -#### How to solve owner failure? +#### How to solve the object owner failure problem? + +##### Option 1: syncing reference table of owners to a highly available external storage. -##### Option one: Highly Available Storage to Store Object Metadata +We keep the in-memory reference table of owners in sync with a high-availability external storage. When an owner restarts from failure, it can restore the reference table from the external storage and recreate the RPC connections with borrowers and Raylets. -Use high-availability storage to store original data to help owners restore.Store A in external storage to ensure data is not lost. When the owner restarts, the data in the `reference table` can be restored, and rebuild the RPC link. +**TODO: Mention that option 1 is still based on global owners, and reorganize if needed.** **Pros**: @@ -76,108 +92,139 @@ Use high-availability storage to store original data to help owners restore.Stor **Cons**: -- Increased requirements for deployment environments. -- The modification of the `reference table` is a high frequency operation and may have performance issues. -- Potentially some trickier consistency issues when the owner fails. +- Deployment is more complicated. +- The modification of the reference table is a high-frequency operation. Syncing with an external storage may hurt performance. +- Potentially some tricky consistency issues when an owner fails. + +##### Option 2: dedicated global owners + +A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put`. When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers. + +**TODO: we should also support other types of objects, not just objects created by `ray.put`. e.g. task returns.** -##### Option two: Global Owner +These actors will be distributed onto different nodes with best effort. -Add a job level setting (default is disable) to enable this feature. Use ray-actor to be the global owner to own all objects created by interface `ray.put`. The global owner is a dummy actor with `max_restarts=-1`, and try to disperse on each node. +A job-level configuration will be added (disabled by default) to enable this feature. -There were two options active or passive to rebuild the `reference table` and the RPC link when `G_1` retry. 
+###### Detailed Design -**Active**: +There are two options, active way and passive way, to rebuild the reference table and the RPC connections when the global owner `G_1` restarts. -The global owner will actively collect information about the alive objects it owns to rebuild the `reference table`. -1. `G_1` begins to rebuild the `reference table`, setting status to be `REBUILDING`. -2. Send RPC to all Raylets, ask every raylet to traverse all workers on it, and reply with the information of all objects owned by `G_1`. Use those information to rebuild the `reference table` and the RPC link. -3. `G_1` sets the state to `READY`. -4. Some RPC's (WaitForObjectFree) reply-callback will not be called before the state of `G_1` not `READY`. +**The Active Way**: -**Passive**: +The global owner actively collect information about the alive objects it owns to rebuild the reference table. -As the illustration shows, every raylet will maintain a RPC to every global owner, to watch those global owners. When the global owner `G_1` died and the job was not finished, +1. `G_1` begins to rebuild the reference table, and sets the rebuilding status to `REBUILDING`. +2. `G_1` sends RPCs to all Raylets and ask every Raylet to traverse all local workers and reply with the information of all objects owned by `G_1`. `G_1` then uses the information to rebuild the reference table and re-establish RPC connections. +3. `G_1` sets the rebuilding state to `READY`. +4. The reply callback of some RPCs (WaitForObjectFree) will not be called until the rebuilding state of `G_1` is set to `READY`. -1. Raylet will find `G_1` dead through RPC disconnection, then traverse all the workers on current raylet, collect the information (reference, primary copy address) of all objects whose owner is `G_1`, and send it to `G_1` when rebuilding the RPC link. -2. `G_1` will reconnect to the raylet and the workers on it after receiving the RPC from this raylet. -3. `G_1` sets the state to `READY` after rebuilding with all raylets. -4. Some RPC's (WaitForObjectFree) reply-callback will not be called before the state of `G_1` not `READY`. +**The Passive Way**: + +As the following illustration shows, every Raylet maintains an RPC connection to every global owner to watch the health of the global owners. When the global owner `G_1` dies and the job is not finished yet, + +1. Raylet will find out that `G_1` is dead through RPC disconnection. +2. When Raylet knows that `G_1` is restarted, Raylet sends the information below to `G_1`: + - References of all objects which are owned by `G_1`. (Collected by traversing all local workers.) + - Objects which are owned by `G_1` and are local (the data is in the local Object Store). + - The spill URLs of locally spilled objects which are owned by `G_1`. +3. `G_1` will reconnect to the Raylet and the workers on it after receiving the above collected information from this Raylet. +4. `G_1` sets the state to `READY` after finished rebuilding reference table. +5. The reply callback of some RPCs (`WaitForObjectFree`) will not be called until the rebuilding state of `G_1` is set to `READY`. + +**TODO: Which one is the accepted design? Active or passive?** **API**: + ```python -# Set the number of global owners, the default number is zero. The object store behaves the same as before when the number of global owners is zero. +# Set the number of global owners, the default value is 0. The object store behaves the same as before when the number of global owners is 0. 
ray.init(global_owner_number=16) ``` + +**TODO: The flag should be put into job config.** + ![image_2_global_owner](https://user-images.githubusercontent.com/11995469/164455556-b2f4101b-23f4-46db-808a-4407d48526a6.png) **Pros**: -- No need to rely on external storage. +- No need to rely on an external storage. - Performance will not be affected. **Cons**: -- Active: The rebuilding process may take a long time. -- **Passive**: When waiting for the RPC which is sent by raylets, the global owner needs to handle timeouts, which can be difficult. -- Relatively complex. +- **Active**: The rebuilding process may take a long time. +- **Passive**: When a global owner is waiting for collected information from Raylets, the global owner needs to handle timeouts, which can be difficult. +- Relatively complicated. -#### How to solve borrower failure? +#### How to solve object borrower failure? ##### Star Topology -Use Star Topology instead of Tree Topology, when the number of global owners is greater than zero, make the owner directly borrow the object to other workers. +We use star topology instead of tree topology when the number of global owners is greater than zero to make sure the owner directly lends an object to other workers. -As the illustration shows, the worker `W_A` owns the object `O_A`, and the worker `W_B` already borrows `O_A` from `W_A`. Here is the process by which `W_B` borrows `O_A` to `W_C` +As the following illustration shows, the worker `W_A` owns the object `O_A`, and the worker `W_B` have already borrowed `O_A` from `W_A`. Here is the process how `W_B` lends `O_A` to `W_C` -1. The worker `W_B` sends the object `O_A` to the worker `W_C`, and it is stored in the `W_C` reference table. -2. The worker `W_B` increments the reference to object `O_A` by one. Avoid the object being freed before finish borrowed. -3. The worker `W_C` sends an async RPC to the worker `W_A`, making `W_A` add `W_C` in the borrowers list. -4. The worker `W_A` sends an async RPC(WaitForRefRemoved) to the worker `W_C`. +1. The worker `W_B` sends the object `O_A` to the worker `W_C`, and the object is stored in the reference table of `W_C`. +2. The worker `W_B` increments the reference count of object `O_A` by one to avoid the object being freed before finishing the lending process. +3. The worker `W_C` sends an async RPC to the worker `W_A`, so `W_A` adds `W_C` into `O_A`'s borrower list in `W_A`. +4. The worker `W_A` sends an async RPC (`WaitForRefRemoved`) to the worker `W_C`. 5. The worker `W_C` sends an async RPC to `W_A`. -6. The worker `W_B` reduces the reference to object `O_A` by one. When the RPC which sends in step 1 replies. +6. The worker `W_B` decrements the reference count of object `O_A` by one on receiving the reply of the RPC in step 1. ![image_3_star_topology](https://user-images.githubusercontent.com/11995469/164456154-2163f505-d835-4d23-9901-fac6d867d368.png) -#### How to solve primary copy loss? +#### How to solve loss of all copies? -##### The Backup of Primary Copy +##### Multiple Primary Copies -After the primary copy is created, create a backup of it on another raylet. When the owner of object `O_A` finds its primary copy is unavailable, `G_1` will rebuild primary copy by fellow steps: -1. When RPC disconnects, it will turn backup to primary copy, and create a new backup. -2. Send a RPC request to the raylet which has `O_A` backup, turn it to primary copy. -3. `G_1` sends an async RPC request to another raylet to create a new backup. -4. 
The raylet which has the new backup will keep a RPC connected to watch `G_1`. +We make sure there are multiple (configurable) primary copies of the same object. + +**TODO: Rewrite this section. Primary copy and backup copy -> multiple primary copies.** + +**TODO: consider creating another copy once the object is sealed.** + +**TODO: How to configure the count of primary copies? Maybe `ray.put(obj, _primary_copies=2)` and `ray.remote(_primary_copies=2)`?** + +After the primary copy of an object is created, we create a backup of the object on another node. When the owner of object `O_A` finds out that the object's primary copy is unavailable, `G_1` will rebuild the primary copy with the following steps: + +1. When the RPC connection disconnects, `G_1` will turn a backup copy into a primary copy and create a new backup copy. +2. `G_1` sends an RPC request to the Raylet which has a backup copy of `O_A` and turn it into a primary copy. +3. `G_1` sends an async RPC request to another Raylet to create a new backup. +4. The Raylet which has the new backup copy will keep an RPC connection with `G_1` to watch the health of `G_1`. ![image_4_rebuild_primary_copy](https://user-images.githubusercontent.com/11995469/164456515-0f9e7d15-51be-4bb4-8852-ca5017a0411e.png) -When `G_1` finds the backup of object `O_A` is unavailable, `G_1` will send a RPC request to the Node `N_B`, and make it create a new Backup. As the illustration shows: -1. When RPC disconnects, `G_1` finds the backup of `O_A` is unavailable. -2. `G_1` sends a RPC request to make `N_B` create a new backup. -3. `G_1` sends a RPC request to `N_C`. -4. `N_C` creates the backup of `O_A`. -5. `N_C` keeps a RPC connected to watch `G_1`. +When `G_1` finds out that the backup copy of object `O_A` is unavailable, `G_1` will send an RPC request to the node `N_B` and ask it to create a new backup copy. As the illustration shows: + +1. When the RPC connection disconnects, `G_1` finds out that the backup copy of `O_A` is unavailable. +2. `G_1` sends an RPC request to ask `N_B` to create a new backup copy. +3. `G_1` sends an RPC request to `N_C`. +4. `N_C` creates a backup copy of `O_A`. +5. `N_C` keeps an RPC connection with `G_1` to watch the health of `G_1`. ![image_4_rebuild_backup](https://user-images.githubusercontent.com/11995469/164456742-c585aea8-4df8-47a4-b632-e7b61c756536.png) ## Compatibility, Deprecation, and Migration Plan -Fully forward compatible, the behavior of the object store will same as usual when the high-available mode is disabled. +All these features in this REP are optional. The default behavior is the exactly the same as before. Users need to explicitly configure new options to enable these features. ## Test Plan and Acceptance Criteria -We plan to use a ray job to test the object store HA. About this job: +We plan to use a Ray job to test the HA feature of the Object Store. -1. Multiple node, and each node has two type actors: producer and consumer. - - **producer**: Produce data and cache `ObjectRef`. Add or delete objects according to a certain strategy, for testing object gc. - - **consumer**: Get the `ActorHandle` for producer via actor name, and borrower object randomly through `ray.get`. -2. Adjust data scale according to parameters, include: - - Data size, proportion of plasma store. +1. In a multi-node cluster, each node runs two types of actors: producer and consumer. + - Each **producer** actor produces data and stores object refs at local. 
Adds or deletes objects according to a certain strategy for testing object GC. + - Each **consumer** actor gets an actor handle of a producer actor via the actor name and borrow objects from the producer actor randomly through `ray.get`. +2. Adjust data scale according to parameters: + - The size of an object. - The number of objects. + - The capacity of the Object Store. Acceptance criteria: -1. Performance reduction is acceptable when no process or node failure. -2. Any one raylet process of any workers processes failure, and the job will finish in the end. +1. Performance degradation is acceptable when no process or node failures happen. +2. When a single worker process or Raylet fails, the test job can finish eventually. ## (Optional) Follow-on Work +None. From e24c3cebc8062b006e0039fe8c4b5e8ec9dff4da Mon Sep 17 00:00:00 2001 From: Kai Yang Date: Mon, 25 Apr 2022 20:40:05 +0800 Subject: [PATCH 04/11] minor update --- reps/2022-04-21-object_store_HA.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 7f3502a..02c7cfe 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -156,7 +156,7 @@ ray.init(global_owner_number=16) - **Passive**: When a global owner is waiting for collected information from Raylets, the global owner needs to handle timeouts, which can be difficult. - Relatively complicated. -#### How to solve object borrower failure? +#### How to solve the object borrower failure problem? ##### Star Topology @@ -173,7 +173,7 @@ As the following illustration shows, the worker `W_A` owns the object `O_A`, and ![image_3_star_topology](https://user-images.githubusercontent.com/11995469/164456154-2163f505-d835-4d23-9901-fac6d867d368.png) -#### How to solve loss of all copies? +#### How to solve the loss of all copies problem? ##### Multiple Primary Copies From fb8b286ca6633991c805d46c96710c08c67075bd Mon Sep 17 00:00:00 2001 From: hejialing Date: Thu, 28 Apr 2022 02:09:50 +0800 Subject: [PATCH 05/11] save --- reps/2022-04-21-object_store_HA.md | 185 ++++++++++++----------------- 1 file changed, 76 insertions(+), 109 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 02c7cfe..8302069 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -2,35 +2,17 @@ ### General Motivation -Ray is a general-purpose and powerful computing framework. With the Object Store, it can be easily extended into a data service to provide data for various machine learning tasks. Here are two typical scenarios. +Ray is a general-purpose and powerful computing framework. With the Object Store, it can be easily extended into a data service to provide data for various distributed tasks. Ray uses decentralized object ownership to avoid centralized bottlenecks in managing an object’s metadata (mainly its reference count and object directory entries), but it difficult to handle failover. -#### Scenarios 1: Data Provider for Deep Learning Offline Training +For now, users can only rely on lineage to recover the unavailable object. But lineage has many restriction: +- Can not recover the object which put in object store via `ray.put`. +- Can not recover the object returned by actor task. +- Require task is idempotent. -We can implement a data provider on top of Ray with the following functions: +#### Goal -1. 
Reading data from various data sources and storing them in the Object Store, which can be read by workers. -2. Simple data processing capability, such as data augmentation, data distillation, .etc. - -#### Scenarios 2: Mars on Ray - -[Mars](https://github.com/mars-project/mars) is a tensor-based, unified framework for large-scale data computation which scales numpy, pandas, scikit-learn and many other libraries. - -Mars on Ray uses Ray actors as execution backend. Mars leverages the Object Store for data exchanges between Mars workers. - -#### Why do we need high availability for the Object Store? - -There are two requirements from the above scenarios: - -1. **Stability**: As a long-running service, workers need to be automatically restored after failures. Currently, a worker failure may result in object loss due to the worker is the owner of these objects. -2. **Auto-Scaling**: The cluster needs to be scaled down when the workload is reduced. Currently, dropping a node may result in object loss if the only copy of an object is on this node. - -**TODO: Node failure is not mentioned in stability.** - -**TODO: Why these issues exist in these scenarios but not the others (e.g. Ray dataset)? Need explanation.** - -### Goal - -All objects should still be accessible if we encounter a single-node failure. +1. Objects can be specified for high availability mode, other objects are the same as before. +2. Any high availability objects should still be accessible if we encounter a single-node failure. ### Should this change be within `ray` or outside? @@ -40,11 +22,11 @@ Changes are within Ray core. ### Required Reviewers -@stephanie-wang, @ericl, @scv119 +@stephanie-wang, @ericl, @scv119, @kfstorm, @raulchen ### Shepherd of the Proposal (should be a senior committer) -@ericl +@ericl, @raulchen, @stephanie-wang ## Design and Architecture @@ -56,39 +38,34 @@ Changes are within Ray core. The owner of an object stores the metadata of the object, such as reference count and locations of the object. If the owner dies, other workers which hold the object ref cannot access the data of the object anymore because all copies of the object will be deleted from the Object Store. -For example, if the owner of the object `O_A` is on node `N_A`, the object `O_A` will be out-of-scope when node A dies. All copies of the object will be deleted, and any worker which wants to access `O_A` will fail with an `ObjectLostError`. - -**TODO: Naming refine. `O_A` -> `Object A`, `W_A` -> `Worker A`, `N_A` -> `Node A`, `G_1` -> `Global Owner 1`.** - #### Problem 2: Object Borrower Failure In the chained object borrowing case, the owner of an object is not aware of the indirect borrowers. If all direct borrower fails, the owner will consider the object out-of-scope and GC the object. Accessing the object on indirect borrowers will fail with an `ObjectLostError`. -**TODO: Add issue link.** - -For example, the owner of the object `O_A` borrowed `O_A` to the worker `W_A` which is on node `N_A`, and the worker `W_A` borrowed `O_A` to the worker `W_B`. If the worker `W_A` dies, the object `O_A` will be deleted. Calling `ray.get(O_A)` on the worker `W_B` will fail with an `ObjectLostError`. 
- -![image_1_the_object_borrower_failure](https://user-images.githubusercontent.com/11995469/164454286-6d7658c7-952b-4351-bd06-7404f49db919.png) +more details: [issues 18456](https://github.com/ray-project/ray/issues/18456) #### Problem 3: Loss of All Copies -If all copies of an object are lost due to node failures, trying to access the object on any other node will fail with an `ObjectLostError` because there's no way to recover the data of the object. If there's only one copy of the object, the failure of the node on which the only copy is stored will result in inaccessibility of the object in the cluster. - -For example, if the only copy of the object `O_A` is on node `N_A`, the object will be unavailable if the node `N_A` dies. +Data of objects stored in the plasma store. For now, the plasma store is a thread of the raylet process, failure of the raylet process will lose data which store in plasma. Some objects which only one copy in that failed plasma store, will be unavailable. ### Proposed Design #### How to solve the object owner failure problem? -##### Option 1: syncing reference table of owners to a highly available external storage. +**Global Owner**: A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put` and the returned object (either from normal-task or actor-task). When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers. -We keep the in-memory reference table of owners in sync with a high-availability external storage. When an owner restarts from failure, it can restore the reference table from the external storage and recreate the RPC connections with borrowers and Raylets. +These actors will be distributed onto different nodes with best effort. A job-level configuration will be added (disabled by default) to enable this feature and set the number of the global owners. -**TODO: Mention that option 1 is still based on global owners, and reorganize if needed.** +Just need to consider how to restore data on B after A restarts, as well as rebuild various RPC links and recover subscriptions, such as `WaitForRefRemoved` and `WaitForObjectFree`. + + +##### Option 1: Syncing reference table of owners to a highly available external storage. + +We keep the in-memory reference table of owners in sync with a high-availability external storage. When an owner restarts from failure, it can restore the reference table from the external storage and recreate the RPC connections with borrowers and Raylets. **Pros**: -- Relatively simple. +- Less intrusive to the design of object store. **Cons**: @@ -96,54 +73,42 @@ We keep the in-memory reference table of owners in sync with a high-availability - The modification of the reference table is a high-frequency operation. Syncing with an external storage may hurt performance. - Potentially some tricky consistency issues when an owner fails. -##### Option 2: dedicated global owners - -A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put`. When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers. - -**TODO: we should also support other types of objects, not just objects created by `ray.put`. e.g. task returns.** - -These actors will be distributed onto different nodes with best effort. 
- -A job-level configuration will be added (disabled by default) to enable this feature. +##### Option 2: Rebuild failed global owner via the information in the whole cluster -###### Detailed Design - -There are two options, active way and passive way, to rebuild the reference table and the RPC connections when the global owner `G_1` restarts. +There are two options, the active way and passive way, to rebuild the reference table and the RPC connections when a global owner restarts. **The Active Way**: -The global owner actively collect information about the alive objects it owns to rebuild the reference table. +The global owner actively collect information about the alive objects it owns to rebuild the reference table. Here are the details steps of global owner `G` rebuild: -1. `G_1` begins to rebuild the reference table, and sets the rebuilding status to `REBUILDING`. -2. `G_1` sends RPCs to all Raylets and ask every Raylet to traverse all local workers and reply with the information of all objects owned by `G_1`. `G_1` then uses the information to rebuild the reference table and re-establish RPC connections. -3. `G_1` sets the rebuilding state to `READY`. -4. The reply callback of some RPCs (WaitForObjectFree) will not be called until the rebuilding state of `G_1` is set to `READY`. +1. `G` begins to rebuild the reference table, and sets the rebuilding status to `REBUILDING`. +2. `G` sends RPCs to all Raylets and ask every Raylet to traverse all local workers and reply with the information of all objects owned by `G`. `G` then uses the information to rebuild the reference table and re-establish RPC connections. +3. `G` sets the rebuilding state to `READY`. -**The Passive Way**: +**Note**: -As the following illustration shows, every Raylet maintains an RPC connection to every global owner to watch the health of the global owners. When the global owner `G_1` dies and the job is not finished yet, +- RPCs sent by other workers to `G` should be retried until the state of `G's` Actor becomes `Alive`. +- The reply callback of some RPCs will not be invoked until the rebuilding state of `G` is set to `READY`. -1. Raylet will find out that `G_1` is dead through RPC disconnection. -2. When Raylet knows that `G_1` is restarted, Raylet sends the information below to `G_1`: - - References of all objects which are owned by `G_1`. (Collected by traversing all local workers.) - - Objects which are owned by `G_1` and are local (the data is in the local Object Store). - - The spill URLs of locally spilled objects which are owned by `G_1`. -3. `G_1` will reconnect to the Raylet and the workers on it after receiving the above collected information from this Raylet. -4. `G_1` sets the state to `READY` after finished rebuilding reference table. -5. The reply callback of some RPCs (`WaitForObjectFree`) will not be called until the rebuilding state of `G_1` is set to `READY`. +**The Passive Way**: -**TODO: Which one is the accepted design? Active or passive?** +Every Raylet maintains an RPC connection to every global owner to watch the status of the global owners. When the global owner `G` dies and the job is not finished yet. Each raylet will collect and merge the reference table of all workers on this node, and report to `G`. Details: -**API**: +1. Raylet will find out that `G` is dead through RPC disconnection. +2. When Raylet knows that `G` is restarted, Raylet sends the information below to `G`: + - References of all objects which are owned by `G`. (Collected by traversing all local workers.) 
+ - Objects which are owned by `G` and are local (the data is in the local Object Store). + - The spill URLs of locally spilled objects which are owned by `G`. +3. `G` will reconnect to the Raylet and the workers on it after receiving the above collected information from this Raylet. +4. `G` sets the state to `READY` after finished rebuilding reference table. +5. The reply callback of some RPCs (`WaitForObjectFree`) will not be called until the rebuilding state of `G` is set to `READY`. -```python -# Set the number of global owners, the default value is 0. The object store behaves the same as before when the number of global owners is 0. -ray.init(global_owner_number=16) -``` +**Note**: -**TODO: The flag should be put into job config.** +- RPCs sent by other workers to `G` should be retried until the state of `G's` Actor becomes `Alive`. +- The reply callback of some RPCs will not be invoked until the rebuilding state of `G` is set to `READY`. -![image_2_global_owner](https://user-images.githubusercontent.com/11995469/164455556-b2f4101b-23f4-46db-808a-4407d48526a6.png) +We prefer __Option.2__, and due to the difficulty of implementation, we are more trend to choose the __active__ way to restore `G` **Pros**: @@ -160,49 +125,49 @@ ray.init(global_owner_number=16) ##### Star Topology -We use star topology instead of tree topology when the number of global owners is greater than zero to make sure the owner directly lends an object to other workers. +We use star topology instead of tree topology when the number of global owners is greater than zero to make sure the owner directly lends an object to other workers. As the illustration shows: -As the following illustration shows, the worker `W_A` owns the object `O_A`, and the worker `W_B` have already borrowed `O_A` from `W_A`. Here is the process how `W_B` lends `O_A` to `W_C` +![image_1_star_topology](https://user-images.githubusercontent.com/11995469/165585288-c6fc4ba4-efd6-42b5-935b-ea5979d0e735.png) -1. The worker `W_B` sends the object `O_A` to the worker `W_C`, and the object is stored in the reference table of `W_C`. -2. The worker `W_B` increments the reference count of object `O_A` by one to avoid the object being freed before finishing the lending process. -3. The worker `W_C` sends an async RPC to the worker `W_A`, so `W_A` adds `W_C` into `O_A`'s borrower list in `W_A`. -4. The worker `W_A` sends an async RPC (`WaitForRefRemoved`) to the worker `W_C`. -5. The worker `W_C` sends an async RPC to `W_A`. -6. The worker `W_B` decrements the reference count of object `O_A` by one on receiving the reply of the RPC in step 1. - -![image_3_star_topology](https://user-images.githubusercontent.com/11995469/164456154-2163f505-d835-4d23-9901-fac6d867d368.png) +1. `Worker C` borrows `Object A` from `Worker B` as before, and adds `Worker C` to `Worker B's` borrowers list. +2. Send a sync RPC to `G`, and borrows `Object A` from `G` when `object A` deserializes in `Worker C`. +3. Send an async RPC to delete `Worker C` from borrowers on `Worker B`. #### How to solve the loss of all copies problem? ##### Multiple Primary Copies -We make sure there are multiple (configurable) primary copies of the same object. - -**TODO: Rewrite this section. Primary copy and backup copy -> multiple primary copies.** +We make sure there are multiple (configurable) primary copies of the same object. We can create these additional copies asynchronously to reduce performance penalty, and creating another copy once the object is sealed. 
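As a rough illustration of the placement decision only (it is not part of the proposed API), the helper below picks the nodes for the additional primary copies from the currently alive nodes while skipping the node that already holds the first copy.

```python
import random

def choose_copy_nodes(primary_node_id, alive_node_ids, num_extra_copies):
    """Pick distinct nodes for the additional primary copies of one object.

    Falls back to fewer copies if the cluster is smaller than requested,
    which mirrors the best-effort spreading described above.
    """
    candidates = [n for n in alive_node_ids if n != primary_node_id]
    k = min(num_extra_copies, len(candidates))
    return random.sample(candidates, k)

# Example: a 3-node cluster, object sealed on "node-a", two extra copies requested.
print(choose_copy_nodes("node-a", ["node-a", "node-b", "node-c"], 2))
```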
-**TODO: consider creating another copy once the object is sealed.** +#### API: -**TODO: How to configure the count of primary copies? Maybe `ray.put(obj, _primary_copies=2)` and `ray.remote(_primary_copies=2)`?** +``` python +# Set the number of global owner (default is zero) and the number of HA object's primary copies (default is zero). +ray.init( + job_config=ray.job_config.JobConfig( + global_owner_num=16, + primary_copies_num=3, + ) +) -After the primary copy of an object is created, we create a backup of the object on another node. When the owner of object `O_A` finds out that the object's primary copy is unavailable, `G_1` will rebuild the primary copy with the following steps: +# put a HA object. the default value of `enable_ha` is False. +ray.put(value, enable_ha=True) -1. When the RPC connection disconnects, `G_1` will turn a backup copy into a primary copy and create a new backup copy. -2. `G_1` sends an RPC request to the Raylet which has a backup copy of `O_A` and turn it into a primary copy. -3. `G_1` sends an async RPC request to another Raylet to create a new backup. -4. The Raylet which has the new backup copy will keep an RPC connection with `G_1` to watch the health of `G_1`. +# normal task: returns HA object. +# the default value of `enable_ha_for_return_objects` is False. +@ray.remote(enable_ha_for_return_objects=True) +def fun(*args, **kwargs): + ... -![image_4_rebuild_primary_copy](https://user-images.githubusercontent.com/11995469/164456515-0f9e7d15-51be-4bb4-8852-ca5017a0411e.png) +# actor task: returns HA object. +# the default value of `enable_ha_for_return_objects` is False. +@ray.remote(enable_ha_for_return_objects=True) +class Actor: + def func(self, *args, **kwargs): + ... -When `G_1` finds out that the backup copy of object `O_A` is unavailable, `G_1` will send an RPC request to the node `N_B` and ask it to create a new backup copy. As the illustration shows: - -1. When the RPC connection disconnects, `G_1` finds out that the backup copy of `O_A` is unavailable. -2. `G_1` sends an RPC request to ask `N_B` to create a new backup copy. -3. `G_1` sends an RPC request to `N_C`. -4. `N_C` creates a backup copy of `O_A`. -5. `N_C` keeps an RPC connection with `G_1` to watch the health of `G_1`. +``` -![image_4_rebuild_backup](https://user-images.githubusercontent.com/11995469/164456742-c585aea8-4df8-47a4-b632-e7b61c756536.png) ## Compatibility, Deprecation, and Migration Plan @@ -227,4 +192,6 @@ Acceptance criteria: ## (Optional) Follow-on Work -None. +- **Global Owner**: Automatically adjust the number of global owner according to the number of objects in the ray cluster. +- **Star Topology**: Change the synchronous RPC sent to `G` when deserializing on `Worker C` to asynchronous. +- **Multiple Primary Copies**: Rebuild failed copies, keeping the number of copies unchanged. From 97c5c1f5a87d4544bc27e929c1cf4d2b7a052bab Mon Sep 17 00:00:00 2001 From: hejialing Date: Thu, 28 Apr 2022 02:11:24 +0800 Subject: [PATCH 06/11] save --- reps/2022-04-21-object_store_HA.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 8302069..0bf26dd 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -32,8 +32,6 @@ Changes are within Ray core. 
### Problem statement -**TODO: Is it better to have a separate section or separate document for the detailed design and let this REP mainly focus on motivation and high-level design?** - #### Problem 1: Object Owner Failure The owner of an object stores the metadata of the object, such as reference count and locations of the object. If the owner dies, other workers which hold the object ref cannot access the data of the object anymore because all copies of the object will be deleted from the Object Store. From 9c3bb1191fe0fb029ed5e641cdf97075875bbe5b Mon Sep 17 00:00:00 2001 From: hejialing Date: Fri, 29 Apr 2022 00:05:13 +0800 Subject: [PATCH 07/11] save --- reps/2022-04-21-object_store_HA.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 0bf26dd..3a55f10 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -106,7 +106,12 @@ Every Raylet maintains an RPC connection to every global owner to watch the stat - RPCs sent by other workers to `G` should be retried until the state of `G's` Actor becomes `Alive`. - The reply callback of some RPCs will not be invoked until the rebuilding state of `G` is set to `READY`. -We prefer __Option.2__, and due to the difficulty of implementation, we are more trend to choose the __active__ way to restore `G` +We prefer __Option.2__, and due to the difficulty of implementation, we are more trend to choose the __Active__ way to restore `G`, here are some cons of the __Passive__ way: + +- Raylets need to subscribe to the actor state notifications in order to know the new RPC address of a restarted actor and reconnect to it. To implement this, we need to add something similar to "actor manager" in core worker to Raylet. This is an additional coding effort. +- Raylets pushing collected information to the restarted actor v.s. the actor pulling information from Raylets is just like push-based v.s. pull-based resource reporting. This is what we've already discussed and we've concluded that pull-based resource reporting is easier to maintain and test. I believe this conclusion also applies to object reference and location reporting. + + **Pros**: @@ -143,8 +148,8 @@ We make sure there are multiple (configurable) primary copies of the same object # Set the number of global owner (default is zero) and the number of HA object's primary copies (default is zero). ray.init( job_config=ray.job_config.JobConfig( - global_owner_num=16, - primary_copies_num=3, + num_global_owners=16, + num_primary_copies=3, ) ) @@ -192,4 +197,4 @@ Acceptance criteria: - **Global Owner**: Automatically adjust the number of global owner according to the number of objects in the ray cluster. - **Star Topology**: Change the synchronous RPC sent to `G` when deserializing on `Worker C` to asynchronous. -- **Multiple Primary Copies**: Rebuild failed copies, keeping the number of copies unchanged. +- **Multiple Primary Copies**: Recreate new primary copies upon loss of any primary copy to meet the required number of primary copies. 
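To make the preferred active recovery easier to follow, here is a small self-contained simulation of that flow. Plain Python objects stand in for the Raylets and the RPC layer, and every name in it is invented for illustration rather than taken from the Ray code base.

```python
from dataclasses import dataclass, field

@dataclass
class FakeRaylet:
    """Stands in for one Raylet answering the owner's recovery request."""
    owned_objects: dict  # object_id -> (reference_count, locations)

    def get_objects_owned_by(self, owner_id):
        # In the real design this would traverse all local workers.
        return self.owned_objects

@dataclass
class GlobalOwner:
    owner_id: str
    state: str = "REBUILDING"
    reference_table: dict = field(default_factory=dict)

    def rebuild(self, raylets):
        # Active way: the restarted owner pulls object info from every Raylet,
        # merges it into a fresh reference table, then starts serving again.
        self.state = "REBUILDING"
        for raylet in raylets:
            for obj_id, info in raylet.get_objects_owned_by(self.owner_id).items():
                self.reference_table[obj_id] = info
        self.state = "READY"

raylets = [
    FakeRaylet({"obj-1": (2, ["node-a"])}),
    FakeRaylet({"obj-2": (1, ["node-b", "node-c"])}),
]
owner = GlobalOwner("G")
owner.rebuild(raylets)
print(owner.state, owner.reference_table)
```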
From f64b6007b3b13e2d400e678be9602e0f2bfe53c3 Mon Sep 17 00:00:00 2001 From: hejialing Date: Mon, 8 Aug 2022 19:39:44 +0800 Subject: [PATCH 08/11] refactor --- reps/2022-04-21-object_store_HA.md | 175 +++++++++++++---------------- 1 file changed, 79 insertions(+), 96 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 3a55f10..97a560f 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -48,99 +48,84 @@ Data of objects stored in the plasma store. For now, the plasma store is a threa ### Proposed Design -#### How to solve the object owner failure problem? - -**Global Owner**: A group of special Ray actors with `max_restarts=-1` will be created to own all objects of a job created by `ray.put` and the returned object (either from normal-task or actor-task). When one of the special actors fails, the restarted instance will rebuild the reference table based on information provided by Raylets and other non-special workers. - -These actors will be distributed onto different nodes with best effort. A job-level configuration will be added (disabled by default) to enable this feature and set the number of the global owners. - -Just need to consider how to restore data on B after A restarts, as well as rebuild various RPC links and recover subscriptions, such as `WaitForRefRemoved` and `WaitForObjectFree`. - - -##### Option 1: Syncing reference table of owners to a highly available external storage. - -We keep the in-memory reference table of owners in sync with a high-availability external storage. When an owner restarts from failure, it can restore the reference table from the external storage and recreate the RPC connections with borrowers and Raylets. - -**Pros**: - -- Less intrusive to the design of object store. - -**Cons**: - -- Deployment is more complicated. -- The modification of the reference table is a high-frequency operation. Syncing with an external storage may hurt performance. -- Potentially some tricky consistency issues when an owner fails. - -##### Option 2: Rebuild failed global owner via the information in the whole cluster - -There are two options, the active way and passive way, to rebuild the reference table and the RPC connections when a global owner restarts. - -**The Active Way**: - -The global owner actively collect information about the alive objects it owns to rebuild the reference table. Here are the details steps of global owner `G` rebuild: - -1. `G` begins to rebuild the reference table, and sets the rebuilding status to `REBUILDING`. -2. `G` sends RPCs to all Raylets and ask every Raylet to traverse all local workers and reply with the information of all objects owned by `G`. `G` then uses the information to rebuild the reference table and re-establish RPC connections. -3. `G` sets the rebuilding state to `READY`. - -**Note**: - -- RPCs sent by other workers to `G` should be retried until the state of `G's` Actor becomes `Alive`. -- The reply callback of some RPCs will not be invoked until the rebuilding state of `G` is set to `READY`. - -**The Passive Way**: - -Every Raylet maintains an RPC connection to every global owner to watch the status of the global owners. When the global owner `G` dies and the job is not finished yet. Each raylet will collect and merge the reference table of all workers on this node, and report to `G`. Details: - -1. Raylet will find out that `G` is dead through RPC disconnection. -2. 
When Raylet knows that `G` is restarted, Raylet sends the information below to `G`: - - References of all objects which are owned by `G`. (Collected by traversing all local workers.) - - Objects which are owned by `G` and are local (the data is in the local Object Store). - - The spill URLs of locally spilled objects which are owned by `G`. -3. `G` will reconnect to the Raylet and the workers on it after receiving the above collected information from this Raylet. -4. `G` sets the state to `READY` after finished rebuilding reference table. -5. The reply callback of some RPCs (`WaitForObjectFree`) will not be called until the rebuilding state of `G` is set to `READY`. - -**Note**: - -- RPCs sent by other workers to `G` should be retried until the state of `G's` Actor becomes `Alive`. -- The reply callback of some RPCs will not be invoked until the rebuilding state of `G` is set to `READY`. - -We prefer __Option.2__, and due to the difficulty of implementation, we are more trend to choose the __Active__ way to restore `G`, here are some cons of the __Passive__ way: - -- Raylets need to subscribe to the actor state notifications in order to know the new RPC address of a restarted actor and reconnect to it. To implement this, we need to add something similar to "actor manager" in core worker to Raylet. This is an additional coding effort. -- Raylets pushing collected information to the restarted actor v.s. the actor pulling information from Raylets is just like push-based v.s. pull-based resource reporting. This is what we've already discussed and we've concluded that pull-based resource reporting is easier to maintain and test. I believe this conclusion also applies to object reference and location reporting. - - - -**Pros**: - -- No need to rely on an external storage. -- Performance will not be affected. - -**Cons**: - -- **Active**: The rebuilding process may take a long time. -- **Passive**: When a global owner is waiting for collected information from Raylets, the global owner needs to handle timeouts, which can be difficult. -- Relatively complicated. - -#### How to solve the object borrower failure problem? - -##### Star Topology - -We use star topology instead of tree topology when the number of global owners is greater than zero to make sure the owner directly lends an object to other workers. As the illustration shows: - -![image_1_star_topology](https://user-images.githubusercontent.com/11995469/165585288-c6fc4ba4-efd6-42b5-935b-ea5979d0e735.png) - -1. `Worker C` borrows `Object A` from `Worker B` as before, and adds `Worker C` to `Worker B's` borrowers list. -2. Send a sync RPC to `G`, and borrows `Object A` from `G` when `object A` deserializes in `Worker C`. -3. Send an async RPC to delete `Worker C` from borrowers on `Worker B`. - -#### How to solve the loss of all copies problem? - -##### Multiple Primary Copies - -We make sure there are multiple (configurable) primary copies of the same object. We can create these additional copies asynchronously to reduce performance penalty, and creating another copy once the object is sealed. +We implement +# Options to implement object HA with checkpoint + +We implement object HA based on the checkpoint, so we can walk around **Problem 3: Loss of All Copies** + +## Option 1: Ownerless objects + +Every checkpointed object will be ownerless. When `ray.get` on a checkpointed object and the object data is not local, data will be loaded from checkpoint and stored in local object store as a secondary copy. 
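As a rough illustration of the ownerless read path described above, the sketch below shows how a `ray.get` on a checkpointed object could fall back to the checkpoint when the data is not local. `local_plasma` and `checkpoint_storage` are assumed stand-ins for the local object store and the external checkpoint storage, not Ray APIs.

```python
# Minimal sketch of Option 1's read path for a checkpointed, ownerless object.
def get_checkpointed_object(object_id, local_plasma, checkpoint_storage):
    data = local_plasma.get(object_id)
    if data is not None:
        return data
    # The object is not local and there is no owner to pull it from another
    # node, so load the checkpoint from external storage and keep it in the
    # local object store as a secondary copy.
    data = checkpoint_storage.read(object_id)
    local_plasma.put_secondary_copy(object_id, data)
    return data
```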
+ +In this case, we no longer need object metadata or the machinery to manage it, so **problems 1 and 2** don't exist. + +- Pros + - The logic/protocol should be relatively straightforward. +- Cons + - No pulling/pushing object data between nodes. + - High IO pressure on external storage. + - External storage bottleneck. + - Bad loading performance. + - Existing code is purely ownership-based. Need to add many if-elses to support ownerless objects. (e.g. location pubsub. GC.) + - High dev cost. + - Maintenance burden. + - Some features aren't easy to support. e.g. locality-aware scheduling. + - Missing features. + +## Option 2: Divided owners on owner failure + +Once the owner of a checkpointed object is dead, subsequent access to the object on a worker will make the worker the new owner of the object. The metadata about this object in the reference table of the worker will be rewritten. If multiple workers hold the same object ref and want to access it after owner failure, each worker will become the new owner of the object, independently. i.e. multiple owners after owner failure. + +If workers A and B are both new owners of an object and both pass object refs to worker C, worker C only records the owner of the object once. E.g. if worker C receives the object ref from worker A first, then worker C treats worker A as the owner of the object. + +**Problem 2** causes the owner to release the reference of the object prematurely. In this option, it can be handled in the same way as an owner failure. + +- Pros + - The ownership protocol is still distributed. Critical updates to the owner reference table are still in-process. + - Good performance on metadata updates. +- Cons + - Owner collision. e.g. Raylet also stores and uses owner addresses and communicates with owners. How do we update the owner info in Raylet, especially if two workers on the same node claim to be the new owner of an object? + - High dev cost to sync new owner info to Raylet. + - High dev cost to maintain N owners of an object in Raylet. + - Primary copy collision. Imagine multiple workers on the same node calling `ray.get` on an object. + - If we allow only one copy loaded into the local object store, we need to update the GC strategy to only unpin the primary copy if all owners say the object is out-of-scope. + - High dev cost. + - If we store multiple copies (i.e. one copy per owner), we need to update object manager and plasma store by changing the object key from object ID to object ID + owner address. + - High dev cost. + - High memory pressure to object store. + - High IO pressure on external storage. + - External storage bottleneck. + - Bad loading performance. + - No pulling/pushing object data between nodes. + - High IO pressure on external storage. + - External storage bottleneck. + - Bad loading performance. + - Corner case handling such as RPC failures. + - Potentially high dev cost. + +## Option 3: Global owner(s) + +We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
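To make the owner-address handling above concrete, here is a hedged sketch of how a worker could turn the immutable owner identity into a working RPC client. `gcs_client` and `rpc_client_pool` are helpers assumed only for illustration, not Ray APIs.

```python
# Sketch: resolve an RPC client to the global owner of a checkpointed object
# from the immutable info encoded in the owner address (Option 3).
def get_owner_rpc_client(owner_address, gcs_client, rpc_client_pool):
    if owner_address.owner_is_gcs:
        # GCS itself owns the object; connect to the current GCS address.
        return rpc_client_pool.connect(gcs_client.current_gcs_address())
    # Otherwise the owner is a named global-owner actor with max_restarts=-1.
    # Resolve the actor name to its up-to-date IP:port (the actor may have
    # been restarted on another node) before connecting.
    actor_info = gcs_client.get_actor_info(owner_address.global_owner_name)
    return rpc_client_pool.connect(actor_info.rpc_address)
```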
+ +Note that we don't need to restore the reference table in global owners by pulling info from the cluster because objects are already checkpointed. Checkpoint info is stored in the reference table and it will be encoded when serializing an object ref, hence checkpoint info is recoverable. If borrowers detect owner failure, they will try to reconnect to the owner, and the recovered owner will recover the reference count and borrower list via these new RPC connections. + +- Pros + - No major protocol changes compared to the existing ownership assignment protocol. + - Low dev cost. + - No owner address updates because the `owner_is_gcs` flag or the actor name is encoded in the owner address. + - Low dev cost. +- Cons + - Centralized/semi-centralized architecture. + - Potential bottleneck. + - Bad performance. + - Corner case handling such as RPC failures. + - Potentially high dev cost. + +We prefer named actors rather than GCS as global owners. + +- The number of global owners is configurable, hence scalable. +- No need to embed (part of) core worker code into GCS. +- No increased complexity for GCS. #### API: @@ -195,6 +180,4 @@ Acceptance criteria: ## (Optional) Follow-on Work -- **Global Owner**: Automatically adjust the number of global owner according to the number of objects in the ray cluster. -- **Star Topology**: Change the synchronous RPC sent to `G` when deserializing on `Worker C` to asynchronous. -- **Multiple Primary Copies**: Recreate new primary copies upon loss of any primary copy to meet the required number of primary copies. +- **Prototype test** From 961846618c690dfcb71f26cbe713967674e07e50 Mon Sep 17 00:00:00 2001 From: hejialing Date: Mon, 8 Aug 2022 19:40:57 +0800 Subject: [PATCH 09/11] save --- reps/2022-04-21-object_store_HA.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 97a560f..9db9d74 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -49,11 +49,11 @@ Data of objects stored in the plasma store. For now, the plasma store is a threa ### Proposed Design We implement -# Options to implement object HA with checkpoint +#### Options to implement object HA with checkpoint We implement object HA based on the checkpoint, so we can walk around **Problem 3: Loss of All Copies** -## Option 1: Ownerless objects +##### Option 1: Ownerless objects Every checkpointed object will be ownerless. When `ray.get` on a checkpointed object and the object data is not local, data will be loaded from checkpoint and stored in local object store as a secondary copy. @@ -72,7 +72,7 @@ In this case, We don't need metadata anymore, so we don't need to manage it anym - Some features aren't easy to support. e.g. locality-aware scheduling. - Missing features. -## Option 2: Divided owners on owner failure +##### Option 2: Divided owners on owner failure Once the owner of a checkpointed object is dead, subsequent access to the object on a worker will make the worker the new owner of the object. The metadata about this object in the reference table of the worker will be rewritten. If multiple workers hold the same object ref and want to access it after owner failure, each worker will become the new owner of the object, independently. i.e. multiple owners after owner failure. @@ -103,7 +103,7 @@ If both worker A and B are the new owners of an object and both pass object refs - Corner case handling such as RPC failures. - Potentially high dev cost.
-## Option 3: Global owner(s) +##### Option 3: Global owner(s) We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner. From 1155d7c338fb6ff2dfa73e13e31075594891dbcc Mon Sep 17 00:00:00 2001 From: hejialing Date: Mon, 8 Aug 2022 23:20:32 +0800 Subject: [PATCH 10/11] save --- reps/2022-04-21-object_store_HA.md | 55 ++---------------------------- 1 file changed, 2 insertions(+), 53 deletions(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 9db9d74..81fd0ad 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -51,59 +51,8 @@ Data of objects stored in the plasma store. For now, the plasma store is a threa We implement #### Options to implement object HA with checkpoint -We implement object HA based on the checkpoint, so we can walk around **Problem 3: Loss of All Copies** - -##### Option 1: Ownerless objects - -Every checkpointed object will be ownerless. When `ray.get` on a checkpointed object and the object data is not local, data will be loaded from checkpoint and stored in local object store as a secondary copy. - -In this case, We don't need metadata anymore, so we don't need to manage it anymore, and **problems 1 and 2** don't exist. - -- Pros - - The logic/protocol should be relatively straightforward. -- Cons - - No pulling/pushing object data between nodes. - - High IO pressure on external storage. - - External storage bottleneck. - - Bad loading performance. - - Existing code is purely ownership-based. Need to add many if-elses to support ownerless objects. (e.g. location pubsub. GC.) - - High dev cost. - - Maintenance burden. - - Some features aren't easy to support. e.g. locality-aware scheduling. - - Missing features. - -##### Option 2: Divided owners on owner failure - -Once the owner of a checkpointed object is dead, subsequent access to the object on a worker will make the worker the new owner of the object. The metadata about this object in the reference table of the worker will be rewritten. If multiple workers hold the same object ref and want to access it after owner failure, each worker will become the new owner of the object, independently. i.e. multiple owners after owner failure. - -If both worker A and B are the new owners of an object and both pass object refs to worker C, worker C only records the owner of the object once. e.g. If worker C receives the object ref from worker A first, then worker C treats worker A as the owner of the object. - -**Problem 2** will cause the owner to release the reference of the object in advance. In the current case, it can be handled in the same way as the owner fails. - -- Pros - - The ownership protocol is still distributed. Critical updates to the owner reference table are still in-process. - - Good performance on metadata updates. -- Cons - - Owner collision. e.g. Raylet also stores and uses owner addresses and communicates with owners. How do we update the owner info in Raylet, especially if two workers on the same node claim the new owner of an object? 
- - High dev cost to sync new owner info to Raylet. - - High dev cost to maintain N owners of an object in Raylet. - - Primary copy collision. Imagine multiple workers on the same node calling `ray.get` on an object. - - If we allow only one copy loaded into the local object store, we need to update the GC strategy to only unpin the primary copy if all owners say the object is out-of-scope. - - High dev cost. - - If we store multiple copies (i.e. one copy per owner), we need to update object manager and plasma store by changing the object key from object ID to object ID + owner address. - - High dev cost. - - High memory pressure to object store. - - High IO pressure on external storage. - - External storage bottleneck. - - Bad loading performance. - - No pulling/pushing object data between nodes. - - High IO pressure on external storage. - - External storage bottleneck. - - Bad loading performance. - - Corner case handling such as RPC failures. - - Potentially high dev cost. - -##### Option 3: Global owner(s) +We implement object HA based on checkpoints, so we can work around **Problem 3: Loss of All Copies**. +Discussion details are here: https://github.com/ray-project/enhancements/pull/10#issuecomment-1127719640 We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner. From 2a9760e33b57b4b084be61aa6d23386975fcf1f2 Mon Sep 17 00:00:00 2001 From: hejialing Date: Mon, 8 Aug 2022 23:21:53 +0800 Subject: [PATCH 11/11] save --- reps/2022-04-21-object_store_HA.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reps/2022-04-21-object_store_HA.md b/reps/2022-04-21-object_store_HA.md index 81fd0ad..7864d52 100644 --- a/reps/2022-04-21-object_store_HA.md +++ b/reps/2022-04-21-object_store_HA.md @@ -52,7 +52,7 @@ We implement #### Options to implement object HA with checkpoint We implement object HA based on checkpoints, so we can work around **Problem 3: Loss of All Copies**. -Discussion details are here: https://github.com/ray-project/enhancements/pull/10#issuecomment-1127719640 +Previously discussed options: https://github.com/ray-project/enhancements/pull/10#issuecomment-1127719640 We use highly available processes as global owners of checkpointed objects. Such highly available processes can be GCS or a group of named actors with `max_restarts=-1`. We reuse the existing ownership assignment RPCs to assign a checkpointed object to a global owner and encode the immutable info (an `owner_is_gcs` flag or the actor name) about the global owner into the owner address. The process to get an RPC client to the owner needs to be updated to be able to return a working RPC client to the up-to-date IP:port of the owner.
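For illustration, the sketch below shows how the proposed group of global owner actors could be started with existing Ray actor options. `GlobalOwner` itself and the actor name prefix are proposal-stage placeholders and not part of Ray today; the actor options used (`max_restarts`, named detached actors, `scheduling_strategy="SPREAD"`) are existing Ray APIs.

```python
import ray

# Proposal-stage placeholder actor; the real global owner would hold the
# reference table for the objects it owns.
@ray.remote(max_restarts=-1, max_task_retries=-1)
class GlobalOwner:
    def ready(self):
        return True

def start_global_owners(num_global_owners: int):
    owners = [
        GlobalOwner.options(
            name=f"global_owner_{i}",      # named, so it can be re-resolved after a restart
            lifetime="detached",           # outlives the driver that created it
            scheduling_strategy="SPREAD",  # best-effort spread across nodes
        ).remote()
        for i in range(num_global_owners)
    ]
    # Block until every owner actor is alive.
    ray.get([owner.ready.remote() for owner in owners])
    return owners

if __name__ == "__main__":
    ray.init()
    owners = start_global_owners(num_global_owners=4)
    print(f"Started {len(owners)} global owner actors.")
```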