Add hardware stats to train_head #46719

Merged
@@ -258,7 +258,7 @@ describe("ActorTable", () => {
utilizationGpu: 50,
memoryUsed: 0,
memoryTotal: 20,
processes: [{ pid: 25321, gpuMemoryUsage: 0 }],
processesPids: [{ pid: 25321, gpuMemoryUsage: 0 }],
},
],
},
@@ -293,7 +293,7 @@ describe("ActorTable", () => {
utilizationGpu: 0,
memoryUsed: 10,
memoryTotal: 20,
processes: [{ pid: 25322, gpuMemoryUsage: 10 }],
processesPids: [{ pid: 25322, gpuMemoryUsage: 10 }],
},
],
},
4 changes: 2 additions & 2 deletions python/ray/dashboard/client/src/pages/node/GPUColumn.tsx
@@ -54,7 +54,7 @@ export const WorkerGpuRow = ({
}) => {
const workerGPUEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => process.pid === workerPID,
);
if (!process) {
@@ -81,7 +81,7 @@ export const getSumGpuUtilization = (
// aggregate of the WorkerGpuRow and follows the same logic.
const workerGPUUtilizationEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => process.pid === workerPID,
);
if (!process) {
4 changes: 2 additions & 2 deletions python/ray/dashboard/client/src/pages/node/GRAMColumn.tsx
@@ -39,7 +39,7 @@ export const WorkerGRAM = ({
}) => {
const workerGRAMEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => workerPID && process.pid === workerPID,
);
if (!process) {
@@ -73,7 +73,7 @@ export const getSumGRAMUsage = (
// aggregate of WorkerGRAM and follows the same logic.
const workerGRAMEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => workerPID && process.pid === workerPID,
);
if (!process) {
2 changes: 1 addition & 1 deletion python/ray/dashboard/client/src/type/node.d.ts
@@ -67,7 +67,7 @@ export type GPUStats = {
utilizationGpu?: number;
memoryUsed: number;
memoryTotal: number;
processes?: ProcessGPUUsage[];
processesPids?: ProcessGPUUsage[];
};

export type NodeDetailExtend = {
2 changes: 1 addition & 1 deletion python/ray/dashboard/datacenter.py
@@ -227,7 +227,7 @@ async def _get_actor(actor):
for gpu_stats in node_physical_stats.get("gpus", []):
# gpu_stats.get("processes") can be None, an empty list or a
# list of dictionaries.
for process in gpu_stats.get("processes") or []:
for process in gpu_stats.get("processesPids") or []:
if process["pid"] == pid:
actor_process_gpu_stats.append(gpu_stats)
break
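The PID-matching loop above is small but easy to get wrong, since `processesPids` may be None. A minimal, self-contained sketch of the same pattern (the helper name and sample data are illustrative, not from the Ray codebase):

```python
# Hypothetical helper mirroring the loop in datacenter.py; not part of Ray itself.
from typing import Any, Dict, List


def find_gpus_for_pid(gpus: List[Dict[str, Any]], pid: int) -> List[Dict[str, Any]]:
    """Return the GPU entries whose processesPids include the given PID."""
    matched = []
    for gpu_stats in gpus:
        # processesPids can be None, an empty list, or a list of dicts.
        for process in gpu_stats.get("processesPids") or []:
            if process["pid"] == pid:
                matched.append(gpu_stats)
                break
    return matched


# Illustrative sample data shaped like the dashboard's per-node GPU stats.
sample_gpus = [
    {"index": 0, "memoryUsed": 10, "processesPids": [{"pid": 25322, "gpuMemoryUsage": 10}]},
    {"index": 1, "memoryUsed": 0, "processesPids": None},
]
print(find_gpus_for_pid(sample_gpus, 25322))  # -> only the index-0 entry
```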
107 changes: 76 additions & 31 deletions python/ray/dashboard/modules/train/train_head.py
@@ -1,12 +1,13 @@
import logging
from typing import List

from aiohttp.web import Request, Response

import ray
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from ray.dashboard.modules.actor.actor_head import actor_table_data_to_dict
from ray.core.generated import gcs_service_pb2_grpc
from ray.dashboard.datacenter import DataOrganizer
from ray.dashboard.modules.job.common import JobInfoStorageClient
from ray.dashboard.modules.job.utils import find_jobs_by_job_ids
from ray.util.annotations import DeveloperAPI
@@ -29,10 +30,7 @@ def __init__(self, dashboard_head):
@DeveloperAPI
async def get_train_runs(self, req: Request) -> Response:
try:
from ray.train._internal.state.schema import (
TrainRunInfoWithDetails,
TrainRunsResponse,
)
from ray.train._internal.state.schema import TrainRunsResponse
except ImportError:
logger.exception(
"Train is not installed. Please run `pip install ray[train]` "
@@ -58,24 +56,22 @@ async def get_train_runs(self, req: Request) -> Response:
else:
try:
train_runs = await stats_actor.get_all_train_runs.remote()
await self._add_actor_status_and_update_run_status(train_runs)
train_runs_with_details = (
await self._add_actor_status_and_update_run_status(train_runs)
)
# Sort train runs in reverse chronological order
train_runs = sorted(
train_runs.values(),
train_runs_with_details = sorted(
train_runs_with_details,
key=lambda run: run.start_time_ms,
reverse=True,
)
job_details = await find_jobs_by_job_ids(
self._dashboard_head.gcs_aio_client,
self._job_info_client,
[run.job_id for run in train_runs],
[run.job_id for run in train_runs_with_details],
)
train_runs_with_details = [
TrainRunInfoWithDetails(
**run.dict(), job_details=job_details.get(run.job_id)
)
for run in train_runs
]
for run in train_runs_with_details:
run.job_details = job_details.get(run.job_id)
details = TrainRunsResponse(train_runs=train_runs_with_details)
except ray.exceptions.RayTaskError as e:
# Task failure sometimes are due to GCS
@@ -95,43 +91,92 @@ async def get_train_runs(self, req: Request) -> Response:
)

async def _add_actor_status_and_update_run_status(self, train_runs):
from ray.train._internal.state.schema import ActorStatusEnum, RunStatusEnum
from ray.train._internal.state.schema import (
ActorStatusEnum,
RunStatusEnum,
TrainRunInfoWithDetails,
TrainWorkerInfoWithDetails,
)

actor_status_table = {}
try:
logger.info("Getting all actor info from GCS.")
request = gcs_service_pb2.GetAllActorInfoRequest()
reply = await self._gcs_actor_info_stub.GetAllActorInfo(request, timeout=5)
if reply.status.code == 0:
for message in reply.actor_table_data:
actor_table_data = actor_table_data_to_dict(message)
actor_status_table[actor_table_data["actorId"]] = actor_table_data[
"state"
]
actors = await DataOrganizer.get_all_actors()

except Exception:
logger.exception("Error Getting all actor info from GCS.")

train_runs_with_details: List[TrainRunInfoWithDetails] = []

for train_run in train_runs.values():
worker_infos_with_details: List[TrainWorkerInfoWithDetails] = []

for worker_info in train_run.workers:
worker_info.status = actor_status_table.get(worker_info.actor_id, None)
actor = actors.get(worker_info.actor_id, None)
# Add hardware metrics to API response
if actor:
gpus = [
gpu
for gpu in actor["gpus"]
if worker_info.pid
in [process["pid"] for process in gpu["processesPids"]]
]
# Need to convert processesPids into a proper list.
# It's some weird ImmutableList structure.
# We also convert the list of processes into a single item since
# an actor is only a single process and cannot match multiple
# processes.
formatted_gpus = [
{
**gpu,
"processInfo": [
process
for process in gpu["processesPids"]
if process["pid"] == worker_info.pid
][0],
}
for gpu in gpus
]

worker_info_with_details = TrainWorkerInfoWithDetails.parse_obj(
{
**worker_info.dict(),
"status": actor["state"],
"processStats": actor["processStats"],
"gpus": formatted_gpus,
}
)
else:
worker_info_with_details = TrainWorkerInfoWithDetails.parse_obj(
worker_info.dict()
)

worker_infos_with_details.append(worker_info_with_details)

train_run_with_details = TrainRunInfoWithDetails.parse_obj(
{**train_run.dict(), "workers": worker_infos_with_details}
)

# The train run can be unexpectedly terminated before the final run
# status was updated. This could be due to errors outside of the training
# function (e.g., system failure or user interruption) that crashed the
# train controller.
# We need to detect this case and mark the train run as ABORTED.
controller_actor_status = actor_status_table.get(
controller_actor_status = actors.get(
train_run.controller_actor_id, None
)
).get("state")
if (
controller_actor_status == ActorStatusEnum.DEAD
and train_run.run_status == RunStatusEnum.STARTED
):
train_run.run_status = RunStatusEnum.ABORTED
train_run.status_detail = (
train_run_with_details.run_status = RunStatusEnum.ABORTED
train_run_with_details.status_detail = (
"Unexpectedly terminated due to system errors."
)

train_runs_with_details.append(train_run_with_details)

return train_runs_with_details

@staticmethod
def is_minimal_module():
return False
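The new `_add_actor_status_and_update_run_status` keeps only the GPUs whose `processesPids` contain the worker's PID and then collapses that list into a single `processInfo` entry. A standalone sketch of that transformation (function name and sample data are illustrative, not the actual Ray implementation):

```python
# Illustrative sketch of the per-worker GPU formatting; not the actual Ray code.
from typing import Any, Dict, List


def format_gpus_for_worker(
    gpus: List[Dict[str, Any]], worker_pid: int
) -> List[Dict[str, Any]]:
    """Keep GPUs used by worker_pid and attach the matching process as processInfo."""
    formatted = []
    for gpu in gpus:
        matching = [
            p for p in (gpu.get("processesPids") or []) if p["pid"] == worker_pid
        ]
        if not matching:
            continue  # this GPU is not attached to the worker process
        # An actor is a single process, so at most one entry can match.
        formatted.append({**gpu, "processInfo": matching[0]})
    return formatted


gpus = [
    {"index": 0, "memoryTotal": 20, "processesPids": [{"pid": 111, "gpuMemoryUsage": 4}]},
    {"index": 1, "memoryTotal": 20, "processesPids": [{"pid": 222, "gpuMemoryUsage": 6}]},
]
print(format_gpus_for_worker(gpus, 222))  # -> only the index-1 GPU, with processInfo
```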
52 changes: 52 additions & 0 deletions python/ray/train/_internal/state/schema.py
@@ -47,6 +47,55 @@ class TrainWorkerInfo(BaseModel):
)


@DeveloperAPI
class MemoryInfo(BaseModel):
rss: int
vms: int
pfaults: Optional[int]
pageins: Optional[int]


@DeveloperAPI
class ProcessStats(BaseModel):
Review comment (Contributor): Is this schema based on an existing one? See questions below.

Reply (Contributor Author): Yeah, this is the existing GPU schema for nodes and actors. I agree it's ugly. I think the export API gives us a chance to really clean this up in the future, but let's not bundle that in with the train dashboard changes.

cpuPercent: float
Review comment (Contributor): Will this be 0 to 1 or 0 to 100?

# total memory, free memory, memory used ratio
mem: Optional[List[int]]
memoryInfo: MemoryInfo
Review comment on lines +61 to +63 (Contributor): Why is mem a list and not individual fields? Why are these not part of memoryInfo?



class ProcessGPUUsage(BaseModel):
# GPU usage stats from a single process
pid: int
gpuMemoryUsage: int
Review comment on lines +66 to +69 (Contributor): Would this give the usage specific to the PID? In that case, should GPUStats actually take a list of processInfos?



@DeveloperAPI
class GPUStats(BaseModel):
uuid: str
index: int
name: str
utilizationGpu: Optional[float]
memoryUsed: float
memoryTotal: float
Review comment on lines +77 to +79 (Contributor): Should these be in ProcessGPUUsage?

processInfo: ProcessGPUUsage


@DeveloperAPI
class TrainWorkerInfoWithDetails(TrainWorkerInfo):
"""Metadata of a Ray Train worker."""

processStats: Optional[ProcessStats] = Field(
None, description="Process stats of the worker."
)
gpus: List[GPUStats] = Field(
default_factory=list,
description=(
"GPU stats of the worker. "
"Only returns GPUs that are attached to the worker process."
),
)


@DeveloperAPI
class TrainDatasetInfo(BaseModel):
name: str = Field(
Expand Down Expand Up @@ -91,6 +140,9 @@ class TrainRunInfo(BaseModel):
class TrainRunInfoWithDetails(TrainRunInfo):
"""Metadata for a Ray Train run and information about its workers."""

workers: List[TrainWorkerInfoWithDetails] = Field(
description="A List of Train workers sorted by global ranks."
)
job_details: Optional[JobDetails] = Field(
None, description="Details of the job that started this Train run."
)
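For reference, a hedged usage sketch of the new schema models: the class bodies below mirror the fields added in this diff (pydantic v1 style `parse_obj`, as used in train_head.py), and the sample values are made up for illustration.

```python
from typing import Optional

from pydantic import BaseModel


class ProcessGPUUsage(BaseModel):
    # GPU usage stats from a single process.
    pid: int
    gpuMemoryUsage: int


class GPUStats(BaseModel):
    # Mirrors the GPUStats model added in schema.py above.
    uuid: str
    index: int
    name: str
    utilizationGpu: Optional[float]
    memoryUsed: float
    memoryTotal: float
    processInfo: ProcessGPUUsage


# Sample values are illustrative only.
gpu = GPUStats.parse_obj(
    {
        "uuid": "GPU-00000000",
        "index": 0,
        "name": "NVIDIA T4",
        "utilizationGpu": 35.0,
        "memoryUsed": 4.2,
        "memoryTotal": 16.0,
        "processInfo": {"pid": 25321, "gpuMemoryUsage": 4},
    }
)
print(gpu.processInfo.pid)  # 25321
```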