Foundation model (FM) training and inference have led to a significant increase in computational needs across the industry. These models require massive amounts of accelerated compute to train and operate effectively, pushing the boundaries of traditional computing infrastructure. They also require efficient systems for distributing workloads across multiple GPU-accelerated servers and for optimizing both developer velocity and performance.
Ray is an open source framework that makes it straightforward to create, deploy, and optimize distributed Python jobs. At its core, Ray offers a unified programming model that allows developers to seamlessly scale their applications from a single machine to a distributed cluster. It provides a set of high-level APIs for tasks, actors, and data that abstract away the complexities of distributed computing, enabling developers to focus on the core logic of their applications. Ray promotes the same coding patterns for both a simple machine learning (ML) experiment and a scalable, resilient production application. Ray’s key features include efficient task scheduling, fault tolerance, and automatic resource management, making it a powerful tool for building a wide range of distributed applications, from ML models to real-time data processing pipelines. With its growing ecosystem of libraries and tools, Ray has become a popular choice for organizations looking to use the power of distributed computing to tackle complex and data-intensive problems.
Amazon SageMaker HyperPod is purpose-built infrastructure for developing and deploying large-scale FMs. SageMaker HyperPod not only provides the flexibility to create and use your own software stack, but also provides optimal performance through same-spine placement of instances, as well as built-in resiliency. Combining the resiliency of SageMaker HyperPod and the efficiency of Ray provides a powerful framework to scale up your generative AI workloads.
In this post, we demonstrate the steps involved in running Ray jobs on SageMaker HyperPod.
This section provides a high-level overview of the Ray tools and frameworks for AI/ML workloads. We primarily focus on ML training use cases.
Ray is an open-source distributed computing framework designed to run highly scalable and parallel Python applications. Ray manages, executes, and optimizes compute needs across AI workloads. It unifies infrastructure through a single, flexible framework—enabling AI workloads from data processing, to model training, to model serving and beyond.
For distributed jobs, Ray provides intuitive tools for parallelizing and scaling ML workflows. It allows developers to focus on their training logic without having to manage the complexities of resource allocation, task scheduling, and inter-node communication.
At a high level, Ray is made up of three layers:

- Ray Core – The foundational distributed runtime, providing low-level primitives such as tasks, actors, and objects for building and scaling distributed applications
- Ray AI libraries – A set of domain-specific libraries built on Ray Core, including Ray Data, Ray Train, Ray Tune, and Ray Serve, that cover common ML workloads
- Ray clusters – A set of worker nodes connected to a common head node, which can be fixed-size or autoscale based on the resources requested by the running applications
In this post, we dive deep into running Ray clusters on SageMaker HyperPod. A Ray cluster consists of a single head node and a number of connected worker nodes. The head node orchestrates task scheduling, resource allocation, and communication between nodes. The Ray worker nodes execute the distributed workloads using Ray tasks and actors, such as model training or data preprocessing.
Ray clusters and Kubernetes clusters pair well together. By running a Ray cluster on Kubernetes using the KubeRay operator, both Ray users and Kubernetes administrators benefit from the smooth path from development to production. For this use case, we use a SageMaker HyperPod cluster orchestrated through Amazon Elastic Kubernetes Service (Amazon EKS).
The KubeRay operator enables you to run a Ray cluster on a Kubernetes cluster. KubeRay creates the following custom resource definitions (CRDs):

- RayCluster – The primary resource for managing Ray instances on Kubernetes, handling the full lifecycle of a Ray cluster, including creation, deletion, autoscaling, and fault tolerance
- RayJob – Attaches a Ray job to a Ray cluster, optionally creating the cluster for the job and tearing it down after the job finishes
- RayService – Manages a Ray cluster together with the Ray Serve applications running on it, supporting zero-downtime upgrades and high availability for model serving
For the remainder of this post, we don’t focus on RayJob or RayService; we focus on creating a persistent Ray cluster to run distributed ML training jobs.
When Ray clusters are paired with SageMaker HyperPod clusters, Ray clusters unlock enhanced resiliency and auto-resume capabilities, which we will dive deeper into later in this post. This combination provides a solution for handling dynamic workloads, maintaining high availability, and providing seamless recovery from node failures, which is crucial for long-running jobs.
In this section, we introduce SageMaker HyperPod and its built-in resiliency features to provide infrastructure stability.
Generative AI workloads such as training, inference, and fine-tuning involve building, maintaining, and optimizing large clusters of thousands of GPU-accelerated instances. For distributed training, the goal is to efficiently parallelize workloads across these instances to maximize cluster utilization and minimize time to train. For large-scale inference, it's important to minimize latency, maximize throughput, and seamlessly scale across those instances for the best user experience. SageMaker HyperPod is purpose-built infrastructure to address these needs. It removes the undifferentiated heavy lifting involved in building, maintaining, and optimizing a large GPU-accelerated cluster. It also provides flexibility to fully customize your training or inference environment and compose your own software stack. You can use either Slurm or Amazon EKS for orchestration with SageMaker HyperPod.
Due to their massive size and the need to train on large amounts of data, FMs are often trained and deployed on large compute clusters composed of thousands of AI accelerators such as GPUs and AWS Trainium. A single failure in one of these thousands of accelerators can interrupt the entire training process, requiring manual intervention to identify, isolate, debug, repair, and recover the faulty node in the cluster. This workflow can take several hours for each failure, and as the scale of the cluster grows, it's common to see a failure every few days or even every few hours. SageMaker HyperPod provides resiliency against infrastructure failures by applying agents that continuously run health checks on cluster instances, fix the bad instances, reload the last valid checkpoint, and resume the training—without user intervention. As a result, you can train your models up to 40% faster. You can also SSH into an instance in the cluster for debugging and gather insights on hardware-level optimization during multi-node training. Orchestrators like Slurm or Amazon EKS facilitate efficient allocation and management of resources, provide optimal job scheduling, monitor resource utilization, and automate fault tolerance.
This section provides an overview of how to run Ray jobs for multi-node distributed training on SageMaker HyperPod. We go over the architecture and the process of creating a SageMaker HyperPod cluster, installing the KubeRay operator, and deploying a Ray training job.
Although this post provides a step-by-step guide to manually create the cluster, feel free to check out the aws-do-ray project, which aims to simplify the deployment and scaling of distributed Python applications using Ray on Amazon EKS or SageMaker HyperPod. It uses Docker to containerize the tools necessary to deploy and manage Ray clusters, jobs, and services. In addition to the aws-do-ray project, we'd like to highlight the Amazon SageMaker HyperPod EKS workshop, which offers an end-to-end experience for running various workloads on SageMaker HyperPod clusters. There are multiple examples of training and inference workloads in the GitHub repository awsome-distributed-training.
As introduced earlier in this post, KubeRay simplifies the deployment and management of Ray applications on Kubernetes. The following diagram illustrates the solution architecture.
Before deploying Ray on SageMaker HyperPod, you need a HyperPod cluster:
If you prefer to deploy HyperPod on an existing EKS cluster, follow the instructions here, which include:
The following provides an example workflow for creating a HyperPod cluster on an existing EKS cluster after deploying the prerequisites. This is for reference only and isn't required for the quick deploy option.
The provided configuration file contains two key highlights:
You can create a SageMaker HyperPod compute with the following AWS Command Line Interface (AWS CLI) command (AWS CLI version 2.17.47 or newer is required):
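A minimal sketch of that call, assuming your cluster configuration has been saved to a local file named cluster-config.json (the file name is illustrative):

```bash
aws sagemaker create-cluster \
    --cli-input-json file://cluster-config.json
```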
To verify the cluster status, you can use the following command:
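For example, assuming the default AWS CLI profile and Region:

```bash
aws sagemaker list-clusters
```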
This command displays the cluster details, including the cluster name, status, and creation time:
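The response has roughly the following shape (all values are placeholders):

```json
{
    "ClusterSummaries": [
        {
            "ClusterArn": "arn:aws:sagemaker:us-east-1:123456789012:cluster/abcd1234efgh",
            "ClusterName": "ml-cluster",
            "ClusterStatus": "InService",
            "CreationTime": "2025-01-01T00:00:00.000000+00:00"
        }
    ]
}
```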
Alternatively, you can verify the cluster status on the SageMaker console. After a brief period, you can observe that the status for the nodes transitions to Running.
Create an FSx for Lustre shared file system

For us to deploy the Ray cluster, we need the SageMaker HyperPod cluster to be up and running, and additionally we need a shared storage volume (for example, an Amazon FSx for Lustre file system). This is a shared file system that the SageMaker HyperPod nodes can access. This file system can be provisioned statically before launching your SageMaker HyperPod cluster or dynamically afterwards.
Specifying a shared storage location (such as cloud storage or NFS) is optional for single-node clusters, but it is required for multi-node clusters. Using a local path will raise an error during checkpointing for multi-node clusters.
The Amazon FSx for Lustre CSI driver uses IAM roles for service accounts (IRSA) to authenticate AWS API calls. To use IRSA, an IAM OpenID Connect (OIDC) provider needs to be associated with the OIDC issuer URL that comes provisioned with your EKS cluster.
Create an IAM OIDC identity provider for your cluster with the following command:
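Assuming your EKS cluster name is exported as $EKS_CLUSTER_NAME (illustrative), this is typically done with eksctl:

```bash
eksctl utils associate-iam-oidc-provider \
    --cluster $EKS_CLUSTER_NAME \
    --approve
```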
Deploy the FSx for Lustre CSI driver:
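One way to do this is with the driver's Helm chart (the release name is illustrative):

```bash
helm repo add aws-fsx-csi-driver https://kubernetes-sigs.github.io/aws-fsx-csi-driver
helm repo update
helm upgrade --install aws-fsx-csi-driver \
    aws-fsx-csi-driver/aws-fsx-csi-driver \
    --namespace kube-system
```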
This Helm chart includes a service account named `fsx-csi-controller-sa` that gets deployed in the `kube-system` namespace.
Use the eksctl CLI to create an AWS Identity and Access Management (IAM) role bound to the service account used by the driver, attaching the `AmazonFSxFullAccess` AWS managed policy:
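A sketch of that command, assuming the role name `AmazonEKSFSxLustreCSIDriverFullAccess` referenced later in this post:

```bash
eksctl create iamserviceaccount \
    --name fsx-csi-controller-sa \
    --namespace kube-system \
    --cluster $EKS_CLUSTER_NAME \
    --attach-policy-arn arn:aws:iam::aws:policy/AmazonFSxFullAccess \
    --approve \
    --role-name AmazonEKSFSxLustreCSIDriverFullAccess \
    --override-existing-serviceaccounts
```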
The `--override-existing-serviceaccounts` flag lets eksctl know that the `fsx-csi-controller-sa` service account already exists on the EKS cluster, so it skips creating a new one and updates the metadata of the current service account instead.
Annotate the driver's service account with the Amazon Resource Name (ARN) of the `AmazonEKSFSxLustreCSIDriverFullAccess` IAM role that was created:
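For example, looking up the role ARN first (role name as above):

```bash
SA_ROLE_ARN=$(aws iam get-role \
    --role-name AmazonEKSFSxLustreCSIDriverFullAccess \
    --query 'Role.Arn' --output text)

kubectl annotate serviceaccount fsx-csi-controller-sa \
    -n kube-system \
    eks.amazonaws.com/role-arn=$SA_ROLE_ARN --overwrite=true
```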
This annotation lets the driver know what IAM role it should use to interact with the FSx for Lustre service on your behalf.
Verify that the service account has been properly annotated:
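One way to check is to print the service account and inspect its annotations:

```bash
kubectl get serviceaccount fsx-csi-controller-sa \
    -n kube-system -o yaml
```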
Restart the `fsx-csi-controller` deployment for the changes to take effect:
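```bash
kubectl rollout restart deployment fsx-csi-controller -n kube-system
```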
The FSx for Lustre CSI driver presents you with two options for provisioning a file system:

- Dynamic provisioning – You define a persistent volume claim (PVC) with the desired storage specifications, and the CSI driver automatically provisions an FSx for Lustre file system for you based on the PVC request.
- Static provisioning – You manually create an FSx for Lustre file system outside of Kubernetes (for example, through the AWS Management Console or AWS CLI) and then reference it inside your cluster by defining a persistent volume.
For this example, we use dynamic provisioning. Start by creating a storage class that uses the `fsx.csi.aws.com` provisioner:
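A minimal sketch of such a storage class, assuming a Persistent_2 deployment type (the name and parameter values are illustrative; substitute the two placeholders as described below):

```yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: fsx-sc
provisioner: fsx.csi.aws.com
parameters:
  subnetId: <SUBNET_ID>
  securityGroupIds: <SECURITYGROUP_ID>
  deploymentType: PERSISTENT_2
  perUnitStorageThroughput: "250"
mountOptions:
  - flock
```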
- `SUBNET_ID`: The subnet ID that the FSx for Lustre file system will be created in. This should be the same private subnet that was used for HyperPod creation.
- `SECURITYGROUP_ID`: The security group IDs that will be attached to the file system. This should be the same security group ID that is used in HyperPod and EKS.

Next, create a PVC, `fsx-claim`, that uses this storage class:
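A minimal PVC sketch, assuming the storage class above is named `fsx-sc` and a 1,200 GiB capacity (illustrative):

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: fsx-claim
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: fsx-sc
  resources:
    requests:
      storage: 1200Gi
```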
This PVC will start the dynamic provisioning of an FSx for Lustre file system based on the specifications provided in the storage class.
Now that we have both the SageMaker HyperPod cluster and the FSx for Lustre file system created, we can set up the Ray cluster:
We recommend using KubeRay operator version 1.2.0 or higher, which supports automatic Ray Pod eviction and replacement in case of failures (for example, hardware issues on EKS or SageMaker HyperPod nodes).
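The operator is typically installed from its Helm chart (the release name and pinned version here are illustrative):

```bash
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
helm repo update
# Installs both the KubeRay CRDs and the operator
helm install kuberay-operator kuberay/kuberay-operator --version 1.2.0
```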
Due to the deprecation of the `rayproject/ray-ml` images starting from Ray version 2.31.0, it's necessary to create a custom container image for our Ray cluster. Therefore, we will build on top of the `rayproject/ray:2.42.1-py310-gpu` image, which has all necessary Ray dependencies, and include our training dependencies to build our own custom image. Feel free to modify this Dockerfile as you wish.

First, create a Dockerfile that builds upon the base Ray GPU image and includes only the necessary dependencies:
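A minimal sketch, assuming a PyTorch-based training workload (the extra packages are illustrative; pin versions as needed):

```dockerfile
FROM rayproject/ray:2.42.1-py310-gpu

# Training-specific dependencies on top of the base Ray image
RUN pip install --no-cache-dir \
    torch torchvision \
    transformers datasets

WORKDIR /workspace
```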
Then, build and push the image to your container registry (Amazon ECR) using the provided script:
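If you aren't using the script from the repo, an equivalent manual sequence looks like the following (Region, account, and repository name are illustrative):

```bash
export AWS_REGION=us-east-1
export ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export REGISTRY=$ACCOUNT_ID.dkr.ecr.$AWS_REGION.amazonaws.com
export IMAGE=aws-ray-custom

# Create the ECR repository if it doesn't exist yet
aws ecr create-repository --repository-name $IMAGE --region $AWS_REGION || true

# Authenticate Docker against ECR, then build and push
aws ecr get-login-password --region $AWS_REGION | \
    docker login --username AWS --password-stdin $REGISTRY
docker build -t $REGISTRY/$IMAGE:latest .
docker push $REGISTRY/$IMAGE:latest
```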
Now, our Ray container image is in Amazon ECR with all necessary Ray dependencies, as well as code library dependencies.
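We can now define the Ray cluster as a RayCluster custom resource. The following is a trimmed-down sketch of what such a manifest can look like, assuming the custom image pushed earlier and the `fsx-claim` PVC; instance sizes, replica counts, and resource values are illustrative:

```yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: rayml
spec:
  rayVersion: "2.42.1"
  headGroupSpec:
    rayStartParams:
      dashboard-host: "0.0.0.0"
    template:
      spec:
        containers:
          - name: ray-head
            image: <ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/aws-ray-custom:latest
            resources:
              requests:
                cpu: "4"
                memory: 16Gi
            volumeMounts:
              - name: fsx-storage
                mountPath: /fsx
        volumes:
          - name: fsx-storage
            persistentVolumeClaim:
              claimName: fsx-claim
  workerGroupSpecs:
    - groupName: gpu-workers
      replicas: 2
      minReplicas: 2
      maxReplicas: 4
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-worker
              image: <ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/aws-ray-custom:latest
              resources:
                limits:
                  nvidia.com/gpu: "8"
              volumeMounts:
                - name: fsx-storage
                  mountPath: /fsx
          volumes:
            - name: fsx-storage
              persistentVolumeClaim:
                claimName: fsx-claim
```

Apply it with `kubectl apply -f raycluster.yaml` (file name is illustrative).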
Note that there are two distinct sections in the cluster manifest. While the `headGroupSpec` defines the head node of the Ray cluster, the `workerGroupSpecs` define the worker nodes. While a job could technically run on the head node as well, it is common to separate the head node from the actual worker nodes where jobs are executed. Therefore, the instance for the head node can typically be a smaller instance (for example, we chose an m5.2xlarge). Because the head node also manages cluster-level metadata, it can be beneficial to have it run on a non-GPU node to minimize the risk of node failure (because GPUs are a potential source of node failure).
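To reach the Ray dashboard, forward its port from the head service to your local machine. Assuming the KubeRay-created head service follows the usual `<cluster name>-head-svc` pattern:

```bash
kubectl port-forward service/rayml-head-svc 8265:8265
```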
Now, you can visit http://localhost:8265/ to view the Ray Dashboard.
For this example, we submit the job through the Ray job submission SDK. We simply run from a local environment where the training code is available in `--working-dir`. Relative to this path, we specify the main training Python script (here, `fsdp-ray.py`). Within the `working-dir` folder, we can also include additional scripts we might need to run the training.
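A minimal sketch using the Ray job submission SDK, assuming the dashboard is port-forwarded to localhost:8265 and the directory layout described above (paths are illustrative):

```python
from ray.job_submission import JobSubmissionClient

# Point the client at the port-forwarded Ray dashboard
client = JobSubmissionClient("http://localhost:8265")

job_id = client.submit_job(
    # Entry point command, resolved relative to the working directory
    entrypoint="python fsdp-ray.py",
    # Local directory that gets uploaded to the cluster
    runtime_env={"working_dir": "./jobs/fsdp-ray"},
)
print(f"Submitted job: {job_id}")
```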
The `fsdp-ray.py` example is located in `aws-do-ray/Container-Root/ray/raycluster/jobs/fsdp-ray/fsdp-ray.py` in the aws-do-ray GitHub repo.
For our Python training script to run, we need to make sure our training scripts are correctly set up to use Ray. This includes the following steps:

- Wrapping the per-worker training logic in a training function that Ray Train can distribute
- Configuring the scaling behavior and GPU resources for the job
- Launching the training job with the `TorchTrainer` class

For further details on how to adjust your existing training script to get the most out of Ray, refer to the Ray documentation.
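As a sketch of those steps, assuming checkpoints are written to the shared FSx for Lustre mount at /fsx (the path, worker count, and experiment name are illustrative):

```python
from ray.train import RunConfig, ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    # Per-worker training loop; Ray Train initializes
    # torch.distributed and device placement for each worker
    ...

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
    # Shared storage so all workers persist checkpoints to one place
    run_config=RunConfig(storage_path="/fsx/checkpoints", name="fsdp-ray"),
)
result = trainer.fit()
```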
The following diagram illustrates the complete architecture you have built after completing these steps.
Ray is designed with robust fault tolerance mechanisms to provide resilience in distributed systems where failures are inevitable. These failures generally fall into two categories: application-level failures, which stem from bugs in user code or external system issues, and system-level failures, caused by node crashes, network disruptions, or internal bugs in Ray. To address these challenges, Ray provides tools and strategies that enable applications to detect, recover, and adapt seamlessly, providing reliability and performance in distributed environments. In this section, we look at two of the most common types of failures that SageMaker HyperPod complements, and how to implement fault tolerance for them: Ray Train worker failures and Ray worker node failures.
At the time of writing, there are no official updates regarding head pod fault tolerance and auto resume capabilities. Though head pod failures are rare, in the unlikely event of such a failure, you will need to manually restart your training job. However, you can still resume progress from the last saved checkpoint. To minimize the risk of hardware-related head pod failures, it’s advised to place the head pod on a dedicated, CPU-only SageMaker HyperPod node, because GPU failures are a common training job failure point.
Ray Train is designed with fault tolerance to handle worker failures, such as `RayActorError`s. When a failure occurs, the affected workers are stopped, and new ones are automatically started to maintain operations. However, for training progress to continue seamlessly after a failure, saving and loading checkpoints is essential. Without proper checkpointing, the training script will restart, but all progress will be lost. Checkpointing is therefore a critical component of Ray Train's fault tolerance mechanism and needs to be implemented in your code.
When a failure is detected, Ray shuts down failed workers and provisions new ones. Although this happens, we can tell the training function to always keep retrying until training can continue. Each instance of recovery from a worker failure is considered a retry. We can set the number of retries through the `max_failures` attribute of the `FailureConfig`, which is set in the `RunConfig` passed to the `Trainer` (for example, `TorchTrainer`). See the following code:
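A minimal sketch:

```python
from ray.train import FailureConfig, RunConfig

run_config = RunConfig(
    failure_config=FailureConfig(
        # Retry up to 3 worker failures; set to -1 to retry indefinitely
        max_failures=3
    )
)
```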
For more information, see Handling Failures and Node Preemption.
A checkpoint in Ray Train is a lightweight interface representing a directory stored either locally or remotely. For example, a cloud-based checkpoint might point to `s3://my-bucket/checkpoint-dir`, and a local checkpoint might point to `/tmp/checkpoint-dir`. To learn more, see Saving checkpoints during training.
To save a checkpoint in the training loop, you first need to write your checkpoint to a local directory, which can be temporary. When saving, you can use checkpoint utilities from other frameworks like `torch.save`, `pl.Trainer.save_checkpoint`, `accelerator.save_model`, `save_pretrained`, `tf.keras.Model.save`, and more. Then you create a checkpoint from the directory using `Checkpoint.from_directory`. Finally, report the checkpoint to Ray Train using `ray.train.report(metrics, checkpoint=...)`. The metrics reported alongside the checkpoint are used to keep track of the best-performing checkpoints. Reporting will upload the checkpoint to persistent storage.
If you save checkpoints with `ray.train.report(..., checkpoint=...)` and run on a multi-node cluster, Ray Train will raise an error if NFS or cloud storage is not set up. This is because Ray Train expects all workers to be able to write the checkpoint to the same persistent storage location.
Finally, clean up the local temporary directory to free up disk space (for example, by exiting the `tempfile.TemporaryDirectory` context). We can save a checkpoint every epoch or every few iterations.
The following diagram illustrates this setup.
The following code is an example of saving checkpoints using native PyTorch:
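A sketch of this pattern with a toy model (the model, loss, and epoch count are illustrative stand-ins for your real training loop):

```python
import os
import tempfile

import torch
import ray.train
from ray.train import Checkpoint

def train_func(config):
    # Toy model so the sketch is self-contained
    model = torch.nn.Linear(16, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(config.get("num_epochs", 2)):
        # Your real forward/backward pass goes here
        loss = model(torch.randn(8, 16)).mean()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Write the checkpoint locally, then report it to Ray Train,
        # which uploads it to the storage_path set in RunConfig
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
            ray.train.report(
                metrics={"loss": loss.item(), "epoch": epoch},
                checkpoint=Checkpoint.from_directory(tmpdir),
            )
```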
Ray Train also comes with `CheckpointConfig`, a way to configure checkpointing options:
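For example, keeping only the two best checkpoints, ranked by the reported loss (the attribute name must match a metric you report):

```python
from ray.train import CheckpointConfig, RunConfig

run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        num_to_keep=2,
        checkpoint_score_attribute="loss",
        checkpoint_score_order="min",
    )
)
```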
To restore training state from a checkpoint if your training job were to fail and retry, you should modify your training loop to auto resume and then restore a Ray Train job. By pointing to the path of your saved checkpoints, you can restore your trainer and continue training. Here’s a quick example:
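A sketch, assuming the experiment was originally run with the illustrative `storage_path="/fsx/checkpoints"` and `name="fsdp-ray"` from earlier:

```python
from ray.train.torch import TorchTrainer

# Re-create the trainer from the persisted experiment directory
trainer = TorchTrainer.restore(
    path="/fsx/checkpoints/fsdp-ray",
    train_loop_per_worker=train_func,
)
result = trainer.fit()
```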
To streamline restoration, you can add auto resume logic to your script. This checks if a valid experiment directory exists and restores the trainer if available. If not, it starts a new experiment:
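A sketch of such auto resume logic, reusing the illustrative names from above:

```python
from ray.train.torch import TorchTrainer

experiment_path = "/fsx/checkpoints/fsdp-ray"  # illustrative

# Restore if a valid experiment directory exists; otherwise start fresh
if TorchTrainer.can_restore(experiment_path):
    trainer = TorchTrainer.restore(
        experiment_path,
        train_loop_per_worker=train_func,
    )
else:
    trainer = TorchTrainer(
        train_func,
        scaling_config=scaling_config,  # defined as shown earlier
        run_config=run_config,
    )
result = trainer.fit()
```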
To summarize, to provide fault tolerance and auto resume when using Ray Train libraries, set your `max_failures` parameter in the `FailureConfig` (we recommend setting it to `-1` to make sure it will keep retrying until the SageMaker HyperPod node is rebooted or replaced), and make sure you have enabled checkpointing in your code.
In addition to the aforementioned mechanisms to recover from Ray Train worker failures, Ray also provides fault tolerance at the worker pod level. When a worker pod fails (this includes scenarios in which the `raylet` process fails), the running tasks and actors on it will fail, and the objects owned by worker processes of this pod will be lost. In this case, the tasks, actors, and objects fault tolerance mechanisms will start and try to recover the failures using other worker pods.
These mechanisms will be implicitly handled by the Ray Train library. To learn more about the underlying fault tolerance for tasks, actors, and objects (implemented at the Ray Core level), see Fault Tolerance.
In practice, this means that in case of a worker pod failure, the following occurs:

- Tasks that were running on the failed pod are automatically retried on other healthy worker pods
- Actors that were running on the failed pod are restarted, subject to their configured restart policy
- Objects that were owned by worker processes on the failed pod are recovered through lineage reconstruction, by re-executing the tasks that created them
In the context of KubeRay, Ray worker nodes are represented by Kubernetes pods, and failures at this level can include issues such as pod eviction or preemption caused by software-level factors.
However, another critical scenario to consider is hardware failures. If the underlying SageMaker HyperPod node becomes unavailable due to a hardware issue, such as a GPU error, it would inevitably cause the Ray worker pod running on that node to fail as well. Now the fault tolerance and auto-healing mechanisms of your SageMaker HyperPod cluster start and will reboot or replace the faulty node. After the new healthy node is added into the SageMaker HyperPod cluster, Ray will schedule a new worker pod onto the SageMaker HyperPod node and recover the interrupted training. In this case, both the Ray fault tolerance mechanism and the SageMaker HyperPod resiliency features work together seamlessly and make sure that even in case of a hardware failure, your ML training workload can auto resume and pick up from where it was interrupted.
As you have seen, there are various built-in resiliency and fault-tolerance mechanisms that allow your Ray Train workload on SageMaker HyperPod to recover and auto resume. Because these mechanisms will essentially recover by restarting the training job, it is crucial that checkpointing is implemented in the training script. It is also generally advised to save the checkpoints on a shared and persistent path, such as an Amazon Simple Storage Service (Amazon S3) bucket or FSx for Lustre file system.
To delete your SageMaker HyperPod cluster created in this post, you can either use the SageMaker AI console or use the following AWS CLI command:
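For example, assuming the cluster is named ml-cluster (illustrative):

```bash
aws sagemaker delete-cluster --cluster-name ml-cluster
```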
Cluster deletion will take a few minutes. You can confirm successful deletion after you see no clusters on the SageMaker AI console.
If you used the CloudFormation stack to create resources, you can delete it using the following command:
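Assuming the stack was named hyperpod-eks-full-stack (substitute your own stack name):

```bash
aws cloudformation delete-stack --stack-name hyperpod-eks-full-stack
```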
This post demonstrated how to set up and deploy Ray clusters on SageMaker HyperPod, highlighting key considerations such as storage configuration and fault tolerance and auto resume mechanisms.
Running Ray jobs on SageMaker HyperPod offers a powerful solution for distributed AI/ML workloads, combining the flexibility of Ray with the robust infrastructure of SageMaker HyperPod. This integration provides enhanced resiliency and auto resume capabilities, which are crucial for long-running and resource-intensive tasks. By using Ray’s distributed computing framework and the built-in features of SageMaker HyperPod, you can efficiently manage complex ML workflows, specifically training workloads as covered in this post. As AI/ML workloads continue to grow in scale and complexity, the combination of Ray and SageMaker HyperPod offers a scalable, resilient, and efficient platform for tackling the most demanding computational challenges in machine learning.
To get started with SageMaker HyperPod, refer to the Amazon EKS Support in Amazon SageMaker HyperPod workshop and the Amazon SageMaker HyperPod Developer Guide. To learn more about the aws-do-ray framework, refer to the GitHub repo.