Welcome to Ray!
Scaling with Ray
from typing import Dict
import numpy as np

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))

# Step 2: Define a Predictor class for inference.
class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        # Initialize a pre-trained GPT2 Huggingface pipeline.
        self.model = pipeline("text-generation", model="gpt2")

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # Get the predictions from the input batch.
        predictions = self.model(
            list(batch["data"]), max_length=20, num_return_sequences=1)
        # `predictions` is a list of length-one lists. For example:
        # [[{'generated_text': 'output_1'}], ..., [{'generated_text': 'output_2'}]]
        # Modify the output to get it into the following format instead:
        # ['output_1', 'output_2']
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Step 3: Map the Predictor over the Dataset to get predictions.
predictions = ds.map_batches(HuggingFacePredictor, compute=scale)
# Step 4: Show one prediction output.
predictions.show(limit=1)
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

# Step 1: Set up PyTorch model training as you normally would.
def train_loop_per_worker():
    model = ...
    train_dataset = ...
    for epoch in range(num_epochs):
        ...  # model training logic

# Step 2: Set up Ray's PyTorch Trainer to run on 32 GPUs.
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=32, use_gpu=True),
    datasets={"train": train_dataset},
)

# Step 3: Run distributed model training on 32 GPUs.
result = trainer.fit()
from ray import tune
from ray.air.config import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer

train_dataset, eval_dataset = ...

# Step 1: Set up Ray's LightGBM Trainer to train on 64 CPUs.
trainer = LightGBMTrainer(
    ...
    scaling_config=ScalingConfig(num_workers=64),
    datasets={"train": train_dataset, "eval": eval_dataset},
)

# Step 2: Set up Ray Tuner to run 1000 trials.
tuner = tune.Tuner(
    trainer=trainer,
    param_space=hyper_param_space,
    tune_config=tune.TuneConfig(num_samples=1000),
)

# Step 3: Run distributed HPO with 1000 trials; each trial runs on 64 CPUs.
result_grid = tuner.fit()
import pandas as pd

from ray import serve
from starlette.requests import Request

@serve.deployment(ray_actor_options={"num_gpus": 1})
class PredictDeployment:
    def __init__(self, model_id: str, revision: str = None):
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            …
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

    def generate(self, text: str) -> pd.DataFrame:
        input_ids = self.tokenizer(text, return_tensors="pt").input_ids.to(
            self.model.device
        )
        gen_tokens = self.model.generate(
            input_ids,
            …
        )
        return pd.DataFrame(
            self.tokenizer.batch_decode(gen_tokens), columns=["responses"]
        )

    async def __call__(self, http_request: Request) -> str:
        prompts: list[str] = (await http_request.json())["prompts"]
        return self.generate(prompts)
from ray.rllib.algorithms.ppo import PPOConfig

# Step 1: Configure PPO to run 64 parallel workers to collect samples from the env.
ppo_config = (
    PPOConfig()
    .environment(env="Taxi-v3")
    .rollouts(num_rollout_workers=64)
    .framework("torch")
    .training(model={"fcnet_hiddens": [64, 64]})  # e.g., a small fully connected policy network
)

# Step 2: Build the PPO algorithm.
ppo_algo = ppo_config.build()

# Step 3: Train and evaluate PPO.
for _ in range(5):
    print(ppo_algo.train())

ppo_algo.evaluate()
Getting Started

Learn basics
Understand how the Ray framework scales your ML workflows.
Learn more >

Install Ray
pip install -U "ray[air]"
Installation guide >

Try it out
Experiment with Ray with an introductory notebook.
Open the notebook >
Beyond the basics

Ray AI Runtime
Scale the entire ML pipeline from data ingest to model serving with high-level Python APIs that integrate with popular ecosystem frameworks.
Learn more about AIR >
Ray Core
Scale generic Python code with simple, foundational primitives that enable a high degree of control for building distributed applications or custom platforms.
Learn more about Core >
Ray Clusters
Deploy a Ray cluster on AWS, GCP, Azure, or Kubernetes, and seamlessly scale workloads from a laptop to a large cluster for production.
Learn more about clusters >
Getting involved
Blog
How Ray solves common production challenges for generative AI infrastructure
Blog
Training 175B Parameter Language Models at 1000 GPU scale with Alpa and Ray
Blog
Faster stable diffusion fine-tuning with Ray AIR
Blog
How to fine tune and serve LLMs simply, quickly and cost effectively using Ray + DeepSpeed + HuggingFace
Blog
How OpenAI Uses Ray to Train Tools like ChatGPT
Code example
GPT-J-6B Fine-Tuning with Ray AIR and DeepSpeed
Tutorial
Get started with Ray AIR from an existing PyTorch codebase
Tutorial
Get started with Ray AIR from an existing Tensorflow/Keras codebase
Code example
Distributed training with LightGBM
Tutorial
Distributed training with XGBoost
Tutorial
Distributed tuning with XGBoost
Code example
Integrating with Scikit-Learn (non-distributed)
Code example
Build an AutoML system for time-series forecasting with Ray AIR
Code example
Perform batch tuning on NYC Taxi Dataset with Ray AIR
Code example
Perform batch forecasting on NYC Taxi Dataset with Prophet, ARIMA and Ray AIR
Code example
How to use Ray AIR to run Hugging Face Transformers with DeepSpeed for fine-tuning a large model
Code example
How to use Ray AIR to do batch prediction with the Hugging Face Transformers GPT-J model
Code example
How to use Ray AIR to do online serving with the Hugging Face Transformers GPT-J model
Code example
How to fine-tune a DreamBooth text-to-image model with your own images.
Code example
How to fine-tune a dolly-v2-7b model with Ray AIR LightningTrainer and FSDP
Code example
Torch Image Classification Example with Ray AIR
Code example
Torch Object Detection Example with Ray AIR
Code example
Image Classification Batch Inference with PyTorch ResNet152
Code example
How to use Ray AIR to do batch prediction with the Stable Diffusion text-to-image model
Code example
Object Detection Batch Inference with PyTorch FasterRCNN_ResNet50
Code example
Image Classification Batch Inference with PyTorch ResNet18
Code example
Image Classification Batch Inference with Huggingface Vision Transformer
Code example
How to log results and upload models to Comet ML
Code example
How to log results and upload models to Weights and Biases
Code example
Serving RL models with Ray AIR
Code example
RL Online Learning with Ray AIR
Code example
RL Offline Learning with Ray AIR
Code example
Incrementally train and deploy a PyTorch CV model
Code example
Integrate with Feast feature store in both train and inference
Code example
Serving ML models with Ray Serve (Tensorflow, PyTorch, Scikit-Learn, others)
Code example
Batching tutorial for Ray Serve
Code example
Serving RLlib Models with Ray Serve
Code example
Scaling your Gradio app with Ray Serve
Code example
Visualizing a Deployment Graph with Gradio
Code example
Java tutorial for Ray Serve
Code example
Serving a Stable Diffusion Model
Code example
Serving a Distilbert Model
Code example
Serving an Object Detection Model
Code example
Fine-tuning DreamBooth with Ray AIR
Code example
Stable Diffusion Batch Prediction with Ray AIR
Code example
GPT-J-6B Serving with Ray AIR
Blog
Offline Batch Inference: Comparing Ray, Apache Spark, and SageMaker
Blog
Streaming distributed execution across CPUs and GPUs
Blog
Using Ray Data to parallelize LangChain inference
Blog
Batch Prediction using Ray Data
Code example
Batch Inference on NYC taxi data using Ray Data
Code example
Batch OCR processing using Ray Data
Blog
Training One Million ML Models in Record Time with Ray
Blog
Many Models Batch Training at Scale with Ray Core
Code example
Batch Training with Ray Core
Code example
Batch Training with Ray Data
Tutorial
Tune Basic Parallel Experiments
Code example
Batch Training and Tuning using Ray Tune
Video
Scaling Instacart fulfillment ML on Ray
Code example
Using Aim with Ray Tune For Experiment Management
Code example
Using Comet with Ray Tune For Experiment Management
Code example
Tracking Your Experiment Process Weights & Biases
Code example
Using MLflow Tracking & AutoLogging with Tune
Code example
How To Use Tune With Ax
Code example
How To Use Tune With Dragonfly
Code example
How To Use Tune With Scikit-Optimize
Code example
How To Use Tune With HyperOpt
Code example
How To Use Tune With BayesOpt
Code example
How To Use Tune With BlendSearch and CFO
Code example
How To Use Tune With TuneBOHB
Code example
How To Use Tune With Nevergrad
Code example
How To Use Tune With Optuna
Code example
How To Use Tune With ZOOpt
Code example
How To Use Tune With SigOpt
Code example
How To Use Tune With HEBO
Video
Productionizing ML at Scale with Ray Serve
Blog
Simplify your MLOps with Ray & Ray Serve
Tutorial
Getting Started with Ray Serve
Tutorial
Model Composition in Serve
Tutorial
Getting Started with Ray Tune
Blog
How to distribute hyperparameter tuning with Ray Tune
Video
Simple Distributed Hyperparameter Optimization
Blog
Hyperparameter Search with 🤗 Transformers
Code example
How To Use Tune With Keras & TF Models
Code example
How To Use Tune With PyTorch Models
Code example
How To Tune PyTorch Lightning Models
Code example
How To Tune MXNet Models
Code example
Model Selection & Serving With Ray Serve
Code example
Tuning RL Experiments With Ray Tune & Ray Serve
Code example
A Guide To Tuning XGBoost Parameters With Tune
Code example
A Guide To Tuning LightGBM Parameters With Tune
Code example
A Guide To Tuning Horovod Parameters With Tune
Code example
A Guide To Tuning Huggingface Transformers With Tune
Code example
More Tune use cases on the Blog
Video
Ray Train, PyTorch, TorchX, and distributed deep learning
Code example
Elastic Distributed Training with XGBoost on Ray
Tutorial
Getting Started with Ray Train
Code example
Fine-tune a 🤗 Transformers model
Code example
PyTorch Fashion MNIST Training Example
Code example
Transformers with PyTorch Training Example
Code example
TensorFlow MNIST Training Example
Code example
End-to-end Horovod Training Example
Code example
End-to-end PyTorch Lightning Training Example
Code example
Use LightningTrainer with Ray Data and Batch Predictor
Code example
Fine-tune LLM with AIR LightningTrainer and FSDP
Code example
End-to-end Example for Tuning a TensorFlow Model
Code example
End-to-end Example for Tuning a PyTorch Model with PBT
Code example
Logging Training Runs with MLflow
Code example
Using Experiment Tracking Tools in LightningTrainer
Course
Applied Reinforcement Learning with RLlib
Blog
Intro to RLlib: Example Environments
Code example
A collection of tuned hyperparameters by RLlib algorithm
Code example
A collection of reasonably optimized Atari and MuJoCo results for RLlib
Code example
RLlib’s trajectory view API and how it enables implementations of GTrXL (attention net) architectures
Code example
A how-to on connecting RLlib with the Unity3D game engine for running visual- and physics-based RL experiments
Code example
How we ported 12 of RLlib’s algorithms from TensorFlow to PyTorch and what we learnt on the way
Code example
This blog post is a brief tutorial on multi-agent RL and its design in RLlib
Code example
Exploration of a functional paradigm for implementing reinforcement learning (RL) algorithms
Code example
Example of defining and registering a gym env and model for use with RLlib
Code example
Rendering and recording of an environment
Code example
Coin game example with RLlib
Code example
RecSym environment example (for recommender systems) using the SlateQ algorithm
Code example
VizDoom example script using RLlib’s auto-attention wrapper
Code example
Attention Net (GTrXL) learning the “repeat-after-me” environment
Code example
Working with custom Keras models in RLlib
Tutorial
Getting Started with RLlib
Video
Deep reinforcement learning at Riot Games
Blog
The Magic of Merlin - Shopify’s New ML Platform
Tutorial
Large Scale Deep Learning Training and Tuning with Ray
Blog
Griffin: How Instacart’s ML Platform Tripled in a year
Video
Predibase - A low-code deep learning platform built for scale
Blog
Building a ML Platform with Kubeflow and Ray on GKE
Video
Ray Summit Panel - ML Platform on Ray
Code example
AutoML for Time Series with Ray
Blog
Highly Available and Scalable Online Applications on Ray at Ant Group
Blog
Ray Forward 2022 Conference: Hyper-scale Ray Application Use Cases
Blog
A new world record on the CloudSort benchmark using Ray
Code example
Speed up your web crawler by parallelizing it with Ray
The Ray Ecosystem
This page lists libraries that have integrations with Ray for distributed execution, in alphabetical order.
It’s easy to add your own integration to this list. Simply open a pull request with a few lines of text; see the dropdown below for more information.
Adding Your Integration
To add an integration, simply add an entry to the projects list of our Gallery YAML on GitHub:
- name: the integration link button text
  section_title: The section title for this integration
  description: A quick description of your library and its integration with Ray
  website: The URL of your website
  repo: The URL of your project on GitHub
  image: The URL of a logo of your project
That’s all!
Classy Vision is a new end-to-end, PyTorch-based framework for large-scale training of state-of-the-art image and video classification models. The library features a modular, flexible design that allows anyone to train machine learning models on top of PyTorch using very simple abstractions.
Classy Vision Integration
Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love. Dask uses existing Python APIs and data structures to make it easy to switch between Numpy, Pandas, Scikit-learn to their Dask-powered equivalents.
Dask Integration
Flambé is a machine learning experimentation framework built to accelerate the entire research life cycle. Flambé’s main objective is to provide a unified interface for prototyping models, running experiments containing complex pipelines, monitoring those experiments in real-time, reporting results, and deploying a final model for inference.
Flambé Integration
Flyte is a Kubernetes-native workflow automation platform for complex, mission-critical data and ML processes at scale. It has been battle-tested at Lyft, Spotify, Freenome, and others and is truly open-source.
Flyte Integration
Horovod is a distributed deep learning training framework for TensorFlow, Keras, PyTorch, and Apache MXNet. The goal of Horovod is to make distributed deep learning fast and easy to use.
Horovod Integration
State-of-the-art Natural Language Processing for Pytorch and TensorFlow 2.0. It integrates with Ray for distributed hyperparameter tuning of transformer models.
Hugging Face Transformers Integration
Analytics Zoo seamlessly scales TensorFlow, Keras and PyTorch to distributed big data (using Spark, Flink & Ray).
Intel Analytics Zoo Integration
The power of 350+ pre-trained NLP models, 100+ Word Embeddings, 50+ Sentence Embeddings, and 50+ Classifiers in 46 languages with 1 line of Python code.
NLU Integration
Ludwig is a toolbox that allows users to train and test deep learning models without the need to write code. With Ludwig, you can train a deep learning model on Ray in zero lines of code, automatically leveraging Dask on Ray for data preprocessing, Horovod on Ray for distributed training, and Ray Tune for hyperparameter optimization.
Ludwig Integration
Mars is a tensor-based unified framework for large-scale data computation which scales Numpy, Pandas and Scikit-learn. Mars can scale in to a single machine, and scale out to a cluster with thousands of machines.
MARS Integration
Scale your pandas workflows by changing one line of code. Modin transparently distributes the data and computation so that all you need to do is continue using the pandas API as you were before installing Modin.
Modin Integration
Prefect is an open source workflow orchestration platform in Python. It allows you to easily define, track and schedule workflows in Python. This integration makes it easy to run a Prefect workflow on a Ray cluster in a distributed way.
Prefect Integration
PyCaret is an open source low-code machine learning library in Python that aims to reduce the hypothesis to insights cycle time in a ML experiment. It enables data scientists to perform end-to-end experiments quickly and efficiently.
PyCaret Integration
RayDP (“Spark on Ray”) enables you to easily use Spark inside a Ray program. You can use Spark to read the input data, process the data using SQL, Spark DataFrame, or Pandas (via Koalas) API, extract and transform features using Spark MLLib, and use RayDP Estimator API for distributed training on the preprocessed dataset.
RayDP Integration
Scikit-learn is a free software machine learning library for the Python programming language. It features various classification, regression and clustering algorithms including support vector machines, random forests, gradient boosting, k-means and DBSCAN, and is designed to interoperate with the Python numerical and scientific libraries NumPy and SciPy.
Scikit Learn Integration
Alibi is an open source Python library aimed at machine learning model inspection and interpretation. The focus of the library is to provide high-quality implementations of black-box, white-box, local and global explanation methods for classification and regression models.
Seldon Alibi Integration
Sematic is an open-source ML pipelining tool written in Python. It enables users to write end-to-end pipelines that can seamlessly transition between your laptop and the cloud, with rich visualizations, traceability, reproducibility, and usability as first-class citizens. This integration enables dynamic allocation of Ray clusters within Sematic pipelines.
Sematic Integration
spaCy is a library for advanced Natural Language Processing in Python and Cython. It’s built on the very latest research, and was designed from day one to be used in real products.
spaCy Integration
XGBoost is a popular gradient boosting library for classification and regression. It is one of the most popular tools in data science and workhorse of many top-performing Kaggle kernels.
XGBoost Integration
LightGBM is a high-performance gradient boosting library for classification and regression. It is designed to be distributed and efficient.
LightGBM Integration
Volcano is a system for running high-performance workloads on Kubernetes. It features powerful batch scheduling capabilities required by ML and other data-intensive workloads.
Volcano Integration
What is Ray Core?
Ray Core provides a small number of core primitives (i.e., tasks, actors, objects) for building and scaling distributed applications. Below we’ll walk through simple examples that show you how to turn your functions and classes easily into Ray tasks and actors, and how to work with Ray objects.
Getting Started
To get started, install Ray via pip install -U ray. See Installing Ray for more installation options. The following few sections will walk through the basics of using Ray Core.
The first step is to import and initialize Ray:
import ray
ray.init()
In recent versions of Ray (>=1.5), ray.init() is automatically called on the first use of a Ray remote API.
Running a Task
Ray lets you run functions as remote tasks in the cluster. To do this, you decorate your function with @ray.remote to declare that you want to run this function remotely.
Then, you call that function with .remote() instead of calling it normally.
This remote call returns a future, a so-called Ray object reference, that you can then fetch with ray.get:
# Define the square task.
@ray.remote
def square(x):
    return x * x

# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(4)]

# Retrieve results.
print(ray.get(futures))
# -> [0, 1, 4, 9]
Calling an Actor
Ray provides actors to allow you to parallelize computation across multiple actor instances. When you instantiate a class that is a Ray actor, Ray will start a remote instance of that class in the cluster. This actor can then execute remote method calls and maintain its own internal state:
# Define the Counter actor.
@ray.remote
class Counter:
    def __init__(self):
        self.i = 0

    def get(self):
        return self.i

    def incr(self, value):
        self.i += value

# Create a Counter actor.
c = Counter.remote()

# Submit calls to the actor. These calls run asynchronously but in
# submission order on the remote actor process.
for _ in range(10):
    c.incr.remote(1)

# Retrieve final actor state.
print(ray.get(c.get.remote()))
# -> 10
The above covers very basic actor usage. For a more in-depth example, including using both tasks and actors together, check out Monte Carlo Estimation of π.
Passing an Object
As seen above, Ray stores task and actor call results in its distributed object store, returning object references that can be later retrieved. Object references can also be created explicitly via ray.put, and object references can be passed to tasks as substitutes for argument values:
import numpy as np

# Define a task that sums the values in a matrix.
@ray.remote
def sum_matrix(matrix):
    return np.sum(matrix)

# Call the task with a literal argument value.
print(ray.get(sum_matrix.remote(np.ones((100, 100)))))
# -> 10000.0

# Put a large array into the object store.
matrix_ref = ray.put(np.ones((1000, 1000)))

# Call the task with the object reference as an argument.
print(ray.get(sum_matrix.remote(matrix_ref)))
# -> 1000000.0
Next Steps
To check how your application is doing, you can use the Ray dashboard.
Ray’s key primitives are simple, but can be composed together to express almost any kind of distributed computation.
Learn more about Ray’s key concepts with the following user guides:
Using remote functions (Tasks)
Using remote classes (Actors)
Working with Ray Objects
Key Concepts
This section overviews Ray’s key concepts. These primitives work together to enable Ray to flexibly support a broad range of distributed applications.
Tasks
Ray enables arbitrary functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called “tasks”. Ray enables tasks to specify their resource requirements in terms of CPUs, GPUs, and custom resources. These resource requests are used by the cluster scheduler to distribute tasks across the cluster for parallelized execution.
See the User Guide for Tasks.
Actors
Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.
See the User Guide for Actors.
Objects
In Ray, tasks and actors create and compute on objects. We refer to these objects as remote objects because they can be stored anywhere in a Ray cluster, and we use object refs to refer to them. Remote objects are cached in Ray’s distributed shared-memory object store, and there is one object store per node in the cluster. In the cluster setting, a remote object can live on one or many nodes, independent of who holds the object ref(s).
See the User Guide for Objects.
Placement Groups
Placement groups allow users to atomically reserve groups of resources across multiple nodes (i.e., gang scheduling). They can be then used to schedule Ray tasks and actors packed as close as possible for locality (PACK), or spread apart (SPREAD). Placement groups are generally used for gang-scheduling actors, but also support tasks.
See the User Guide for Placement Groups.
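For example, here is a minimal sketch of reserving two 1-CPU bundles and scheduling a task into them (the bundle sizes and task body are illustrative):
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Atomically reserve two bundles of 1 CPU each, packed onto as few nodes as possible.
pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="PACK")
ray.get(pg.ready())  # Wait until the resources are reserved.

@ray.remote(num_cpus=1)
def work():
    return "done"

# Schedule the task into the reserved placement group.
ref = work.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
print(ray.get(ref))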
Environment Dependencies
When Ray executes tasks and actors on remote machines, their environment dependencies (e.g., Python packages, local files, environment variables) must be available for the code to run. To address this problem, you can (1) prepare your dependencies on the cluster in advance using the Ray Cluster Launcher, or (2) use Ray’s runtime environments to install them on the fly.
See the User Guide for Environment Dependencies.
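For example, a runtime environment can be attached when connecting to the cluster; a minimal sketch (the pip package here is just an example):
import ray

# Install the listed packages on the fly for all tasks and actors of this job.
ray.init(runtime_env={"pip": ["requests"]})

@ray.remote
def fetch_status(url):
    import requests
    return requests.get(url).status_code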
User Guides
This section explains how to use Ray’s key concepts to build distributed applications.
If you’re brand new to Ray, we recommend starting with the walkthrough.
Tasks
Ray enables arbitrary functions to be executed asynchronously on separate Python workers. Such functions are called Ray remote functions and their asynchronous invocations are called Ray tasks. Here is an example.
Python
import ray
import time

# A regular Python function.
def normal_function():
    return 1

# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    return 1

# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()

# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1

@ray.remote
def slow_function():
    time.sleep(10)
    return 1

# Ray tasks are executed in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()
See the ray.remote package reference page for specific documentation on how to use ray.remote.
Java
public class MyRayApp {
  // A regular Java static method.
  public static int myFunction() {
    return 1;
  }
}

// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
ObjectRef<Integer> res = Ray.task(MyRayApp::myFunction).remote();

// The result can be retrieved with ``ObjectRef::get``.
Assert.assertTrue(res.get() == 1);

public class MyRayApp {
  public static int slowFunction() throws InterruptedException {
    TimeUnit.SECONDS.sleep(10);
    return 1;
  }
}

// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  Ray.task(MyRayApp::slowFunction).remote();
}
C++
// A regular C++ function.
int MyFunction() {
  return 1;
}

// Register as a remote function by `RAY_REMOTE`.
RAY_REMOTE(MyFunction);

// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
auto res = ray::Task(MyFunction).Remote();

// The result can be retrieved with ``ray::ObjectRef::Get``.
assert(*res.Get() == 1);

int SlowFunction() {
  std::this_thread::sleep_for(std::chrono::seconds(10));
  return 1;
}
RAY_REMOTE(SlowFunction);

// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  ray::Task(SlowFunction).Remote();
}
Use ray summary tasks from the State API to see running and finished tasks and counts:
# This API is only available when you download Ray via `pip install "ray[default]"`
ray summary tasks
======== Tasks Summary: 2023-05-26 11:09:32.092546 ========
Stats:
------------------------------------
total_actor_scheduled: 0
total_actor_tasks: 0
total_tasks: 5
Table (group by func_name):
------------------------------------
FUNC_OR_CLASS_NAME STATE_COUNTS TYPE
0 slow_function RUNNING: 4 NORMAL_TASK
1 my_function FINISHED: 1 NORMAL_TASK
Specifying required resources
You can specify resource requirements in tasks (see Specifying Task or Actor Resource Requirements for more details.)
Python
# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
    return 1

# Override the default resource requirements.
my_function.options(num_cpus=3).remote()
Java
// Specify required resources.
Ray.task(MyRayApp::myFunction).setResource("CPU", 4.0).setResource("GPU", 2.0).remote();
C++
// Specify required resources.
ray::Task(MyFunction).SetResource("CPU", 4.0).SetResource("GPU", 2.0).Remote();
Passing object refs to Ray tasks
In addition to values, Object refs can also be passed into remote functions. When the task gets executed, inside the function body the argument will be the underlying value. For example, take this function:
Python
@ray.remote
def function_with_an_argument(value):
    return value + 1

obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1

# You can pass an object ref as an argument to another Ray task.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2
Java
public class MyRayApp {
  public static int functionWithAnArgument(int value) {
    return value + 1;
  }
}

ObjectRef<Integer> objRef1 = Ray.task(MyRayApp::myFunction).remote();
Assert.assertTrue(objRef1.get() == 1);

// You can pass an object ref as an argument to another Ray task.
ObjectRef<Integer> objRef2 = Ray.task(MyRayApp::functionWithAnArgument, objRef1).remote();
Assert.assertTrue(objRef2.get() == 2);
C++
static int FunctionWithAnArgument(int value) {
  return value + 1;
}
RAY_REMOTE(FunctionWithAnArgument);

auto obj_ref1 = ray::Task(MyFunction).Remote();
assert(*obj_ref1.Get() == 1);

// You can pass an object ref as an argument to another Ray task.
auto obj_ref2 = ray::Task(FunctionWithAnArgument).Remote(obj_ref1);
assert(*obj_ref2.Get() == 2);
Note the following behaviors:
As the second task depends on the output of the first task, Ray will not execute the second task until the first task has finished.
If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to obj_ref1/objRef1) will be sent over the network to the machine where the second task is scheduled.
Waiting for Partial Results
Calling ray.get on Ray task results will block until the task finishes execution. After launching a number of tasks, you may want to know which ones have finished executing without blocking on all of them. This can be achieved with ray.wait(). The function works as follows.
Python
object_refs = [slow_function.remote() for _ in range(2)]
# Return as soon as one of the tasks finished execution.
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
Java
WaitResult<Integer> waitResult = Ray.wait(objectRefs, /*num_returns=*/0, /*timeoutMs=*/1000);
System.out.println(waitResult.getReady()); // List of ready objects.
System.out.println(waitResult.getUnready()); // list of unready objects.
C++
ray::WaitResult<int> wait_result = ray::Wait(object_refs, /*num_objects=*/0, /*timeout_ms=*/1000);
Multiple returns
By default, a Ray task only returns a single Object Ref. However, you can configure Ray tasks to return multiple Object Refs, by setting the num_returns option.
Python
# By default, a Ray task only returns a single Object Ref.
@ray.remote
def return_single():
    return 0, 1, 2

object_ref = return_single.remote()
assert ray.get(object_ref) == (0, 1, 2)

# However, you can configure Ray tasks to return multiple Object Refs.
@ray.remote(num_returns=3)
def return_multiple():
    return 0, 1, 2

object_ref0, object_ref1, object_ref2 = return_multiple.remote()
assert ray.get(object_ref0) == 0
assert ray.get(object_ref1) == 1
assert ray.get(object_ref2) == 2
For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. Ray also supports an option to set the number of return values dynamically, which can be useful when the task caller does not know how many return values to expect. See the user guide for more details on use cases.
Python
@ray.remote(num_returns=3)
def return_multiple_as_generator():
    for i in range(3):
        yield i

# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()
Cancelling tasks
Ray tasks can be canceled by calling ray.cancel() on the returned Object ref.
Python
@ray.remote
def blocking_operation():
    time.sleep(10e6)

obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)

try:
    ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")
Scheduling
For each task, Ray chooses a node to run it on. The scheduling decision is based on a few factors, such as the task’s resource requirements, the specified scheduling strategy, and the locations of the task’s arguments.
See Ray scheduling for more details.
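For example, a per-task scheduling strategy can be combined with resource requests; a minimal sketch (the "SPREAD" strategy asks Ray to spread these tasks across nodes on a best-effort basis):
import ray

ray.init()

# Ask Ray to spread these tasks across the cluster instead of packing them.
@ray.remote(num_cpus=1, scheduling_strategy="SPREAD")
def spread_task():
    return "ok"

print(ray.get([spread_task.remote() for _ in range(4)]))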
Fault Tolerance
By default, Ray retries tasks that fail due to system failures or specified application-level failures. You can change this behavior by setting the max_retries and retry_exceptions options in ray.remote() and .options().
See Ray fault tolerance for more details.
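For instance, here is a minimal sketch of both options (the retry counts are arbitrary):
# Retry up to 3 times on system failures, and also retry when the task
# raises an application-level exception.
@ray.remote(max_retries=3, retry_exceptions=True)
def flaky_task():
    ...

# The retry behavior can also be overridden per invocation.
flaky_task.options(max_retries=0).remote()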
More about Ray Tasks
Nested Remote Functions
Remote functions can call other remote functions, resulting in nested tasks.
For example, consider the following.
import ray

@ray.remote
def f():
    return 1

@ray.remote
def g():
    # Call f 4 times and return the resulting object refs.
    return [f.remote() for _ in range(4)]

@ray.remote
def h():
    # Call f 4 times, block until those 4 tasks finish,
    # retrieve the results, and return the values.
    return ray.get([f.remote() for _ in range(4)])
Then calling g and h produces the following behavior.
>>> ray.get(g.remote())
[ObjectRef(b1457ba0911ae84989aae86f89409e953dd9a80e),
ObjectRef(7c14a1d13a56d8dc01e800761a66f09201104275),
ObjectRef(99763728ffc1a2c0766a2000ebabded52514e9a6),
ObjectRef(9c2f372e1933b04b2936bb6f58161285829b9914)]
>>> ray.get(h.remote())
[1, 1, 1, 1]
One limitation is that the definition of f must come before the
definitions of g and h because as soon as g is defined, it
will be pickled and shipped to the workers, and so if f hasn’t been
defined yet, the definition will be incomplete.
Yielding Resources While Blocked
Ray releases CPU resources when a task is blocked. This prevents deadlock cases where nested tasks are waiting for the CPU resources held by the parent task.
Consider the following remote function.
@ray.remote(num_cpus=1, num_gpus=1)
def g():
    return ray.get(f.remote())
When a g task is executing, it will release its CPU resources when it gets
blocked in the call to ray.get. It will reacquire the CPU resources when
ray.get returns. It will retain its GPU resources throughout the lifetime of
the task because the task will most likely continue to use GPU memory.
Generators
Python generators are functions that behave like an iterator, yielding one value per iteration. Ray supports remote generators for two use cases:
To reduce max heap memory usage when returning multiple values from a remote function. See the design pattern guide for an example.
When the number of return values is set dynamically by the remote function instead of by the caller.
Remote generators can be used in both actor and non-actor tasks.
num_returns set by the task caller
Where possible, the caller should set the remote function’s number of return values using @ray.remote(num_returns=x) or foo.options(num_returns=x).remote().
Ray will return this many ObjectRefs to the caller.
The remote task should then return the same number of values, usually as a tuple or list.
Compared to setting the number of return values dynamically, this adds less complexity to user code and less performance overhead, as Ray will know exactly how many ObjectRefs to return to the caller ahead of time.
Without changing the caller’s syntax, we can also use a remote generator function to yield the values iteratively.
The generator should yield the same number of return values specified by the caller, and these will be stored one at a time in Ray’s object store.
An error will be raised for generators that yield a different number of values from the one specified by the caller.
For example, we can swap the following code that returns a list of return values:
import numpy as np

@ray.remote
def large_values(num_returns):
    return [
        np.random.randint(np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8)
        for _ in range(num_returns)
    ]
for this code, which uses a generator function:
@ray.remote
def large_values_generator(num_returns):
    for i in range(num_returns):
        yield np.random.randint(
            np.iinfo(np.int8).max, size=(100_000_000, 1), dtype=np.int8
        )
        print(f"yielded return value {i}")
The advantage of doing so is that the generator function does not need to hold all of its return values in memory at once.
It can yield the arrays one at a time to reduce memory pressure.
num_returns set by the task executor
In some cases, the caller may not know the number of return values to expect from a remote function.
For example, suppose we want to write a task that breaks up its argument into equal-size chunks and returns these.
We may not know the size of the argument until we execute the task, so we don’t know the number of return values to expect.
In these cases, we can use a remote generator function that returns a dynamic number of values.
To use this feature, set num_returns="dynamic" in the @ray.remote decorator or the remote function’s .options().
Then, when invoking the remote function, Ray will return a single ObjectRef that will get populated with an ObjectRefGenerator when the task completes.
The ObjectRefGenerator can be used to iterate over a list of ObjectRefs containing the actual values returned by the task.
import numpy as np

@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
    while len(array) > 0:
        yield array[:chunk_size]
        array = array[chunk_size:]

array_ref = ray.put(np.zeros(np.random.randint(1000_000)))
block_size = 1000

# Returns an ObjectRef[ObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ObjectRefGenerator object at ...>
for i, ref in enumerate(ref_generator):
    # Each ObjectRefGenerator iteration returns an ObjectRef.
    assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1

array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
      f"size {block_size} each.")
# Split array of size 63153 into 64 blocks of size 1000 each.

# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal
# ObjectRefs.
del dynamic_ref
We can also pass the ObjectRef returned by a task with num_returns="dynamic" to another task. The task will receive the ObjectRefGenerator, which it can use to iterate over the task’s return values. Similarly, you can also pass an ObjectRefGenerator as a task argument.
@ray.remote
def get_size(ref_generator: ObjectRefGenerator):
    print(ref_generator)
    num_elements = 0
    for ref in ref_generator:
        array = ray.get(ref)
        assert len(array) <= block_size
        num_elements += len(array)
    return num_elements

# Returns an ObjectRef[ObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184)

# This also works, but should be avoided because you have to call an additional
# `ray.get`, which blocks the driver.
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))
# (get_size pid=1504184)
Exception handling
If a generator function raises an exception before yielding all its values, the values that it already stored will still be accessible through their ObjectRefs.
The remaining ObjectRefs will contain the raised exception.
This is true for both static and dynamic num_returns.
If the task was called with num_returns="dynamic", the exception will be stored as an additional final ObjectRef in the ObjectRefGenerator.
@ray.remote
def generator():
    for i in range(2):
        yield i
    raise Exception("error")

ref1, ref2, ref3, ref4 = generator.options(num_returns=4).remote()
assert ray.get([ref1, ref2]) == [0, 1]

# All remaining ObjectRefs will contain the error.
try:
    ray.get([ref3, ref4])
except Exception as error:
    print(error)

dynamic_ref = generator.options(num_returns="dynamic").remote()
ref_generator = ray.get(dynamic_ref)
ref1, ref2, ref3 = ref_generator
assert ray.get([ref1, ref2]) == [0, 1]

# Generators with num_returns="dynamic" will store the exception in the final
# ObjectRef.
try:
    ray.get(ref3)
except Exception as error:
    print(error)
Note that there is currently a known bug where exceptions will not be propagated for generators that yield more values than expected. This can occur in two cases:
When num_returns is set by the caller, but the generator task returns more than this value.
When a generator task with num_returns="dynamic" is re-executed, and the re-executed task yields more values than the original execution. Note that in general, Ray does not guarantee correctness for task re-execution if the task is nondeterministic, and it is recommended to set @ray.remote(max_retries=0) for such tasks.
# Generators that yield more values than expected currently do not throw an
# exception (the error is only logged).
# See https://github.com/ray-project/ray/issues/28689.
ref1, ref2 = generator.options(num_returns=2).remote()
assert ray.get([ref1, ref2]) == [0, 1]
"""
(generator pid=2375938) 2022-09-28 11:08:51,386 ERROR worker.py:755 --
Unhandled error: Task threw exception, but all return values already
created. This should only occur when using generator tasks.
...
"""
Limitations
Although a generator function creates ObjectRefs one at a time, currently Ray will not schedule dependent tasks until the entire task is complete and all values have been created. This is similar to the semantics used by tasks that return multiple values as a list.
Actors
Actors extend the Ray API from functions (tasks) to classes.
An actor is essentially a stateful worker (or a service). When a new actor is
instantiated, a new worker is created, and methods of the actor are scheduled on
that specific worker and can access and mutate the state of that worker.
Python
The ray.remote decorator indicates that instances of the Counter class will be actors. Each actor runs in its own Python process.
import ray

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

# Create an actor from this class.
counter = Counter.remote()
Java
Ray.actor is used to create actors from regular Java classes.
// A regular Java class.
public class Counter {
  private int value = 0;

  public int increment() {
    this.value += 1;
    return this.value;
  }
}

// Create an actor from this class.
// `Ray.actor` takes a factory method that can produce
// a `Counter` object. Here, we pass `Counter`'s constructor
// as the argument.
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();
C++
ray::Actor is used to create actors from regular C++ classes.
// A regular C++ class.
class Counter {
private:
  int value = 0;

public:
  int Increment() {
    value += 1;
    return value;
  }
};

// Factory function of Counter class.
static Counter *CreateCounter() {
  return new Counter();
};

RAY_REMOTE(&Counter::Increment, CreateCounter);

// Create an actor from this class.
// `ray::Actor` takes a factory method that can produce
// a `Counter` object. Here, we pass `Counter`'s factory function
// as the argument.
auto counter = ray::Actor(CreateCounter).Remote();
Use ray list actors from the State API to see actor states:
# This API is only available when you install Ray with `pip install "ray[default]"`.
ray list actors
======== List: 2023-05-25 10:10:50.095099 ========
Stats:
------------------------------
Total: 1
Table:
------------------------------
ACTOR_ID CLASS_NAME STATE JOB_ID NAME NODE_ID PID RAY_NAMESPACE
0 9e783840250840f87328c9f201000000 Counter ALIVE 01000000 13a475571662b784b4522847692893a823c78f1d3fd8fd32a2624923 38906 ef9de910-64fb-4575-8eb5-50573faa3ddf
Specifying required resources
You can specify resource requirements in actors too (see Specifying Task or Actor Resource Requirements for more details.)
Python
# Specify required resources for an actor.
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor:
    pass
Java
// Specify required resources for an actor.
Ray.actor(Counter::new).setResource("CPU", 2.0).setResource("GPU", 0.5).remote();
C++
// Specify required resources for an actor.
ray::Actor(CreateCounter).SetResource("CPU", 2.0).SetResource("GPU", 0.5).Remote();
Calling the actor
We can interact with the actor by calling its methods with the remote
operator. We can then call get on the object ref to retrieve the actual
value.
Python
# Call the actor.
obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
1
Java
// Call the actor.
ObjectRef<Integer> objectRef = counter.task(Counter::increment).remote();
Assert.assertTrue(objectRef.get() == 1);
C++
// Call the actor.
auto object_ref = counter.Task(&Counter::Increment).Remote();
assert(*object_ref.Get() == 1);
Methods called on different actors can execute in parallel, and methods called on the same actor are executed serially in the order that they are called. Methods on the same actor will share state with one another, as shown below.
Python
# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]
# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)
# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 3, 4, 5, 6]
Java
// Create ten Counter actors.
List<ActorHandle<Counter>> counters = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  counters.add(Ray.actor(Counter::new).remote());
}

// Increment each Counter once and get the results. These tasks all happen in
// parallel.
List<ObjectRef<Integer>> objectRefs = new ArrayList<>();
for (ActorHandle<Counter> counterActor : counters) {
  objectRefs.add(counterActor.task(Counter::increment).remote());
}
// prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
System.out.println(Ray.get(objectRefs));

// Increment the first Counter five times. These tasks are executed serially
// and share state.
objectRefs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
  objectRefs.add(counters.get(0).task(Counter::increment).remote());
}
// prints [2, 3, 4, 5, 6]
System.out.println(Ray.get(objectRefs));
C++
// Create ten Counter actors.
std::vector<ray::ActorHandle<Counter>> counters;
for (int i = 0; i < 10; i++) {
  counters.emplace_back(ray::Actor(CreateCounter).Remote());
}

// Increment each Counter once and get the results. These tasks all happen in
// parallel.
std::vector<ray::ObjectRef<int>> object_refs;
for (ray::ActorHandle<Counter> counter_actor : counters) {
  object_refs.emplace_back(counter_actor.Task(&Counter::Increment).Remote());
}
// prints 1, 1, 1, 1, 1, 1, 1, 1, 1, 1
auto results = ray::Get(object_refs);
for (const auto &result : results) {
  std::cout << *result;
}

// Increment the first Counter five times. These tasks are executed serially
// and share state.
object_refs.clear();
for (int i = 0; i < 5; i++) {
  object_refs.emplace_back(counters[0].Task(&Counter::Increment).Remote());
}
// prints 2, 3, 4, 5, 6
results = ray::Get(object_refs);
for (const auto &result : results) {
  std::cout << *result;
}
Passing Around Actor Handles
Actor handles can be passed into other tasks. We can define remote functions (or actor methods) that use actor handles.
Python
import time
@ray.remote
def f(counter):
    for _ in range(10):
        time.sleep(0.1)
        counter.increment.remote()
Java
public static class MyRayApp {
  public static void foo(ActorHandle<Counter> counter) throws InterruptedException {
    for (int i = 0; i < 1000; i++) {
      TimeUnit.MILLISECONDS.sleep(100);
      counter.task(Counter::increment).remote();
    }
  }
}
C++
void Foo(ray::ActorHandle<Counter> counter) {
  for (int i = 0; i < 1000; i++) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    counter.Task(&Counter::Increment).Remote();
  }
}
If we instantiate an actor, we can pass the handle around to various tasks.
Python
counter = Counter.remote()

# Start some tasks that use the actor.
[f.remote(counter) for _ in range(3)]

# Print the counter value.
for _ in range(10):
    time.sleep(0.1)
    print(ray.get(counter.get_counter.remote()))
0
3
8
10
15
18
20
25
30
30
Java
ActorHandle<Counter> counter = Ray.actor(Counter::new).remote();

// Start some tasks that use the actor.
for (int i = 0; i < 3; i++) {
  Ray.task(MyRayApp::foo, counter).remote();
}

// Print the counter value.
for (int i = 0; i < 10; i++) {
  TimeUnit.SECONDS.sleep(1);
  System.out.println(counter.task(Counter::getCounter).remote().get());
}
C++
auto counter = ray::Actor(CreateCounter).Remote();

// Start some tasks that use the actor.
for (int i = 0; i < 3; i++) {
  ray::Task(Foo).Remote(counter);
}

// Print the counter value.
for (int i = 0; i < 10; i++) {
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::cout << *counter.Task(&Counter::GetCounter).Remote().Get() << std::endl;
}
Scheduling
For each actor, Ray chooses a node to run it on. The scheduling decision is based on a few factors, such as the actor’s resource requirements and the specified scheduling strategy.
See Ray scheduling for more details.
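For example, a scheduling strategy can be set per actor via .options(); a minimal sketch (the "SPREAD" strategy asks Ray to spread these actors across nodes on a best-effort basis):
@ray.remote(num_cpus=1)
class Worker:
    def ping(self):
        return "pong"

# Spread the actors across the cluster instead of packing them onto one node.
workers = [Worker.options(scheduling_strategy="SPREAD").remote() for _ in range(4)]
print(ray.get([w.ping.remote() for w in workers]))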
Fault Tolerance
By default, Ray actors won’t be restarted and actor tasks won’t be retried when actors crash unexpectedly. You can change this behavior by setting the max_restarts and max_task_retries options in ray.remote() and .options().
See Ray fault tolerance for more details.
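For instance, here is a minimal sketch of an actor that Ray may restart and whose pending tasks may be retried (the counts are arbitrary):
# Restart the actor up to 3 times if its process crashes, and retry
# its tasks up to 2 times after a restart.
@ray.remote(max_restarts=3, max_task_retries=2)
class FaultTolerantWorker:
    def do_work(self):
        return "ok"

worker = FaultTolerantWorker.remote()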
FAQ: Actors, Workers and Resources
What’s the difference between a worker and an actor?
Each “Ray worker” is a Python process.
Workers are treated differently for tasks and actors. Any “Ray worker” is either 1. used to execute multiple Ray tasks or 2. started as a dedicated Ray actor.
Tasks: When Ray starts on a machine, a number of Ray workers will be started automatically (1 per CPU by default). They will be used to execute tasks (like a process pool). If you execute 8 tasks with num_cpus=2, and the total number of CPUs is 16 (ray.cluster_resources()["CPU"] == 16), you will end up with 8 of your 16 workers idling.
Actor: A Ray actor is also a “Ray worker”, but it is instantiated at runtime (upon actor_cls.remote()). All of its methods will run on the same process, using the same resources (designated when defining the actor). Note that unlike tasks, the Python processes that run Ray actors are not reused and will be terminated when the actor is deleted.
To maximize the utilization of your resources, you want to maximize the time that your workers are working. You also want to allocate enough cluster resources so that all of your needed actors and any other tasks you define can run. This also implies that tasks are scheduled more flexibly, and that if you don’t need the stateful part of an actor, you’re mostly better off using tasks.
More about Ray Actors
Named Actors
An actor can be given a unique name within its namespace.
This allows you to retrieve the actor from any job in the Ray cluster.
This can be useful if you cannot directly
pass the actor handle to the task that needs it, or if you are trying to
access an actor launched by another driver.
Note that the actor will still be garbage-collected if no handles to it
exist. See Actor Lifetimes for more details.
Python
import ray

@ray.remote
class Counter:
    pass

# Create an actor with a name
counter = Counter.options(name="some_name").remote()

# Retrieve the actor later somewhere
counter = ray.get_actor("some_name")
Java
// Create an actor with a name.
ActorHandle<Counter> counter = Ray.actor(Counter::new).setName("some_name").remote();
...
// Retrieve the actor later somewhere
Optional<ActorHandle<Counter>> counter = Ray.getActor("some_name");
Assert.assertTrue(counter.isPresent());
C++
// Create an actor with a globally unique name
ActorHandle<Counter> counter = ray::Actor(CreateCounter).SetGlobalName("some_name").Remote();
...
// Retrieve the actor later somewhere
boost::optional<ray::ActorHandle<Counter>> counter = ray::GetGlobalActor("some_name");
We also support non-global named actors in C++, which means that the actor name is only valid within the job and the actor cannot be accessed from another job.
// Create an actor with a job-scope-unique name
ActorHandle<Counter> counter = ray::Actor(CreateCounter).SetName("some_name").Remote();
...
// Retrieve the actor later somewhere in the same job
boost::optional<ray::ActorHandle<Counter>> counter = ray::GetActor("some_name");
Named actors are scoped by namespace. If no namespace is assigned, they will
be placed in an anonymous namespace by default.
Python
import ray
@ray.remote
class Actor:
    pass
# driver_1.py
# Job 1 creates an actor, "orange" in the "colors" namespace.
ray.init(address="auto", namespace="colors")
Actor.options(name="orange", lifetime="detached").remote()
# driver_2.py
# Job 2 is now connecting to a different namespace.
ray.init(address="auto", namespace="fruit")
# This fails because "orange" was defined in the "colors" namespace.
ray.get_actor("orange")
# You can also specify the namespace explicitly.
ray.get_actor("orange", namespace="colors")
# driver_3.py
# Job 3 connects to the original "colors" namespace
ray.init(address="auto", namespace="colors")
# This returns the "orange" actor we created in the first job.
ray.get_actor("orange")
Java
public class Actor {
}
// Driver1.java
// Job 1 creates an actor, "orange" in the "colors" namespace.
System.setProperty("ray.job.namespace", "colors");
Ray.init();
Ray.actor(Actor::new).setName("orange").remote();
// Driver2.java
// Job 2 is now connecting to a different namespace.
System.setProperty("ray.job.namespace", "fruits");
Ray.init();
// This fails because "orange" was defined in the "colors" namespace.
Optional<ActorHandle<Actor>> actor = Ray.getActor("orange");
Assert.assertFalse(actor.isPresent()); // actor.isPresent() is false.
// Driver3.java
System.setProperty("ray.job.namespace", "colors");
Ray.init();
// This returns the "orange" actor we created in the first job.
Optional<ActorHandle<Actor>> actor = Ray.getActor("orange");
Assert.assertTrue(actor.isPresent()); // actor.isPresent() is true.
Get-Or-Create a Named Actor
A common use case is to create an actor only if it doesn’t exist.
Ray provides a get_if_exists option for actor creation that does this out of the box.
This method is available after you set a name for the actor via .options().
If the actor already exists, a handle to the actor will be returned
and the arguments will be ignored. Otherwise, a new actor will be
created with the specified arguments.
Python
import ray

@ray.remote
class Greeter:
    def __init__(self, value):
        self.value = value

    def say_hello(self):
        return self.value

# Actor `g1` doesn't yet exist, so it is created with the given args.
a = Greeter.options(name="g1", get_if_exists=True).remote("Old Greeting")
assert ray.get(a.say_hello.remote()) == "Old Greeting"

# Actor `g1` already exists, so it is returned (new args are ignored).
b = Greeter.options(name="g1", get_if_exists=True).remote("New Greeting")
assert ray.get(b.say_hello.remote()) == "Old Greeting"
Java
// This feature is not yet available in Java.
C++
// This feature is not yet available in C++.
Actor Lifetimes
Separately, actor lifetimes can be decoupled from the job, allowing an actor to persist even after the driver process of the job exits. We call these actors detached.
Python
counter = Counter.options(name="CounterActor", lifetime="detached").remote()
The CounterActor will be kept alive even after the driver running the above script exits. Therefore it is possible to run the following script in a different driver:
counter = ray.get_actor("CounterActor")
Note that an actor can be named but not detached. If we only specified the
name without specifying lifetime="detached", then the CounterActor can
only be retrieved as long as the original driver is still running.
Java
System.setProperty("ray.job.namespace", "lifetime");
Ray.init();
ActorHandle<Counter> counter = Ray.actor(Counter::new).setName("some_name").setLifetime(ActorLifetime.DETACHED).remote();
The CounterActor will be kept alive even after the driver process running the above code exits. Therefore it is possible to run the following code in a different driver:
System.setProperty("ray.job.namespace", "lifetime");
Ray.init();
Optional<ActorHandle<Counter>> counter = Ray.getActor("some_name");
Assert.assertTrue(counter.isPresent());
C++
Customizing lifetime of an actor hasn’t been implemented in C++ yet.
Unlike normal actors, detached actors are not automatically garbage-collected by Ray.
Detached actors must be manually destroyed once you are sure that they are no
longer needed. To do this, use ray.kill to manually terminate the actor.
After this call, the actor’s name may be reused.
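For example, a detached actor created by one driver can later be fetched by name and destroyed from another driver. A minimal Python sketch, assuming you connect to the same cluster and namespace as the job that created the CounterActor above:
import ray
# Connect to the same cluster (and namespace) in which the detached
# actor was created.
ray.init(address="auto")
# Look up the detached actor by name and terminate it manually.
counter = ray.get_actor("CounterActor")
ray.kill(counter)
# After ray.kill, the name "CounterActor" may be reused.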
Terminating Actors
Actor processes will be terminated automatically when all copies of the
actor handle have gone out of scope in Python, or if the original creator
process dies.
Note that automatic termination of actors is not yet supported in Java or C++.
Manual termination via an actor handle
In most cases, Ray will automatically terminate actors that have gone out of
scope, but you may sometimes need to terminate an actor forcefully. This should
be reserved for cases where an actor is unexpectedly hanging or leaking
resources, and for detached actors, which must be
manually destroyed.
Python
import ray
@ray.remote
class Actor:
pass
actor_handle = Actor.remote()
ray.kill(actor_handle)
# This will not go through the normal Python sys.exit
# teardown logic, so any exit handlers installed in
# the actor using ``atexit`` will not be called.
Java
actorHandle.kill();
// This will not go through the normal Java System.exit teardown logic, so any
// shutdown hooks installed in the actor using ``Runtime.addShutdownHook(...)`` will
// not be called.
C++
actor_handle.Kill();
// This will not go through the normal C++ std::exit
// teardown logic, so any exit handlers installed in
// the actor using ``std::atexit`` will not be called.
This will cause the actor to immediately exit its process, causing any current,
pending, and future tasks to fail with a RayActorError. If you would like
Ray to automatically restart the actor, make sure to set a nonzero
max_restarts in the @ray.remote options for the actor, then pass the
flag no_restart=False to ray.kill.
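A minimal sketch of this combination (the RestartableActor class below is illustrative, not part of the example above):
import ray
@ray.remote(max_restarts=3)  # Allow up to 3 automatic restarts.
class RestartableActor:
    def ping(self):
        return "pong"
actor_handle = RestartableActor.remote()
# Kill the actor process, but let Ray restart it (up to max_restarts times).
ray.kill(actor_handle, no_restart=False)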
For named and detached actors, calling ray.kill on
an actor handle destroys the actor and allows the name to be reused.
Use ray list actors --detail from the State API to see the death cause of dead actors:
# This API is only available when you download Ray via `pip install "ray[default]"`
ray list actors --detail
---
- actor_id: e8702085880657b355bf7ef001000000
class_name: Actor
state: DEAD
job_id: '01000000'
name: ''
node_id: null
pid: 0
ray_namespace: dbab546b-7ce5-4cbb-96f1-d0f64588ae60
serialized_runtime_env: '{}'
required_resources: {}
death_cause:
actor_died_error_context: # <---- You can see the error message explaining why the actor exited.
error_message: The actor is dead because `ray.kill` killed it.
owner_id: 01000000ffffffffffffffffffffffffffffffffffffffffffffffff
owner_ip_address: 127.0.0.1
ray_namespace: dbab546b-7ce5-4cbb-96f1-d0f64588ae60
class_name: Actor
actor_id: e8702085880657b355bf7ef001000000
never_started: true
node_ip_address: ''
pid: 0
name: ''
is_detached: false
placement_group_id: null
repr_name: ''
Manual termination within the actor
If necessary, you can manually terminate an actor from within one of the actor methods.
This will kill the actor process and release resources associated with the actor.
Python
@ray.remote
class Actor:
def exit(self):
ray.actor.exit_actor()
actor = Actor.remote()
actor.exit.remote()
This approach should generally not be necessary as actors are automatically garbage
collected. The ObjectRef resulting from the task can be waited on to wait
for the actor to exit (calling ray.get() on it will raise a RayActorError).
Java
Ray.exitActor();
Garbage collection for actors hasn’t been implemented yet, so this is currently the
only way to terminate an actor gracefully. The ObjectRef resulting from the task
can be waited on to wait for the actor to exit (calling ObjectRef::get on it will
throw a RayActorException).
C++
ray::ExitActor();
Garbage collection for actors hasn’t been implemented yet, so this is currently the
only way to terminate an actor gracefully. The ObjectRef resulting from the task
can be waited on to wait for the actor to exit (calling ObjectRef::Get on it will
throw a RayActorException).
Note that this method of termination waits until any previously submitted
tasks finish executing and then exits the process gracefully with sys.exit.
You can see that the actor is dead as a result of the user’s exit_actor() call:
# This API is only available when you download Ray via `pip install "ray[default]"`
ray list actors --detail
---
- actor_id: 070eb5f0c9194b851bb1cf1602000000
class_name: Actor
state: DEAD
job_id: '02000000'
name: ''
node_id: 47ccba54e3ea71bac244c015d680e202f187fbbd2f60066174a11ced
pid: 47978
ray_namespace: 18898403-dda0-485a-9c11-e9f94dffcbed
serialized_runtime_env: '{}'
required_resources: {}
death_cause:
actor_died_error_context:
error_message: 'The actor is dead because its worker process has died.
Worker exit type: INTENDED_USER_EXIT Worker exit detail: Worker exits
by an user request. exit_actor() is called.'
owner_id: 02000000ffffffffffffffffffffffffffffffffffffffffffffffff
owner_ip_address: 127.0.0.1
node_ip_address: 127.0.0.1
pid: 47978
ray_namespace: 18898403-dda0-485a-9c11-e9f94dffcbed
class_name: Actor
actor_id: 070eb5f0c9194b851bb1cf1602000000
name: ''
never_started: false
is_detached: false
placement_group_id: null
repr_name: ''
AsyncIO / Concurrency for Actors
Within a single actor process, it is possible to execute concurrent threads.
Ray offers two types of concurrency within an actor:
async execution
threading
Keep in mind that Python’s Global Interpreter Lock (GIL) only allows one thread of Python code to run at once.
This means if you are just parallelizing Python code, you won’t get true parallelism. If you call Numpy, Cython, Tensorflow, or PyTorch code, these libraries will release the GIL when calling into C/C++ functions.
Neither the Threaded Actors nor AsyncIO for Actors model will allow you to bypass the GIL.
AsyncIO for Actors
Since Python 3.5, it is possible to write concurrent code using the
async/await syntax.
Ray natively integrates with asyncio. You can use Ray alongside popular
async frameworks like aiohttp, aioredis, etc.
import ray
import asyncio
@ray.remote
class AsyncActor:
# multiple invocations of this method can be running in
# the event loop at the same time
async def run_concurrent(self):
print("started")
await asyncio.sleep(2) # concurrent workload here
print("finished")
actor = AsyncActor.remote()
# regular ray.get
ray.get([actor.run_concurrent.remote() for _ in range(4)])
# async ray.get
async def async_get():
await actor.run_concurrent.remote()
asyncio.run(async_get())
(AsyncActor pid=40293) started
(AsyncActor pid=40293) started
(AsyncActor pid=40293) started
(AsyncActor pid=40293) started
(AsyncActor pid=40293) finished
(AsyncActor pid=40293) finished
(AsyncActor pid=40293) finished
(AsyncActor pid=40293) finished
ObjectRefs as asyncio.Futures
ObjectRefs can be translated to asyncio.Futures. This feature
makes it possible to await on Ray futures in existing concurrent
applications.
Instead of:
import ray
@ray.remote
def some_task():
return 1
ray.get(some_task.remote())
ray.wait([some_task.remote()])
you can do:
import ray
import asyncio
@ray.remote
def some_task():
return 1
async def await_obj_ref():
await some_task.remote()
await asyncio.wait([some_task.remote()])
asyncio.run(await_obj_ref())
Please refer to the asyncio documentation
for more asyncio patterns, including timeouts and asyncio.gather.
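For instance, because ObjectRefs are awaitable, standard patterns such as asyncio.gather and asyncio.wait_for can be applied directly. The sketch below reuses some_task from the snippet above:
import asyncio
import ray
@ray.remote
def some_task():
    return 1
async def gather_with_timeout():
    # Await several object refs concurrently.
    results = await asyncio.gather(*[some_task.remote() for _ in range(3)])
    print(results)  # [1, 1, 1]
    # Apply a timeout while awaiting a single object ref.
    result = await asyncio.wait_for(some_task.remote(), timeout=10)
    print(result)  # 1
asyncio.run(gather_with_timeout())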
If you need to directly access the future object, you can call:
import asyncio
async def convert_to_asyncio_future():
ref = some_task.remote()
fut: asyncio.Future = asyncio.wrap_future(ref.future())
print(await fut)
asyncio.run(convert_to_asyncio_future())
1
ObjectRefs as concurrent.futures.Futures
ObjectRefs can also be wrapped into concurrent.futures.Future objects. This
is useful for interfacing with existing concurrent.futures APIs:
import concurrent
refs = [some_task.remote() for _ in range(4)]
futs = [ref.future() for ref in refs]
for fut in concurrent.futures.as_completed(futs):
assert fut.done()
print(fut.result())
1
1
1
1
Defining an Async Actor
If you use async method definitions, Ray automatically detects whether an actor supports async calls.
import asyncio
@ray.remote
class AsyncActor:
async def run_task(self):
print("started")
await asyncio.sleep(2) # Network, I/O task here
print("ended")
actor = AsyncActor.remote()
# All 5 tasks should start at once. After 2 seconds, they should
# all finish at about the same time.
ray.get([actor.run_task.remote() for _ in range(5)])
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) started
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
(AsyncActor pid=3456) ended
Under the hood, Ray runs all of the methods inside a single Python event loop.
Please note that running blocking ray.get or ray.wait inside an async
actor method is not allowed, because ray.get would block the execution
of the event loop.
In async actors, only one task can be running at any point in time (though tasks can be multiplexed). There is only one thread in an AsyncActor! See Threaded Actors if you want a thread pool.
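Instead of a blocking ray.get, you can await object refs from inside an async actor method, which lets the event loop keep multiplexing other tasks. A minimal sketch (the AsyncAggregator and heavy_compute names are illustrative):
import asyncio
import ray
@ray.remote
def heavy_compute():
    return 42
@ray.remote
class AsyncAggregator:
    async def aggregate(self):
        # Do NOT call ray.get() here; it would block the event loop.
        # Awaiting the refs lets other tasks of this actor run concurrently.
        refs = [heavy_compute.remote() for _ in range(4)]
        results = await asyncio.gather(*refs)
        return sum(results)
aggregator = AsyncAggregator.remote()
print(ray.get(aggregator.aggregate.remote()))  # 168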
Setting concurrency in Async Actors
You can set the number of “concurrent” tasks running at once using the
max_concurrency flag. By default, 1000 tasks can be running concurrently.
import asyncio
@ray.remote
class AsyncActor:
async def run_task(self):
print("started")
await asyncio.sleep(1) # Network, I/O task here
print("ended")
actor = AsyncActor.options(max_concurrency=2).remote()
# Only 2 tasks will be running concurrently. Once 2 finish, the next 2 should run.
ray.get([actor.run_task.remote() for _ in range(8)])
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) started
(AsyncActor pid=5859) started
(AsyncActor pid=5859) ended
(AsyncActor pid=5859) ended
Threaded Actors
Sometimes, asyncio is not an ideal solution for your actor. For example, you may
have one method that performs a computation-heavy task while blocking the event loop, never yielding control via await. This hurts the performance of an Async Actor because Async Actors can only execute one task at a time and rely on await to context switch.
Instead, you can use the max_concurrency actor option without any async methods, allowing you to achieve threaded concurrency (like a thread pool).
When there is at least one async def method in the actor definition, Ray
recognizes the actor as an AsyncActor instead of a ThreadedActor.
@ray.remote
class ThreadedActor:
def task_1(self): print("I'm running in a thread!")
def task_2(self): print("I'm running in another thread!")
a = ThreadedActor.options(max_concurrency=2).remote()
ray.get([a.task_1.remote(), a.task_2.remote()])
(ThreadedActor pid=4822) I'm running in a thread!
(ThreadedActor pid=4822) I'm running in another thread!
Each invocation of the threaded actor runs in a thread pool. The size of the thread pool is limited by the max_concurrency value.
AsyncIO for Remote Tasks
We don’t support asyncio for remote tasks. The following snippet will fail:
@ray.remote
async def f():
pass
Instead, you can wrap the async function with a wrapper to run the task synchronously:
async def f():
pass
@ray.remote
def wrapper():
import asyncio
asyncio.run(f())
Limiting Concurrency Per-Method with Concurrency Groups
Besides setting the max concurrency overall for an asyncio actor, Ray allows methods to be separated into concurrency groups, each with its own asyncio event loop. This allows you to limit the concurrency per-method, e.g., allow a health-check method to be given its own concurrency quota separate from request serving methods.
Concurrency groups are only supported for asyncio actors, not threaded actors.
Defining Concurrency Groups
This defines two concurrency groups, “io” with max concurrency = 2 and
“compute” with max concurrency = 4. The methods f1 and f2 are
placed in the “io” group, and the methods f3 and f4 are placed
into the “compute” group. Note that there is always a default
concurrency group, which has a default concurrency of 1000 in Python and
1 in Java.
Python
You can define concurrency groups for asyncio actors using the concurrency_group decorator argument:
import ray
@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncIOActor:
def __init__(self):
pass
@ray.method(concurrency_group="io")
async def f1(self):
pass
@ray.method(concurrency_group="io")
async def f2(self):
pass
@ray.method(concurrency_group="compute")
async def f3(self):
pass
@ray.method(concurrency_group="compute")
async def f4(self):
pass
async def f5(self):
pass
a = AsyncIOActor.remote()
a.f1.remote() # executed in the "io" group.
a.f2.remote() # executed in the "io" group.
a.f3.remote() # executed in the "compute" group.
a.f4.remote() # executed in the "compute" group.
a.f5.remote() # executed in the default group.
Java
You can define concurrency groups for concurrent actors using the setConcurrencyGroups() API:
class ConcurrentActor {
public long f1() {
return Thread.currentThread().getId();
}
public long f2() {
return Thread.currentThread().getId();
}
public long f3(int a, int b) {
return Thread.currentThread().getId();
}
public long f4() {
return Thread.currentThread().getId();
}
public long f5() {
return Thread.currentThread().getId();
}
}
ConcurrencyGroup group1 =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("io")
.setMaxConcurrency(1)
.addMethod(ConcurrentActor::f1)
.addMethod(ConcurrentActor::f2)
.build();
ConcurrencyGroup group2 =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("compute")
.setMaxConcurrency(1)
.addMethod(ConcurrentActor::f3)
.addMethod(ConcurrentActor::f4)
.build();
ActorHandle<ConcurrentActor> myActor = Ray.actor(ConcurrentActor::new)
.setConcurrencyGroups(group1, group2)
.remote();
myActor.task(ConcurrentActor::f1).remote(); // executed in the "io" group.
myActor.task(ConcurrentActor::f2).remote(); // executed in the "io" group.
myActor.task(ConcurrentActor::f3, 3, 5).remote(); // executed in the "compute" group.
myActor.task(ConcurrentActor::f4).remote(); // executed in the "compute" group.
myActor.task(ConcurrentActor::f5).remote(); // executed in the "default" group.
Default Concurrency Group
By default, methods are placed in a default concurrency group which has a concurrency limit of 1000 in Python and 1 in Java.
The concurrency of the default group can be changed by setting the max_concurrency actor option.
Python
The following AsyncIOActor has 2 concurrency groups: “io” and “default”.
The max concurrency of “io” is 2, and the max concurrency of “default” is 10.
@ray.remote(concurrency_groups={"io": 2})
class AsyncIOActor:
async def f1(self):
pass
actor = AsyncIOActor.options(max_concurrency=10).remote()
Java
The following concurrent actor has 2 concurrency groups: “io” and “default”.
The max concurrency of “io” is 2, and the max concurrency of “default” is 10.
class ConcurrentActor {
public long f1() {
return Thread.currentThread().getId();
}
}
ConcurrencyGroup group =
new ConcurrencyGroupBuilder<ConcurrentActor>()
.setName("io")
.setMaxConcurrency(2)
.addMethod(ConcurrentActor::f1)
.build();
ActorHandle<ConcurrentActor> myActor = Ray.actor(ConcurrentActor::new)
.setConcurrencyGroups(group)
.setMaxConcurrency(10)
.remote();
Setting the Concurrency Group at Runtime
You can also dispatch actor methods into a specific concurrency group at runtime.
The following snippet demonstrates setting the concurrency group of the
f2 method dynamically at runtime.
Python
You can use the .options method.
# Executed in the "io" group (as defined in the actor class).
a.f2.options().remote()
# Executed in the "compute" group.
a.f2.options(concurrency_group="compute").remote()
Java
You can use the setConcurrencyGroup method.
// Executed in the "io" group (as defined in the actor creation).
myActor.task(ConcurrentActor::f2).remote();
// Executed in the "compute" group.
myActor.task(ConcurrentActor::f2).setConcurrencyGroup("compute").remote();
Utility Classes
Actor Pool
Python
The ray.util module contains a utility class, ActorPool.
This class is similar to multiprocessing.Pool and lets you schedule Ray tasks over a fixed pool of actors.
import ray
from ray.util import ActorPool
@ray.remote
class Actor:
def double(self, n):
return n * 2
a1, a2 = Actor.remote(), Actor.remote()
pool = ActorPool([a1, a2])
# pool.map() returns a Python generator object (see ActorPool.map).
gen = pool.map(lambda a, v: a.double.remote(v), [1, 2, 3, 4])
print(list(gen))
# [2, 4, 6, 8]
See the package reference for more information.
Java
Actor pool hasn’t been implemented in Java yet.
C++
Actor pool hasn’t been implemented in C++ yet.
Message passing using Ray Queue
Sometimes just using one signal to synchronize is not enough. If you need to send data among many tasks or
actors, you can use ray.util.queue.Queue.
import ray
from ray.util.queue import Queue, Empty
ray.init()
# You can pass this object around to different tasks/actors
queue = Queue(maxsize=100)
@ray.remote
def consumer(id, queue):
try:
while True:
next_item = queue.get(block=True, timeout=1)
print(f"consumer {id} got work {next_item}")
except Empty:
pass
[queue.put(i) for i in range(10)]
print("Put work 1 - 10 to queue...")
consumers = [consumer.remote(id, queue) for id in range(2)]
ray.get(consumers)
Ray’s Queue API is similar to Python’s asyncio.Queue and queue.Queue.
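Because the API mirrors queue.Queue, non-blocking variants are also available. A minimal sketch, assuming Ray has already been initialized as above:
from ray.util.queue import Queue, Empty, Full
q = Queue(maxsize=2)
# Non-blocking puts raise Full once the queue reaches maxsize.
q.put_nowait("a")
q.put_nowait("b")
try:
    q.put_nowait("c")
except Full:
    print("queue is full")
# Non-blocking gets raise Empty when no items are left.
print(q.get_nowait())  # "a"
print(q.get_nowait())  # "b"
try:
    q.get_nowait()
except Empty:
    print("queue is empty")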
Out-of-band Communication
Typically, Ray actor communication is done through actor method calls and data is shared through the distributed object store.
However, in some use cases out-of-band communication can be useful.
Wrapping Library Processes
Many libraries already have mature, high-performance internal communication stacks and
they leverage Ray as a language-integrated actor scheduler.
The actual communication between actors is mostly done out-of-band using existing communication stacks.
For example, Horovod-on-Ray uses NCCL or MPI-based collective communications, and RayDP uses Spark’s internal RPC and object manager.
See Ray Distributed Library Patterns for more details.
Ray Collective
Ray’s collective communication library (ray.util.collective) allows efficient out-of-band collective and point-to-point communication between distributed CPUs or GPUs.
See Ray Collective for more details.
HTTP Server
You can start an HTTP server inside the actor and expose HTTP endpoints to clients,
so that users outside of the Ray cluster can communicate with the actor.
Python
import ray
import asyncio
import requests
from aiohttp import web
@ray.remote
class Counter:
async def __init__(self):
self.counter = 0
asyncio.get_running_loop().create_task(self.run_http_server())
async def run_http_server(self):
app = web.Application()
app.add_routes([web.get("/", self.get)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", 25001)
await site.start()
async def get(self, request):
return web.Response(text=str(self.counter))
async def increment(self):
self.counter = self.counter + 1
ray.init()
counter = Counter.remote()
[ray.get(counter.increment.remote()) for i in range(5)]
r = requests.get("http://127.0.0.1:25001/")
assert r.text == "5"
Similarly, you can expose other types of servers as well (e.g., gRPC servers).
Limitations
When using out-of-band communication with Ray actors, keep in mind that Ray does not manage the calls between actors. This means that functionality like distributed reference counting will not work with out-of-band communication, so you should take care not to pass object references in this way.
Actor Task Execution Order
Synchronous, Single-Threaded Actor
In Ray, an actor receives tasks from multiple submitters (including driver and workers).
For tasks received from the same submitter, a synchronous, single-threaded actor executes
them following the submission order.
In other words, a given task will not be executed until previously submitted tasks from
the same submitter have finished execution.
Python
import ray
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def add(self, addition):
self.value += addition
return self.value
counter = Counter.remote()
# For tasks from the same submitter,
# they are executed according to submission order.
value0 = counter.add.remote(1)
value1 = counter.add.remote(2)
# Output: 1. The first submitted task is executed first.
print(ray.get(value0))
# Output: 3. The later submitted task is executed later.
print(ray.get(value1))
1
3
However, the actor does not guarantee the execution order of the tasks from different
submitters. For example, suppose an unfulfilled argument blocks a previously submitted
task. In this case, the actor can still execute tasks submitted by a different worker.
Python
import time
import ray
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def add(self, addition):
self.value += addition
return self.value
counter = Counter.remote()
# Submit task from a worker
@ray.remote
def submitter(value):
return ray.get(counter.add.remote(value))
# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
time.sleep(5)
return value
# Submit tasks from different workers, with
# the first submitted task waiting for
# dependency resolution.
value0 = submitter.remote(delayed_resolution.remote(1))
value1 = submitter.remote(2)
# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
3
2
Asynchronous or Threaded Actor
Asynchronous or threaded actors do not guarantee the
task execution order. This means the system might execute a task
even though previously submitted tasks are pending execution.
Python
import time
import ray
@ray.remote
class AsyncCounter:
def __init__(self):
self.value = 0
async def add(self, addition):
self.value += addition
return self.value
counter = AsyncCounter.remote()
# Simulate delayed result resolution.
@ray.remote
def delayed_resolution(value):
time.sleep(5)
return value
# Submit tasks from the driver, with
# the first submitted task waiting for
# dependency resolution.
value0 = counter.add.remote(delayed_resolution.remote(1))
value1 = counter.add.remote(2)
# Output: 3. The first submitted task is executed later.
print(ray.get(value0))
# Output: 2. The later submitted task is executed first.
print(ray.get(value1))
3
2
Objects
In Ray, tasks and actors create and compute on objects. We refer to these objects as remote objects because they can be stored anywhere in a Ray cluster, and we use object refs to refer to them. Remote objects are cached in Ray’s distributed shared-memory object store, and there is one object store per node in the cluster. In the cluster setting, a remote object can live on one or many nodes, independent of who holds the object ref(s).
An object ref is essentially a pointer or a unique ID that can be used to refer to a
remote object without seeing its value. If you’re familiar with futures, Ray object refs are conceptually
similar.
Object refs can be created in two ways.
They are returned by remote function calls.
They are returned by ray.put().
Python
import ray
# Put an object in Ray's object store.
y = 1
object_ref = ray.put(y)
Java
// Put an object in Ray's object store.
int y = 1;
ObjectRef<Integer> objectRef = Ray.put(y);
C++
// Put an object in Ray's object store.
int y = 1;
ray::ObjectRef<int> object_ref = ray::Put(y);
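Object refs are likewise returned by remote function (and actor method) calls. A minimal Python sketch (compute_square is an illustrative task):
import ray
@ray.remote
def compute_square(x):
    return x * x
# The call returns immediately with an object ref; the value is computed
# asynchronously and can be fetched later with ray.get().
square_ref = compute_square.remote(4)
assert ray.get(square_ref) == 16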
Remote objects are immutable. That is, their values cannot be changed after
creation. This allows remote objects to be replicated in multiple object
stores without needing to synchronize the copies.
Fetching Object Data
You can use the ray.get() method to fetch the result of a remote object from an object ref.
If the current node’s object store does not contain the object, the object is downloaded.
Python
If the object is a numpy array
or a collection of numpy arrays, the get call is zero-copy and returns arrays backed by shared object store memory.
Otherwise, we deserialize the object data into a Python object.
import ray
import time
# Get the value of one object ref.
obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1
# Get the values of multiple object refs in parallel.
assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]
# You can also set a timeout to return early from a ``get``
# that's blocking for too long.
from ray.exceptions import GetTimeoutError
# ``GetTimeoutError`` is a subclass of ``TimeoutError``.
@ray.remote
def long_running_function():
time.sleep(8)
obj_ref = long_running_function.remote()
try:
ray.get(obj_ref, timeout=4)
except GetTimeoutError: # You can capture the standard "TimeoutError" instead
print("`get` timed out.")
`get` timed out.
Java
// Get the value of one object ref.
ObjectRef<Integer> objRef = Ray.put(1);
Assert.assertTrue(objRef.get() == 1);
// You can also set a timeout(ms) to return early from a ``get`` that's blocking for too long.
Assert.assertTrue(objRef.get(1000) == 1);
// Get the values of multiple object refs in parallel.
List<ObjectRef<Integer>> objectRefs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
objectRefs.add(Ray.put(i));
}
List<Integer> results = Ray.get(objectRefs);
Assert.assertEquals(results, ImmutableList.of(0, 1, 2));
// Ray.get timeout example: Ray.get will throw a RayTimeoutException if it times out.
public class MyRayApp {
public static int slowFunction() throws InterruptedException {
TimeUnit.SECONDS.sleep(10);
return 1;
}
}
Assert.assertThrows(RayTimeoutException.class,
() -> Ray.get(Ray.task(MyRayApp::slowFunction).remote(), 3000));
C++
// Get the value of one object ref.
ray::ObjectRef<int> obj_ref = ray::Put(1);
assert(*obj_ref.Get() == 1);
// Get the values of multiple object refs in parallel.
std::vector<ray::ObjectRef<int>> obj_refs;
for (int i = 0; i < 3; i++) {
obj_refs.emplace_back(ray::Put(i));
}
auto results = ray::Get(obj_refs);
assert(results.size() == 3);
assert(*results[0] == 0);
assert(*results[1] == 1);
assert(*results[2] == 2);
Passing Object Arguments
Ray object references can be freely passed around a Ray application. This means that they can be passed as arguments to tasks, actor methods, and even stored in other objects. Objects are tracked via distributed reference counting, and their data is automatically freed once all references to the object are deleted.
There are two different ways one can pass an object to a Ray task or method. Depending on the way an object is passed, Ray will decide whether to de-reference the object prior to task execution.
Passing an object as a top-level argument: When an object is passed directly as a top-level argument to a task, Ray will de-reference the object. This means that Ray will fetch the underlying data for all top-level object reference arguments, not executing the task until the object data becomes fully available.
import ray
@ray.remote
def echo(a: int, b: int, c: int):
"""This function prints its input values to stdout."""
print(a, b, c)
# Passing the literal values (1, 2, 3) to `echo`.
echo.remote(1, 2, 3)
# -> prints "1 2 3"
# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)
# Passing an object as a top-level argument to `echo`. Ray will de-reference top-level
# arguments, so `echo` will see the literal values (1, 2, 3) in this case as well.
echo.remote(a, b, c)
# -> prints "1 2 3"
Passing an object as a nested argument: When an object is passed within a nested object, for example, within a Python list, Ray will not de-reference it. This means that the task will need to call ray.get() on the reference to fetch the concrete value. However, if the task never calls ray.get(), then the object value never needs to be transferred to the machine the task is running on. We recommend passing objects as top-level arguments where possible, but nested arguments can be useful for passing objects on to other tasks without needing to see the data.
import ray
@ray.remote
def echo_and_get(x_list): # List[ObjectRef]
"""This function prints its input values to stdout."""
print("args:", x_list)
print("values:", ray.get(x_list))
# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)
# Passing an object as a nested argument to `echo_and_get`. Ray does not
# de-reference nested args, so `echo_and_get` sees the references.
echo_and_get.remote([a, b, c])
# -> prints args: [ObjectRef(...), ObjectRef(...), ObjectRef(...)]
# values: [1, 2, 3]
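A task can also forward nested references to another task without ever calling ray.get() itself, so the underlying data is never transferred to the forwarding task. A minimal sketch (the forward and sum_values tasks are illustrative):
import ray
@ray.remote
def sum_values(refs):  # List[ObjectRef]
    # Only this task fetches the underlying data.
    return sum(ray.get(refs))
@ray.remote
def forward(refs):  # List[ObjectRef]
    # This task never calls ray.get(), so the data is never
    # transferred to the node running `forward`.
    return sum_values.remote(refs)
a, b, c = ray.put(1), ray.put(2), ray.put(3)
inner_ref = ray.get(forward.remote([a, b, c]))  # The ObjectRef returned by sum_values.remote()
assert ray.get(inner_ref) == 6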
The top-level vs not top-level passing convention also applies to actor constructors and actor method calls:
@ray.remote
class Actor:
def __init__(self, arg):
pass
def method(self, arg):
pass
obj = ray.put(2)
# Examples of passing objects to actor constructors.
actor_handle = Actor.remote(obj) # by-value
actor_handle = Actor.remote([obj]) # by-reference
# Examples of passing objects to actor method calls.
actor_handle.method.remote(obj) # by-value
actor_handle.method.remote([obj]) # by-reference
Closure Capture of Objects
You can also pass objects to tasks via closure-capture. This can be convenient when you have a large object that you want to share verbatim between many tasks or actors, and don’t want to pass it repeatedly as an argument. Be aware however that defining a task that closes over an object ref will pin the object via reference-counting, so the object will not be evicted until the job completes.
import ray
# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)
@ray.remote
def print_via_capture():
"""This function prints the values of (a, b, c) to stdout."""
print(ray.get([a, b, c]))
# Passing object references via closure-capture. Inside the `print_via_capture`
# function, the global object refs (a, b, c) can be retrieved and printed.
print_via_capture.remote()
# -> prints [1, 2, 3]
Nested Objects
Ray also supports nested object references. This allows you to build composite objects that themselves hold references to further sub-objects.
import ray
# Objects can be nested within each other. Ray will keep the inner object
# alive via reference counting until all outer object references are deleted.
object_ref = ray.put(1)
object_ref_2 = ray.put([object_ref])
Fault Tolerance
Ray can automatically recover from object data loss
via lineage reconstruction
but not owner failure.
See Ray fault tolerance for more details.
More about Ray Objects
Serialization
Since Ray processes do not share memory space, data transferred between workers and nodes needs to be serialized and deserialized. Ray uses the Plasma object store to efficiently transfer objects across different processes and different nodes. Numpy arrays in the object store are shared between workers on the same node (zero-copy deserialization).
Overview
Ray uses a customized Pickle protocol version 5 backport in place of the original PyArrow serializer. This removes several previous limitations (e.g., the inability to serialize recursive objects).
Ray is compatible with Pickle protocol version 5, and with the help of cloudpickle it supports serialization of a wider range of objects (e.g., lambdas, nested functions, and dynamic classes).
Plasma Object Store
Plasma is an in-memory object store. It was originally developed as part of Apache Arrow. Prior to Ray’s version 1.0.0 release, Ray forked Arrow’s Plasma code into Ray’s code base in order to disentangle and continue development with respect to Ray’s architecture and performance needs.
Plasma is used to efficiently transfer objects across different processes and different nodes. All objects in Plasma object store are immutable and held in shared memory. This is so that they can be accessed efficiently by many workers on the same node.
Each node has its own object store. When data is put into the object store, it does not get automatically broadcasted to other nodes. Data remains local to the writer until requested by another task or actor on another node.
Numpy Arrays
Ray optimizes for numpy arrays by using Pickle protocol 5 with out-of-band data.
The numpy array is stored as a read-only object, and all Ray workers on the same node can read the numpy array in the object store without copying (zero-copy reads). Each numpy array object in the worker process holds a pointer to the relevant array held in shared memory. Any writes to the read-only object will require the user to first copy it into the local process memory.
You can often avoid serialization issues by using only native types (e.g., numpy arrays or lists/dicts of numpy arrays and other primitive types), or by using Actors to hold objects that cannot be serialized.
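For example, an unserializable object such as a lock can live inside an actor, with other code interacting with it only through actor method calls. A minimal sketch (the LockHolder actor is illustrative):
import threading
import ray
@ray.remote
class LockHolder:
    def __init__(self):
        # The lock never leaves this process, so it never needs to be serialized.
        self.lock = threading.Lock()
        self.count = 0
    def increment(self):
        with self.lock:
            self.count += 1
            return self.count
holder = LockHolder.remote()
print(ray.get([holder.increment.remote() for _ in range(3)]))  # [1, 2, 3]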
Fixing “assignment destination is read-only”
Because Ray puts numpy arrays in the object store, when deserialized as arguments in remote functions they will become read-only. For example, the following code snippet will crash:
import ray
import numpy as np
@ray.remote
def f(arr):
# arr = arr.copy() # Adding a copy will fix the error.
arr[0] = 1
try:
ray.get(f.remote(np.zeros(100)))
except ray.exceptions.RayTaskError as e:
print(e)
# ray.exceptions.RayTaskError(ValueError): ray::f()
# File "test.py", line 6, in f
# arr[0] = 1
# ValueError: assignment destination is read-only
To avoid this issue, you can manually copy the array at the destination if you need to mutate it (arr = arr.copy()). Note that this is effectively like disabling the zero-copy deserialization feature provided by Ray.
Serialization notes
Ray is currently using Pickle protocol version 5. The default pickle protocol used by most Python distributions is protocol 3. Protocols 4 and 5 are more efficient than protocol 3 for larger objects.
For non-native objects, Ray will always keep a single copy even if it is referred to multiple times within an object:
import ray
import numpy as np
obj = [np.zeros(42)] * 99
l = ray.get(ray.put(obj))
assert l[0] is l[1] # no problem!
Whenever possible, use numpy arrays or Python collections of numpy arrays for maximum performance.
Lock objects are mostly unserializable, because copying a lock is meaningless and could cause serious concurrency problems. You may have to come up with a workaround if your object contains a lock.
Customized Serialization
Sometimes you may want to customize your serialization process because
the default serializer used by Ray (pickle5 + cloudpickle) does
not work for you (fails to serialize some objects, is too slow for certain objects, etc.).
There are at least 3 ways to define your custom serialization process:
If you want to customize the serialization of a type of object,
and you have access to its code, you can define a __reduce__
method inside the corresponding class. This is commonly done
by most Python libraries. Example code:
import ray
import sqlite3
class DBConnection:
def __init__(self, path):
self.path = path
self.conn = sqlite3.connect(path)
# without '__reduce__', the instance is unserializable.
def __reduce__(self):
deserializer = DBConnection
serialized_data = (self.path,)
return deserializer, serialized_data
original = DBConnection("/tmp/db")
print(original.conn)
copied = ray.get(ray.put(original))
print(copied.conn)
If you want to customize the serialization of a type of object,
but you cannot access or modify the corresponding class, you can
register the class with the serializer you use:
import ray
import threading
class A:
def __init__(self, x):
self.x = x
self.lock = threading.Lock() # could not be serialized!
try:
ray.get(ray.put(A(1))) # fail!
except TypeError:
pass
def custom_serializer(a):
return a.x
def custom_deserializer(b):
return A(b)
# Register serializer and deserializer for class A:
ray.util.register_serializer(
A, serializer=custom_serializer, deserializer=custom_deserializer)
ray.get(ray.put(A(1))) # success!
# You can deregister the serializer at any time.
ray.util.deregister_serializer(A)
try:
ray.get(ray.put(A(1))) # fail!
except TypeError:
pass
# Nothing happens when deregistering a serializer that isn't registered.
ray.util.deregister_serializer(A)
NOTE: Serializers are managed locally for each Ray worker, so you need to
register the serializer on every Ray worker that uses it. Deregistering a
serializer also only applies locally.
If you register a new serializer for a class, the new serializer immediately
replaces the old one in that worker. This API is also idempotent: there are
no side effects caused by re-registering the same serializer.
Here is an example of customizing the serialization
of a specific object (rather than a whole class):
import threading
class A:
def __init__(self, x):
self.x = x
self.lock = threading.Lock() # could not serialize!
try:
ray.get(ray.put(A(1))) # fail!
except TypeError:
pass
class SerializationHelperForA:
"""A helper class for serialization."""
def __init__(self, a):
self.a = a
def __reduce__(self):
return A, (self.a.x,)
ray.get(ray.put(SerializationHelperForA(A(1)))) # success!
# the serializer only works for a specific object, not all A
# instances, so we still expect failure here.
try:
ray.get(ray.put(A(1))) # still fail!
except TypeError:
pass
Troubleshooting
Use ray.util.inspect_serializability to identify tricky pickling issues. This function can be used to trace a potential non-serializable object within any Python object – whether it be a function, class, or object instance.
Below, we demonstrate this behavior on a function with a non-serializable object (threading lock):
from ray.util import inspect_serializability
import threading
lock = threading.Lock()
def test():
print(lock)
inspect_serializability(test, name="test")
The resulting output is:
=============================================================
Checking Serializability of <function test at 0x...>
=============================================================
!!! FAIL serialization: cannot pickle '_thread.lock' object
Detected 1 global variables. Checking serializability...
Serializing 'lock' ...
!!! FAIL serialization: cannot pickle '_thread.lock' object
WARNING: Did not find non-serializable object in <function test at 0x...>. This may be an oversight.
=============================================================
Variable:
FailTuple(lock [obj=<unlocked _thread.lock object at 0x...>, parent=<function test at 0x...>])
was found to be non-serializable. There may be multiple other undetected variables that were non-serializable.
Consider either removing the instantiation/imports of these variables or moving the instantiation into the scope of the function/class.
=============================================================
Check https://docs.ray.io/en/master/ray-core/objects/serialization.html#troubleshooting for more information.
If you have any suggestions on how to improve this error message, please reach out to the Ray developers on github.com/ray-project/ray/issues/
=============================================================
For even more detailed information, set the environment variable RAY_PICKLE_VERBOSE_DEBUG='2' before importing Ray. This enables
serialization with a Python-based backend instead of C-Pickle, so you can debug into the Python code in the middle of serialization.
However, this makes serialization much slower.
Known Issues
Users could experience a memory leak when using certain Python 3.8 and 3.9 versions. This is due to a bug in Python’s pickle module.
This issue has been resolved in Python 3.8.2rc1, Python 3.9.0 alpha 4, and later versions.
Object Spilling
Ray 1.3+ spills objects to external storage once the object store is full. By default, objects are spilled to Ray’s temporary directory in the local filesystem.
Single node
Ray uses object spilling by default. Without any setting, objects are spilled to [temp_folder]/spill. On Linux and MacOS, the temp_folder is /tmp by default.
To configure the directory where objects are spilled to, use:
import ray
ray.shutdown()
import json
import ray
ray.init(
_system_config={
"object_spilling_config": json.dumps(
{"type": "filesystem", "params": {"directory_path": "/tmp/spill"}},
)
},
)
You can also specify multiple directories for spilling to spread the IO load and disk space
usage across multiple physical devices if needed (e.g., SSD devices):
ray.shutdown()
import json
import ray
ray.init(
_system_config={
"max_io_workers": 4, # More IO workers for parallelism.
"object_spilling_config": json.dumps(
{
"type": "filesystem",
"params": {
# Multiple directories can be specified to distribute
# IO across multiple mounted physical devices.
"directory_path": [
"/tmp/spill",
"/tmp/spill_1",
"/tmp/spill_2",
]
},
}
)
},
)
To optimize performance, it is recommended to use an SSD instead of an HDD when using object spilling for memory-intensive workloads.
If you are using an HDD, it is recommended that you specify a large buffer size (> 1MB) to reduce IO requests during spilling.
ray.shutdown()
import json
import ray
ray.init(
_system_config={
"object_spilling_config": json.dumps(
{
"type": "filesystem",
"params": {
"directory_path": "/tmp/spill",
"buffer_size": 1_000_000,
}
},
)
},
)
To prevent running out of disk space, local object spilling will throw OutOfDiskError if the disk utilization exceeds the predefined threshold.
If multiple physical devices are used, any physical device’s over-usage will trigger the OutOfDiskError.
The default threshold is 0.95 (95%). You can adjust the threshold by setting local_fs_capacity_threshold, or set it to 1 to disable the protection.
ray.shutdown()
import json
import ray
ray.init(
_system_config={
# Allow spilling until the local disk is 99% utilized.
# This only affects spilling to the local file system.
"local_fs_capacity_threshold": 0.99,
"object_spilling_config": json.dumps(
{
"type": "filesystem",
"params": {
"directory_path": "/tmp/spill",
}
},
)
},
)
To enable object spilling to remote storage (any URI supported by smart_open):
ray.shutdown()
import json
import ray
ray.init(
_system_config={
"max_io_workers": 4, # More IO workers for remote storage.
"min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
"object_spilling_config": json.dumps(
{
"type": "smart_open",
"params": {
"uri": "s3://bucket/path"
},
"buffer_size": 100 * 1024 * 1024, # Use a 100MB buffer for writes
},
)
},
)
It is recommended that you specify a large buffer size (> 1MB) to reduce IO requests during spilling.
Spilling to multiple remote storages is also supported.
ray.shutdown()
import json
import ray
ray.init(
_system_config={
"max_io_workers": 4, # More IO workers for remote storage.
"min_spilling_size": 100 * 1024 * 1024, # Spill at least 100MB at a time.
"object_spilling_config": json.dumps(
{
"type": "smart_open",
"params": {
"uri": ["s3://bucket/path1", "s3://bucket/path2", "s3://bucket/path3"],
},
"buffer_size": 100 * 1024 * 1024, # Use a 100MB buffer for writes
},
)
},
)
Remote storage support is still experimental.
Cluster mode
To enable object spilling in multi node clusters:
# Note that `object_spilling_config`'s value should be json format.
# You only need to specify the config when starting the head node, all the worker nodes will get the same config from the head node.
ray start --head --system-config='{"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}'
Stats
When spilling is happening, the following INFO level messages will be printed to the raylet logs (e.g., /tmp/ray/session_latest/logs/raylet.out):
local_object_manager.cc:166: Spilled 50 MiB, 1 objects, write throughput 230 MiB/s
local_object_manager.cc:334: Restored 50 MiB, 1 objects, read throughput 505 MiB/s
You can also view cluster-wide spill stats by using the ray memory command:
--- Aggregate object store stats across all nodes ---
Plasma memory usage 50 MiB, 1 objects, 50.0% full
Spilled 200 MiB, 4 objects, avg write throughput 570 MiB/s
Restored 150 MiB, 3 objects, avg read throughput 1361 MiB/s
If you only want to display cluster-wide spill stats, use ray memory --stats-only.
Environment Dependencies
Your Ray application may have dependencies that exist outside of your Ray script. For example:
Your Ray script may import/depend on some Python packages.
Your Ray script may be looking for some specific environment variables to be available.
Your Ray script may import some files outside of the script.
One frequent problem when running on a cluster is that Ray expects these “dependencies” to exist on each Ray node. If these are not present, you may run into issues such as ModuleNotFoundError, FileNotFoundError and so on.
To address this problem, you can (1) prepare your dependencies on the cluster in advance (e.g. using a container image) using the Ray Cluster Launcher, or (2) use Ray’s runtime environments to install them on the fly.
For production usage or non-changing environments, we recommend installing your dependencies into a container image and specifying the image using the Cluster Launcher.
For dynamic environments (e.g. for development and experimentation), we recommend using runtime environments.
Concepts
Ray Application. A program including a Ray script that calls ray.init() and uses Ray tasks or actors.
Dependencies, or Environment. Anything outside of the Ray script that your application needs to run, including files, packages, and environment variables.
Files. Code files, data files or other files that your Ray application needs to run.
Packages. External libraries or executables required by your Ray application, often installed via pip or conda.
Local machine and Cluster. Usually, you may want to separate the Ray cluster compute machines/pods from the machine/pod that handles and submits the application. You can submit a Ray Job via the Ray Job Submission mechanism, or use ray attach to connect to a cluster interactively. We call the machine submitting the job your local machine.
Job. A Ray job is a single application: it is the collection of Ray tasks, objects, and actors that originate from the same script.
Preparing an environment using the Ray Cluster launcher
The first way to set up dependencies is to prepare a single environment across the cluster before starting the Ray runtime.
You can build all your files and dependencies into a container image and specify this in your Cluster YAML Configuration.
You can also install packages using setup_commands in the Ray Cluster configuration file (reference); these commands will be run as each node joins the cluster.
Note that for production settings, it is recommended to build any necessary packages into a container image instead.
You can push local files to the cluster using ray rsync_up (reference).
Runtime environments
This feature requires a full installation of Ray using pip install "ray[default]". This feature is available starting with Ray 1.4.0 and is currently supported on macOS and Linux, with beta support on Windows.
The second way to set up dependencies is to install them dynamically while Ray is running.
A runtime environment describes the dependencies your Ray application needs to run, including files, packages, environment variables, and more.
It is installed dynamically on the cluster at runtime and cached for future use (see Caching and Garbage Collection for details about the lifecycle).
Runtime environments can be used on top of the prepared environment from the Ray Cluster launcher if it was used.
For example, you can use the Cluster launcher to install a base set of packages, and then use runtime environments to install additional packages.
In contrast with the base cluster environment, a runtime environment will only be active for Ray processes. (For example, if using a runtime environment specifying a pip package my_pkg, the statement import my_pkg will fail if called outside of a Ray task, actor, or job.)
Runtime environments also allow you to set dependencies per-task, per-actor, and per-job on a long-running Ray cluster.
import ray
ray.shutdown()
import ray
runtime_env = {"pip": ["emoji"]}
ray.init(runtime_env=runtime_env)
@ray.remote
def f():
import emoji
return emoji.emojize('Python is :thumbs_up:')
print(ray.get(f.remote()))
Python is 👍
A runtime environment can be described by a Python dict:
runtime_env = {
"pip": ["emoji"],
"env_vars": {"TF_WARNINGS": "none"}
}
Alternatively, you can use ray.runtime_env.RuntimeEnv:
from ray.runtime_env import RuntimeEnv
runtime_env = RuntimeEnv(
pip=["emoji"],
env_vars={"TF_WARNINGS": "none"}
)
For more examples, jump to the API Reference.
There are two primary scopes for which you can specify a runtime environment:
Per-Job, and
Per-Task/Actor, within a job.
Specifying a Runtime Environment Per-Job
You can specify a runtime environment for your whole job, whether you run a script directly on the cluster or use the Ray Jobs API:
# Option 1: Starting a single-node local Ray cluster or connecting to existing local cluster
ray.init(runtime_env=runtime_env)
# Option 2: Using Ray Jobs API (Python SDK)
from ray.job_submission import JobSubmissionClient
client = JobSubmissionClient("http://<head-node-ip>:8265")
job_id = client.submit_job(
entrypoint="python my_ray_script.py",
runtime_env=runtime_env,
)
# Option 3: Using Ray Jobs API (CLI). (Note: can use --runtime-env to pass a YAML file instead of an inline JSON string.)
$ ray job submit --address="http://<head-node-ip>:8265" --runtime-env-json='{"working_dir": "/data/my_files", "pip": ["emoji"]}' -- python my_ray_script.py
If you use the Ray Jobs API (either the Python SDK or the CLI), specify the runtime_env argument in the submit_job call or the ray job submit command, not in the ray.init() call in the entrypoint script (in this example, my_ray_script.py).
This ensures the runtime environment is installed on the cluster before the entrypoint script is run.
There are two options for when to install the runtime environment:
1. As soon as the job starts (i.e., as soon as ray.init() is called), the dependencies are eagerly downloaded and installed.
2. The dependencies are installed only when a task is invoked or an actor is created.
The default is option 1. To change the behavior to option 2, add "eager_install": False to the config of runtime_env.
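A minimal sketch of option 2, deferring installation via the runtime_env config (the emoji package is illustrative):
import ray
# Defer installing the runtime environment until a task or actor actually
# needs it, instead of installing it as soon as ray.init() is called.
runtime_env = {
    "pip": ["emoji"],
    "config": {"eager_install": False},
}
ray.init(runtime_env=runtime_env)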
Specifying a Runtime Environment Per-Task or Per-Actor
You can specify different runtime environments per-actor or per-task using .options() or the @ray.remote decorator:
# Invoke a remote task that will run in a specified runtime environment.
f.options(runtime_env=runtime_env).remote()
# Instantiate an actor that will run in a specified runtime environment.
actor = SomeClass.options(runtime_env=runtime_env).remote()
# Specify a runtime environment in the task definition. Future invocations via
# `g.remote()` will use this runtime environment unless overridden by using
# `.options()` as above.
@ray.remote(runtime_env=runtime_env)
def g():
pass
# Specify a runtime environment in the actor definition. Future instantiations
# via `MyClass.remote()` will use this runtime environment unless overridden by
# using `.options()` as above.
@ray.remote(runtime_env=runtime_env)
class MyClass:
pass
This allows you to have actors and tasks running in their own environments, independent of the surrounding environment. (The surrounding environment could be the job’s runtime environment, or the system environment of the cluster.)
Ray does not guarantee compatibility between tasks and actors with conflicting runtime environments.
For example, if an actor whose runtime environment contains a pip package tries to communicate with an actor with a different version of that package, it can lead to unexpected behavior such as unpickling errors.
Common Workflows
This section describes some common use cases for runtime environments. These use cases are not mutually exclusive; all of the options described below can be combined in a single runtime environment.
Using Local Files
Your Ray application might depend on source files or data files.
For a development workflow, these might live on your local machine, but when it comes time to run things at scale, you will need to get them to your remote cluster.
The following simple example explains how to get your local files on the cluster.
import ray
ray.shutdown()
import os
import ray
os.makedirs("/tmp/runtime_env_working_dir", exist_ok=True)
with open("/tmp/runtime_env_working_dir/hello.txt", "w") as hello_file:
hello_file.write("Hello World!")
# Specify a runtime environment for the entire Ray job
ray.init(runtime_env={"working_dir": "/tmp/runtime_env_working_dir"})
# Create a Ray task, which inherits the above runtime env.
@ray.remote
def f():
# The function will have its working directory changed to its node's
# local copy of /tmp/runtime_env_working_dir.
return open("hello.txt").read()
print(ray.get(f.remote()))
Hello World!
The example above is written to run on a local machine, but as for all of these examples, it also works when specifying a Ray cluster to connect to
(e.g., using ray.init("ray://123.456.7.89:10001", runtime_env=...) or ray.init(address="auto", runtime_env=...)).
The specified local directory will automatically be pushed to the cluster nodes when ray.init() is called.
You can also specify files via a remote cloud storage URI; see Remote URIs for details.
Using conda or pip packages
Your Ray application might depend on Python packages (for example, pendulum or requests) via import statements.
Ray ordinarily expects all imported packages to be preinstalled on every node of the cluster; in particular, these packages are not automatically shipped from your local machine to the cluster or downloaded from any repository.
However, using runtime environments you can dynamically specify packages to be automatically downloaded and installed in a virtual environment for your Ray job, or for specific Ray tasks or actors.
import ray
ray.shutdown()
import ray
import requests
# This example runs on a local machine, but you can also do
# ray.init(address=..., runtime_env=...) to connect to a cluster.
ray.init(runtime_env={"pip": ["requests"]})
@ray.remote
def reqs():
return requests.get("https://www.ray.io/").status_code
print(ray.get(reqs.remote()))
200
You may also specify your pip dependencies either via a Python list or a local requirements.txt file.
Alternatively, you can specify a conda environment, either as a Python dictionary or via a local environment.yml file. This conda environment can include pip packages.
For details, head to the API Reference.
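A minimal sketch of these forms (package and file names are illustrative):
# pip dependencies as an inline list ...
runtime_env_pip_list = {"pip": ["requests", "pendulum==2.1.2"]}
# ... or as a path to a local requirements.txt file.
runtime_env_pip_file = {"pip": "./requirements.txt"}
# conda environment as a Python dict (which may include pip packages) ...
runtime_env_conda_dict = {
    "conda": {"dependencies": ["pip", {"pip": ["requests"]}]}
}
# ... or as a path to a local environment.yml file.
runtime_env_conda_file = {"conda": "./environment.yml"}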
Since the packages in the runtime_env are installed at runtime, be cautious when specifying conda or pip packages whose installations involve building from source, as this can be slow.
When using the "pip" field, the specified packages will be installed “on top of” the base environment using virtualenv, so existing packages on your cluster will still be importable. By contrast, when using the conda field, your Ray tasks and actors will run in an isolated environment. The conda and pip fields cannot both be used in a single runtime_env.
The ray[default] package itself will automatically be installed in the environment. For the conda field only, if you are using any other Ray libraries (for example, Ray Serve), then you will need to specify the library in the runtime environment (e.g. runtime_env = {"conda": {"dependencies": ["pytorch", "pip", {"pip": ["requests", "ray[serve]"]}]}}.)
conda environments must have the same Python version as the Ray cluster. Do not list ray in the conda dependencies, as it will be automatically installed.
Library Development
Suppose you are developing a library my_module on Ray.
A typical iteration cycle will involve:
Making some changes to the source code of my_module
Running a Ray script to test the changes, perhaps on a distributed cluster.
To ensure your local changes show up across all Ray workers and can be imported properly, use the py_modules field.
import ray
import my_module
ray.init("ray://123.456.7.89:10001", runtime_env={"py_modules": [my_module]})
@ray.remote
def test_my_module():
# No need to import my_module inside this function.
my_module.test()
ray.get(test_my_module.remote())
Note: This feature is currently limited to modules that are packages with a single directory containing an __init__.py file. For single-file modules, you may use working_dir.
API Reference
The runtime_env is a Python dictionary or a Python class ray.runtime_env.RuntimeEnv including one or more of the following fields:
working_dir (str): Specifies the working directory for the Ray workers. This must either be (1) a local existing directory with total size at most 100 MiB, (2) a local existing zipped file with total unzipped size at most 100 MiB (Note: excludes has no effect), or (3) a URI to a remotely-stored zip file containing the working directory for your job. See Remote URIs for details.
The specified directory will be downloaded to each node on the cluster, and Ray workers will be started in their node’s copy of this directory.
Examples
"." # cwd
"/src/my_project"
"/src/my_project.zip"
"s3://path/to/my_dir.zip"
Note: Setting a local directory per-task or per-actor is currently unsupported; it can only be set per-job (i.e., in ray.init()).
Note: If the local directory contains a .gitignore file, the files and paths specified there are not uploaded to the cluster. You can disable this by setting the environment variable RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1 on the machine doing the uploading.
py_modules (List[str|module]): Specifies Python modules to be available for import in the Ray workers. (For more ways to specify packages, see also the pip and conda fields below.)
Each entry must be either (1) a path to a local directory, (2) a URI to a remote zip file (see Remote URIs for details), (3) a Python module object, or (4) a path to a local whl file.
Examples of entries in the list:
"."
"/local_dependency/my_module"
"s3://bucket/my_module.zip"
my_module # Assumes my_module has already been imported, e.g. via 'import my_module'
my_module.whl
The modules will be downloaded to each node on the cluster.
Note: Setting options (1), (3) and (4) per-task or per-actor is currently unsupported; it can only be set per-job (i.e., in ray.init()).
Note: For option (1), if the local directory contains a .gitignore file, the files and paths specified there are not uploaded to the cluster. You can disable this by setting the environment variable RAY_RUNTIME_ENV_IGNORE_GITIGNORE=1 on the machine doing the uploading.
Note: This feature is currently limited to modules that are packages with a single directory containing an __init__.py file. For single-file modules, you may use working_dir.
excludes (List[str]): When used with working_dir or py_modules, specifies a list of files or paths to exclude from being uploaded to the cluster.
This field uses the pattern-matching syntax used by .gitignore files: see https://git-scm.com/docs/gitignore for details.
Note: In accordance with .gitignore syntax, if there is a separator (/) at the beginning or middle (or both) of the pattern, then the pattern is interpreted relative to the level of the working_dir.
In particular, you shouldn’t use absolute paths (e.g. /Users/my_working_dir/subdir/) with excludes; rather, you should use the relative path /subdir/ (written here with a leading / to match only the top-level subdir directory, rather than all directories named subdir at all levels.)
Example: {"working_dir": "/Users/my_working_dir/", "excludes": ["my_file.txt", "/subdir/, "path/to/dir", "*.log"]}
pip (dict | List[str] | str): Either (1) a list of pip requirements specifiers, (2) a string containing the path to a local pip “requirements.txt” file, or (3) a Python dictionary with three fields: (a) packages (required, List[str]): a list of pip packages;
(b) pip_check (optional, bool): whether to enable pip check at the end of pip install, defaults to False;
(c) pip_version (optional, str): the version of pip; Ray prepends the package name “pip” to the pip_version to form the final requirement string.
The syntax of a requirement specifier is defined in full in PEP 508.
This will be installed in the Ray workers at runtime. Packages in the preinstalled cluster environment will still be available.
To use a library like Ray Serve or Ray Tune, you will need to include "ray[serve]" or "ray[tune]" here.
The Ray version must match that of the cluster.
Example: ["requests==1.0.0", "aiohttp", "ray[serve]"]
Example: "./requirements.txt"
Example: {"packages":["tensorflow", "requests"], "pip_check": False, "pip_version": "==22.0.2;python_version=='3.8.11'"}
When specifying a path to a requirements.txt file, the file must be present on your local machine, and it must be either a valid absolute path or a path relative to your local current working directory, not relative to the working_dir specified in the runtime_env.
Furthermore, referencing local files within a requirements.txt file is not supported (e.g., -r ./my-laptop/more-requirements.txt, ./my-pkg.whl).
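As a sketch, a per-task pip environment can also be supplied via .options() (the package name is illustrative):
import ray

ray.init()

@ray.remote
def parse_feed():
    # Import inside the task: the package is only installed in the
    # task's runtime environment, not on the driver.
    import feedparser
    return feedparser.__version__

print(ray.get(parse_feed.options(runtime_env={"pip": ["feedparser"]}).remote()))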
conda (dict | str): Either (1) a dict representing the conda environment YAML, (2) a string containing the path to a local conda “environment.yml” file, or (3) the name of a local conda environment already installed on each node in your cluster (e.g., "pytorch_p36").
In the first two cases, the Ray and Python dependencies will be automatically injected into the environment to ensure compatibility, so there is no need to manually include them.
The Python and Ray versions must match those of the cluster, so you likely should not specify them manually.
Note that the conda and pip keys of runtime_env cannot both be specified at the same time—to use them together, please use conda and add your pip dependencies in the "pip" field in your conda environment.yaml.
Example: {"dependencies": ["pytorch", "torchvision", "pip", {"pip": ["pendulum"]}]}
Example: "./environment.yml"
Example: "pytorch_p36"
When specifying a path to an environment.yml file, the file must be present on your local machine, and it must be either a valid absolute path or a path relative to your local current working directory, not relative to the working_dir specified in the runtime_env.
Furthermore, referencing local files within an environment.yml file is not supported.
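For instance, a minimal sketch of the dict form applied per job (Ray injects the matching ray and python dependencies automatically):
import ray

ray.init(runtime_env={
    "conda": {"dependencies": ["pip", {"pip": ["requests"]}]}
})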
env_vars (Dict[str, str]): Environment variables to set. Environment variables already set on the cluster will still be visible to the Ray workers, so there is no need to include os.environ or similar in the env_vars field.
By default, these environment variables override environment variables of the same name on the cluster.
You can also reference existing environment variables using ${ENV_VAR} to achieve appending behavior; only PATH, LD_LIBRARY_PATH, DYLD_LIBRARY_PATH, and LD_PRELOAD support this. See below for an example:
Example: {"OMP_NUM_THREADS": "32", "TF_WARNINGS": "none"}
Example: {"LD_LIBRARY_PATH": "${LD_LIBRARY_PATH}:/home/admin/my_lib"}
container (dict): Require a given (Docker) image; the worker process will run in a container with this image.
The worker_path is the path to default_worker.py. It is required only if the Ray installation directory in the container is different from that on the raylet host.
The run_options list spec is here.
Example: {"image": "anyscale/ray-ml:nightly-py38-cpu", "worker_path": "/root/python/ray/workers/default_worker.py", "run_options": ["--cap-drop SYS_ADMIN","--log-level=debug"]}
Note: container is currently experimental. If you have feature requirements or run into any problems, raise an issue on GitHub.
config (dict | ray.runtime_env.RuntimeEnvConfig): Configuration for the runtime environment. Either a dict or a RuntimeEnvConfig.
Fields:
(1) setup_timeout_seconds (int): the timeout for runtime environment creation, in seconds.
Example: {"setup_timeout_seconds": 10}
Example: RuntimeEnvConfig(setup_timeout_seconds=10)
(2) eager_install (bool): Indicates whether to install the runtime environment on the cluster at ray.init() time, before the workers are leased. This flag is set to True by default.
If set to False, the runtime environment will only be installed when the first task is invoked or the first actor is created.
Currently, specifying this option per-actor or per-task is not supported.
Example: {"eager_install": False}
Example: RuntimeEnvConfig(eager_install=False)
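For instance, a sketch combining both config fields with a pip environment:
import ray
from ray.runtime_env import RuntimeEnvConfig

# Fail the setup if it takes longer than 10 minutes, and delay installation
# until the first task or actor actually needs the environment.
ray.init(runtime_env={
    "pip": ["requests"],
    "config": RuntimeEnvConfig(setup_timeout_seconds=600, eager_install=False),
})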
Caching and Garbage Collection
Runtime environment resources on each node (such as conda environments, pip packages, or downloaded working_dir or py_modules directories) will be cached on the cluster to enable quick reuse across different runtime environments within a job. Each field (working_dir, py_modules, etc.) has its own cache whose size defaults to 10 GB. To change this default, you may set the environment variable RAY_RUNTIME_ENV_<FIELD>_CACHE_SIZE_GB on each node in your cluster before starting Ray, e.g. export RAY_RUNTIME_ENV_WORKING_DIR_CACHE_SIZE_GB=1.5.
When the cache size limit is exceeded, resources not currently used by any actor, task or job will be deleted.
Inheritance
The runtime environment is inheritable, so it will apply to all tasks/actors within a job and all child tasks/actors of a task or actor once set, unless it is overridden.
If an actor or task specifies a new runtime_env, it will override the parent’s runtime_env (i.e., the parent actor/task’s runtime_env, or the job’s runtime_env if there is no parent actor or task) as follows:
The runtime_env["env_vars"] field will be merged with the runtime_env["env_vars"] field of the parent.
This allows for environment variables set in the parent’s runtime environment to be automatically propagated to the child, even if new environment variables are set in the child’s runtime environment.
Every other field in the runtime_env will be overridden by the child, not merged. For example, if runtime_env["py_modules"] is specified, it will replace the runtime_env["py_modules"] field of the parent.
Example:
# Parent's `runtime_env`
{"pip": ["requests", "chess"],
"env_vars": {"A": "a", "B": "b"}}
# Child's specified `runtime_env`
{"pip": ["torch", "ray[serve]"],
"env_vars": {"B": "new", "C": "c"}}
# Child's actual `runtime_env` (merged with parent's)
{"pip": ["torch", "ray[serve]"],
"env_vars": {"A": "a", "B": "new", "C": "c"}}
Frequently Asked Questions
Are environments installed on every node?
If a runtime environment is specified in ray.init(runtime_env=...), then the environment will be installed on every node. See Per-Job for more details.
(Note, by default the runtime environment will be installed eagerly on every node in the cluster. If you want to lazily install the runtime environment on demand, set the eager_install option to false: ray.init(runtime_env={..., "config": {"eager_install": False}}.)
When is the environment installed?
When specified per-job, the environment is installed when you call ray.init() (unless "eager_install": False is set).
When specified per-task or per-actor, the environment is installed when the task is invoked or the actor is instantiated (i.e. when you call my_task.remote() or my_actor.remote().)
See Per-Job and Per-Task/Actor, within a job for more details.
Where are the environments cached?
Any local files downloaded by the environments are cached at /tmp/ray/session_latest/runtime_resources.
How long does it take to install or to load from cache?
The install time mostly consists of the time it takes to run pip install or conda create / conda activate, or to upload/download a working_dir, depending on which runtime_env options you’re using.
This could take seconds or minutes.
On the other hand, loading a runtime environment from the cache should be nearly as fast as the ordinary Ray worker startup time, which is on the order of a few seconds. A new Ray worker is started for every Ray actor or task that requires a new runtime environment.
(Note that loading a cached conda environment could still be slow, since the conda activate command sometimes takes a few seconds.)
You can set setup_timeout_seconds config to avoid the installation hanging for a long time. If the installation is not finished within this time, your tasks or actors will fail to start.
What is the relationship between runtime environments and Docker?
They can be used independently or together.
A container image can be specified in the Cluster Launcher for large or static dependencies, and runtime environments can be specified per-job or per-task/actor for more dynamic use cases.
The runtime environment will inherit packages, files, and environment variables from the container image.
My runtime_env was installed, but when I log into the node I can’t import the packages.
The runtime environment is only active for the Ray worker processes; it does not install any packages “globally” on the node.
Remote URIs
The working_dir and py_modules arguments in the runtime_env dictionary can specify either local path(s) or remote URI(s).
A local path must be a directory path. The directory’s contents will be directly accessed as the working_dir or a py_module.
A remote URI must be a link directly to a zip file. The zip file must contain only a single top-level directory.
The contents of this directory will be directly accessed as the working_dir or a py_module.
For example, suppose you want to use the contents in your local /some_path/example_dir directory as your working_dir.
If you want to specify this directory as a local path, your runtime_env dictionary should contain:
runtime_env = {..., "working_dir": "/some_path/example_dir", ...}
Suppose instead you want to host your files in your /some_path/example_dir directory remotely and provide a remote URI.
You would need to first compress the example_dir directory into a zip file.
There should be no other files or directories at the top level of the zip file, other than example_dir.
You can use the following command in the Terminal to do this:
cd /some_path
zip -r zip_file_name.zip example_dir
Note that this command must be run from the parent directory of the desired working_dir to ensure that the resulting zip file contains a single top-level directory.
In general, the zip file’s name and the top-level directory’s name can be anything.
The top-level directory’s contents will be used as the working_dir (or py_module).
You can check that the zip file contains a single top-level directory by running the following command in the Terminal:
zipinfo -1 zip_file_name.zip
# example_dir/
# example_dir/my_file_1.txt
# example_dir/subdir/my_file_2.txt
Suppose you upload the compressed example_dir directory to AWS S3 at the S3 URI s3://example_bucket/example.zip.
Your runtime_env dictionary should contain:
runtime_env = {..., "working_dir": "s3://example_bucket/example.zip", ...}
Check for hidden files and metadata directories in zipped dependencies.
You can inspect a zip file’s contents by running the zipinfo -1 zip_file_name.zip command in the Terminal.
Some zipping methods can cause hidden files or metadata directories to appear in the zip file at the top level.
To avoid this, use the zip -r command directly on the directory you want to compress, from its parent directory. For example, if you have a directory structure such as a/b and you want to compress b, issue the zip -r b command from the directory a.
If Ray detects more than a single directory at the top level, it will use the entire zip file instead of the top-level directory, which may lead to unexpected behavior.
Currently, three types of remote URIs are supported for hosting working_dir and py_modules packages:
HTTPS: HTTPS refers to URLs that start with https.
These are particularly useful because remote Git providers (e.g. GitHub, Bitbucket, GitLab, etc.) use https URLs as download links for repository archives.
This allows you to host your dependencies on remote Git providers, push updates to them, and specify which dependency versions (i.e. commits) your jobs should use.
To use packages via HTTPS URIs, you must have the smart_open library (you can install it using pip install smart_open).
Example:
runtime_env = {"working_dir": "https://github.com/example_username/example_respository/archive/HEAD.zip"}
S3: S3 refers to URIs starting with s3:// that point to compressed packages stored in AWS S3.
To use packages via S3 URIs, you must have the smart_open and boto3 libraries (you can install them using pip install smart_open and pip install boto3).
Ray does not explicitly pass in any credentials to boto3 for authentication.
boto3 will use your environment variables, shared credentials file, and/or AWS config file to authenticate access.
See the AWS boto3 documentation to learn how to configure these.
Example:
runtime_env = {"working_dir": "s3://example_bucket/example_file.zip"}
GS: GS refers to URIs starting with gs:// that point to compressed packages stored in Google Cloud Storage.
To use packages via GS URIs, you must have the smart_open and google-cloud-storage libraries (you can install them using pip install smart_open and pip install google-cloud-storage).
Ray does not explicitly pass in any credentials to the google-cloud-storage’s Client object.
google-cloud-storage will use your local service account key(s) and environment variables by default.
Follow the steps on Google Cloud Storage’s Getting started with authentication guide to set up your credentials, which allow Ray to access your remote package.
Example:
runtime_env = {"working_dir": "gs://example_bucket/example_file.zip"}
Note that the smart_open, boto3, and google-cloud-storage packages are not installed by default, and it is not sufficient to specify them in the pip section of your runtime_env.
The relevant packages must already be installed on all nodes of the cluster when Ray starts.
Hosting a Dependency on a Remote Git Provider: Step-by-Step Guide
You can store your dependencies in repositories on a remote Git provider (e.g. GitHub, Bitbucket, GitLab, etc.), and you can periodically push changes to keep them updated.
In this section, you will learn how to store a dependency on GitHub and use it in your runtime environment.
These steps will also be useful if you use another large, remote Git provider (e.g. Bitbucket, GitLab, etc.).
For simplicity, this section refers to GitHub alone, but you can follow along on your provider.
First, create a repository on GitHub to store your working_dir contents or your py_module dependency.
By default, when you download a zip file of your repository, the zip file will already contain a single top-level directory that holds the repository contents,
so you can directly upload your working_dir contents or your py_module dependency to the GitHub repository.
Once you have uploaded your working_dir contents or your py_module dependency, you need the HTTPS URL of the repository zip file, so you can specify it in your runtime_env dictionary.
You have two options to get the HTTPS URL.
Option 1: Download Zip (quicker to implement, but not recommended for production environments)
The first option is to use the remote Git provider’s “Download Zip” feature, which provides an HTTPS link that zips and downloads your repository.
This is quick, but it is not recommended because it only allows you to download a zip file of a repository branch’s latest commit.
To find a GitHub URL, navigate to your repository on GitHub, choose a branch, and click on the green “Code” drop down button:
This will drop down a menu that provides three options: “Clone” which provides HTTPS/SSH links to clone the repository,
“Open with GitHub Desktop”, and “Download ZIP.”
Right-click on “Download Zip.”
This will open a pop-up near your cursor. Select “Copy Link Address”:
Now your HTTPS link is copied to your clipboard. You can paste it into your runtime_env dictionary.
Using the HTTPS URL from your Git provider’s “Download as Zip” feature is not recommended if the URL always points to the latest commit.
For instance, using this method on GitHub generates a link that always points to the latest commit on the chosen branch.
By specifying this link in the runtime_env dictionary, your Ray Cluster always uses the chosen branch’s latest commit.
This creates a consistency risk: if you push an update to your remote Git repository while your cluster’s nodes are pulling the repository’s contents,
some nodes may pull the version of your package just before you pushed, and some nodes may pull the version just after.
For consistency, it is better to specify a particular commit, so all the nodes use the same package.
See “Option 2: Manually Create URL” to create a URL pointing to a specific commit.
Option 2: Manually Create URL (slower to implement, but recommended for production environments)
The second option is to manually create this URL by pattern-matching your specific use case with one of the following examples.
This is recommended because it provides finer-grained control over which repository branch and commit to use when generating your dependency zip file.
These options prevent consistency issues on Ray Clusters (see the warning above for more info).
To create the URL, pick a URL template below that fits your use case, and fill in all parameters in brackets (e.g. [username], [repository], etc.) with the specific values from your repository.
For instance, suppose your GitHub username is example_user, the repository’s name is example_repository, and the desired commit hash is abcdefg.
If example_repository is public and you want to retrieve the abcdefg commit (which matches the first example use case), the URL would be:
runtime_env = {"working_dir": ("https://github.com"
"/example_user/example_repository/archive/abcdefg.zip")}
Here is a list of different use cases and corresponding URLs:
Example: Retrieve package from a specific commit hash on a public GitHub repository
runtime_env = {"working_dir": ("https://github.com"
"/[username]/[repository]/archive/[commit hash].zip")}
Example: Retrieve package from a private GitHub repository using a Personal Access Token during development. For production see this document to learn how to authenticate private dependencies safely.
runtime_env = {"working_dir": ("https://[username]:[personal access token]@github.com"
"/[username]/[private repository]/archive/[commit hash].zip")}
Example: Retrieve package from a public GitHub repository’s latest commit
runtime_env = {"working_dir": ("https://github.com"
"/[username]/[repository]/archive/HEAD.zip")}
Example: Retrieve package from a specific commit hash on a public Bitbucket repository
runtime_env = {"working_dir": ("https://bitbucket.org"
"/[owner]/[repository]/get/[commit hash].tar.gz")}
It is recommended to specify a particular commit instead of always using the latest commit.
This prevents consistency issues on a multi-node Ray Cluster.
See the warning below “Option 1: Download Zip” for more info.
Once you have specified the URL in your runtime_env dictionary, you can pass the dictionary
into a ray.init() or .options() call. Congratulations! You have now hosted a runtime_env dependency
remotely on GitHub!
Debugging
If the runtime_env cannot be set up (e.g., network issues, download failures, etc.), Ray will fail to schedule tasks/actors
that require the runtime_env. If you call ray.get on them, it will raise RuntimeEnvSetupError with
a detailed error message.
import ray
import time
@ray.remote
def f():
pass
@ray.remote
class A:
def f(self):
pass
start = time.time()
bad_env = {"conda": {"dependencies": ["this_doesnt_exist"]}}
# [Tasks] will raise `RuntimeEnvSetupError`.
try:
ray.get(f.options(runtime_env=bad_env).remote())
except ray.exceptions.RuntimeEnvSetupError:
print("Task fails with RuntimeEnvSetupError")
# [Actors] will raise `RuntimeEnvSetupError`.
a = A.options(runtime_env=bad_env).remote()
try:
ray.get(a.f.remote())
except ray.exceptions.RuntimeEnvSetupError:
print("Actor fails with RuntimeEnvSetupError")
Task fails with RuntimeEnvSetupError
Actor fails with RuntimeEnvSetupError
Full logs can always be found in the file runtime_env_setup-[job_id].log for per-actor, per-task and per-job environments, or in
runtime_env_setup-ray_client_server_[port].log for per-job environments when using Ray Client.
You can also enable runtime_env debugging log streaming by setting an environment variable RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED=1 on each node before starting Ray, for example using setup_commands in the Ray Cluster configuration file (reference).
This will print the full runtime_env setup log messages to the driver (the script that calls ray.init()).
Example log output:
ray.shutdown()
ray.init(runtime_env={"pip": ["requests"]})
(pid=runtime_env) 2022-02-28 14:12:33,653 INFO pip.py:188 -- Creating virtualenv at /tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv, current python dir /Users/user/anaconda3/envs/ray-py38
(pid=runtime_env) 2022-02-28 14:12:33,653 INFO utils.py:76 -- Run cmd[1] ['/Users/user/anaconda3/envs/ray-py38/bin/python', '-m', 'virtualenv', '--app-data', '/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv_app_data', '--reset-app-data', '--no-periodic-update', '--system-site-packages', '--no-download', '/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv']
(pid=runtime_env) 2022-02-28 14:12:34,267 INFO utils.py:97 -- Output of cmd[1]: created virtual environment CPython3.8.11.final.0-64 in 473ms
(pid=runtime_env) creator CPython3Posix(dest=/private/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv, clear=False, no_vcs_ignore=False, global=True)
(pid=runtime_env) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/private/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv_app_data)
(pid=runtime_env) added seed packages: pip==22.0.3, setuptools==60.6.0, wheel==0.37.1
(pid=runtime_env) activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
(pid=runtime_env)
(pid=runtime_env) 2022-02-28 14:12:34,268 INFO utils.py:76 -- Run cmd[2] ['/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv/bin/python', '-c', 'import ray; print(ray.__version__, ray.__path__[0])']
(pid=runtime_env) 2022-02-28 14:12:35,118 INFO utils.py:97 -- Output of cmd[2]: 3.0.0.dev0 /Users/user/ray/python/ray
(pid=runtime_env)
(pid=runtime_env) 2022-02-28 14:12:35,120 INFO pip.py:236 -- Installing python requirements to /tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv
(pid=runtime_env) 2022-02-28 14:12:35,122 INFO utils.py:76 -- Run cmd[3] ['/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv/bin/python', '-m', 'pip', 'install', '--disable-pip-version-check', '--no-cache-dir', '-r', '/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/requirements.txt']
(pid=runtime_env) 2022-02-28 14:12:38,000 INFO utils.py:97 -- Output of cmd[3]: Requirement already satisfied: requests in /Users/user/anaconda3/envs/ray-py38/lib/python3.8/site-packages (from -r /tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/requirements.txt (line 1)) (2.26.0)
(pid=runtime_env) Requirement already satisfied: idna<4,>=2.5 in /Users/user/anaconda3/envs/ray-py38/lib/python3.8/site-packages (from requests->-r /tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/requirements.txt (line 1)) (3.2)
(pid=runtime_env) Requirement already satisfied: certifi>=2017.4.17 in /Users/user/anaconda3/envs/ray-py38/lib/python3.8/site-packages (from requests->-r /tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/requirements.txt (line 1)) (2021.10.8)
(pid=runtime_env) Requirement already satisfied: urllib3<1.27,>=1.21.1 in /Users/user/anaconda3/envs/ray-py38/lib/python3.8/site-packages (from requests->-r /tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/requirements.txt (line 1)) (1.26.7)
(pid=runtime_env) Requirement already satisfied: charset-normalizer~=2.0.0 in /Users/user/anaconda3/envs/ray-py38/lib/python3.8/site-packages (from requests->-r /tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/requirements.txt (line 1)) (2.0.6)
(pid=runtime_env)
(pid=runtime_env) 2022-02-28 14:12:38,001 INFO utils.py:76 -- Run cmd[4] ['/tmp/ray/session_2022-02-28_14-12-29_909064_87908/runtime_resources/pip/0cc818a054853c3841171109300436cad4dcf594/virtualenv/bin/python', '-c', 'import ray; print(ray.__version__, ray.__path__[0])']
(pid=runtime_env) 2022-02-28 14:12:38,804 INFO utils.py:97 -- Output of cmd[4]: 3.0.0.dev0 /Users/user/ray/python/ray
See Logging Directory Structure for more details.
Scheduling
For each task or actor, Ray chooses a node to run it on; the scheduling decision is based on the following factors.
Resources
Each task or actor has the specified resource requirements.
Given that, a node can be in one of the following states:
Feasible: the node has the required resources to run the task or actor.
Depending on the current availability of these resources, there are two sub-states:
Available: the node has the required resources and they are free now.
Unavailable: the node has the required resources but they are currently being used by other tasks or actors.
Infeasible: the node doesn’t have the required resources. For example a CPU-only node is infeasible for a GPU task.
Resource requirements are hard requirements meaning that only feasible nodes are eligible to run the task or actor.
If there are feasible nodes, Ray will either choose an available node or wait for an unavailable node to become available,
depending on other factors discussed below.
If all nodes are infeasible, the task or actor cannot be scheduled until feasible nodes are added to the cluster.
Scheduling Strategies
Tasks or actors support a scheduling_strategy option to specify the strategy used to decide the best node among feasible nodes.
Currently the supported strategies are the following.
“DEFAULT”
"DEFAULT" is the default strategy used by Ray.
Ray schedules tasks or actors onto a group of the top k nodes.
Specifically, the nodes are sorted to first favor those that already have tasks or actors scheduled (for locality),
then to favor those that have low resource utilization (for load balancing).
Within the top k group, nodes are chosen randomly to further improve load-balancing and mitigate delays from cold-start in large clusters.
Implementation-wise, Ray calculates a score for each node in a cluster based on the utilization of its logical resources.
If the utilization is below a threshold (controlled by the OS environment variable RAY_scheduler_spread_threshold, default is 0.5), the score is 0,
otherwise it is the resource utilization itself (score 1 means the node is fully utilized).
Ray selects the best node for scheduling by randomly picking from the top k nodes with the lowest scores.
The value of k is the max of (number of nodes in the cluster * RAY_scheduler_top_k_fraction environment variable) and RAY_scheduler_top_k_absolute environment variable.
By default, it’s 20% of the total number of nodes.
Currently Ray handles actors that don’t require any resources (i.e., num_cpus=0 with no other resources) specially by randomly choosing a node in the cluster without considering resource utilization.
Since nodes are randomly chosen, actors that don’t require any resources are effectively SPREAD across the cluster.
@ray.remote
def func():
return 1
@ray.remote(num_cpus=1)
class Actor:
pass
# If unspecified, "DEFAULT" scheduling strategy is used.
func.remote()
actor = Actor.remote()
# Explicitly set scheduling strategy to "DEFAULT".
func.options(scheduling_strategy="DEFAULT").remote()
actor = Actor.options(scheduling_strategy="DEFAULT").remote()
# Zero-CPU (and no other resources) actors are randomly assigned to nodes.
actor = Actor.options(num_cpus=0).remote()
“SPREAD”
"SPREAD" strategy will try to spread the tasks or actors among available nodes.
@ray.remote(scheduling_strategy="SPREAD")
def spread_func():
return 2
@ray.remote(num_cpus=1)
class SpreadActor:
pass
# Spread tasks across the cluster.
[spread_func.remote() for _ in range(10)]
# Spread actors across the cluster.
actors = [SpreadActor.options(scheduling_strategy="SPREAD").remote() for _ in range(10)]
PlacementGroupSchedulingStrategy
PlacementGroupSchedulingStrategy will schedule the task or actor to where the placement group is located.
This is useful for actor gang scheduling. See Placement Group for more details.
NodeAffinitySchedulingStrategy
NodeAffinitySchedulingStrategy is a low-level strategy that allows a task or actor to be scheduled onto a particular node specified by its node id.
The soft flag specifies whether the task or actor is allowed to run somewhere else if the specified node doesn’t exist (e.g. if the node dies)
or is infeasible because it does not have the resources required to run the task or actor.
In these cases, if soft is True, the task or actor will be scheduled onto a different feasible node.
Otherwise, the task or actor will fail with TaskUnschedulableError or ActorUnschedulableError.
As long as the specified node is alive and feasible, the task or actor will only run there
regardless of the soft flag. This means if the node currently has no available resources, the task or actor will wait until resources
become available.
This strategy should only be used if other high level scheduling strategies (e.g. placement group) cannot give the
desired task or actor placements. It has the following known limitations:
It’s a low-level strategy which prevents optimizations by a smart scheduler.
It cannot fully utilize an autoscaling cluster since node ids must be known when the tasks or actors are created.
It can be difficult to make the best static placement decision
especially in a multi-tenant cluster: for example, an application won’t know what else is being scheduled onto the same nodes.
@ray.remote
def node_affinity_func():
return ray.get_runtime_context().get_node_id()
@ray.remote(num_cpus=1)
class NodeAffinityActor:
pass
# Only run the task on the local node.
node_affinity_func.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(),
soft=False,
)
).remote()
# Run the two node_affinity_func tasks on the same node if possible.
node_affinity_func.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get(node_affinity_func.remote()),
soft=True,
)
).remote()
# Only run the actor on the local node.
actor = NodeAffinityActor.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(),
soft=False,
)
).remote()
Locality-Aware Scheduling
By default, Ray prefers available nodes that have large task arguments local
to avoid transferring data over the network. If there are multiple large task arguments,
the node with the most object bytes local is preferred.
This takes precedence over the "DEFAULT" scheduling strategy,
which means Ray will try to run the task on the locality preferred node regardless of the node resource utilization.
However, if the locality preferred node is not available, Ray may run the task somewhere else.
When other scheduling strategies are specified,
they have higher precedence and data locality is no longer considered.
Locality-aware scheduling applies only to tasks, not actors.
@ray.remote
def large_object_func():
# Large object is stored in the local object store
# and available in the distributed memory,
# instead of returning inline directly to the caller.
return [1] * (1024 * 1024)
@ray.remote
def small_object_func():
# Small object is returned inline directly to the caller,
# instead of storing in the distributed memory.
return [1]
@ray.remote
def consume_func(data):
return len(data)
large_object = large_object_func.remote()
small_object = small_object_func.remote()
# Ray will try to run consume_func on the same node
# where large_object_func runs.
consume_func.remote(large_object)
# Ray will try to spread consume_func across the entire cluster
# instead of only running on the node where large_object_func runs.
[
consume_func.options(scheduling_strategy="SPREAD").remote(large_object)
for i in range(10)
]
# Ray won't consider locality for scheduling consume_func
# since the argument is small and will be sent to the worker node inline directly.
consume_func.remote(small_object)
More about Ray Scheduling
Resources
Ray allows you to seamlessly scale your applications from a laptop to a cluster without code changes.
Ray resources are key to this capability.
They abstract away physical machines and let you express your computation in terms of resources,
while the system manages scheduling and autoscaling based on resource requests.
A resource in Ray is a key-value pair where the key denotes a resource name, and the value is a float quantity.
For convenience, Ray has native support for CPU, GPU, and memory resource types; CPU, GPU and memory are called pre-defined resources.
Besides those, Ray also supports custom resources.
Physical Resources and Logical Resources
Physical resources are resources that a machine physically has, such as physical CPUs and GPUs,
while logical resources are virtual resources defined by a system.
Ray resources are logical and don’t need to have 1-to-1 mapping with physical resources.
For example, you can start a Ray head node with 3 GPUs via ray start --head --num-gpus=3 even if it physically has zero.
They are mainly used for admission control during scheduling.
The fact that resources are logical has several implications:
Resource requirements of tasks or actors do NOT impose limits on actual physical resource usage.
For example, Ray doesn’t prevent a num_cpus=1 task from launching multiple threads and using multiple physical CPUs.
It’s your responsibility to make sure tasks or actors use no more resources than specified via resource requirements.
Ray doesn’t provide CPU isolation for tasks or actors.
For example, Ray won’t reserve a physical CPU exclusively and pin a num_cpus=1 task to it.
Ray will let the operating system schedule and run the task instead.
If needed, you can use operating system APIs like sched_setaffinity to pin a task to a physical CPU.
Ray does provide GPU isolation in the form of visible devices by automatically setting the CUDA_VISIBLE_DEVICES environment variable,
which most ML frameworks will respect for purposes of GPU assignment.
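For example, a minimal Linux-only sketch of pinning (this uses the operating system API, not a Ray API; the chosen CPU index is illustrative):
import os
import ray

ray.init()

@ray.remote(num_cpus=1)
def pinned_task():
    # Pin the calling worker process to physical CPU 0.
    # os.sched_setaffinity is only available on Linux.
    os.sched_setaffinity(0, {0})
    return os.sched_getaffinity(0)

print(ray.get(pinned_task.remote()))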
Physical resources vs logical resources
Custom Resources
Besides pre-defined resources, you can also specify a Ray node’s custom resources and request them in your tasks or actors.
Some use cases for custom resources:
Your node has special hardware and you can represent it as a custom resource.
Then your tasks or actors can request the custom resource via @ray.remote(resources={"special_hardware": 1})
and Ray will schedule the tasks or actors to the node that has the custom resource.
You can use custom resources as labels to tag nodes and you can achieve label based affinity scheduling.
For example, you can do ray.remote(resources={"custom_label": 0.001}) to schedule tasks or actors to nodes with custom_label custom resource.
For this use case, the actual quantity doesn’t matter, and the convention is to specify a tiny number so that the label resource is
not the limiting factor for parallelism.
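A minimal sketch of the label pattern (the resource name is illustrative):
import ray

# Start a local node that advertises the custom resource.
ray.init(resources={"custom_label": 1})

@ray.remote(resources={"custom_label": 0.001})
def labeled_task():
    # Only nodes that advertise "custom_label" are feasible for this task.
    return "scheduled onto a node with custom_label"

print(ray.get(labeled_task.remote()))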
Specifying Node Resources
By default, Ray nodes start with pre-defined CPU, GPU, and memory resources. The quantities of these resources on each node are set to the physical quantities auto detected by Ray.
By default, logical resources are configured by the following rule.
Ray does not permit dynamic updates of resource capacities after Ray has been started on a node.
Number of logical CPUs (num_cpus): Set to the number of CPUs of the machine/container.
Number of logical GPUs (num_gpus): Set to the number of GPUs of the machine/container.
Memory (memory): Set to 70% of “available memory” when the Ray runtime starts.
Object Store Memory (object_store_memory): Set to 30% of “available memory” when the Ray runtime starts. Note that object store memory is not a logical resource, and users cannot use it for scheduling.
However, you can always override that by manually specifying the quantities of pre-defined resources and adding custom resources.
There are several ways to do that depending on how you start the Ray cluster:
ray.init()
If you are using ray.init() to start a single node Ray cluster, you can do the following to manually specify node resources:
# This will start a Ray node with 3 logical cpus, 4 logical gpus,
# 1 special_hardware resource and 1 custom_label resource.
ray.init(num_cpus=3, num_gpus=4, resources={"special_hardware": 1, "custom_label": 1})
ray start
If you are using ray start to start a Ray node, you can run:
ray start --head --num-cpus=3 --num-gpus=4 --resources='{"special_hardware": 1, "custom_label": 1}'
ray up
If you are using ray up to start a Ray cluster, you can set the resources field in the yaml file:
available_node_types:
head:
...
resources:
CPU: 3
GPU: 4
special_hardware: 1
custom_label: 1
KubeRay
If you are using KubeRay to start a Ray cluster, you can set the rayStartParams field in the yaml file:
headGroupSpec:
rayStartParams:
num-cpus: "3"
num-gpus: "4"
resources: '"{\"special_hardware\": 1, \"custom_label\": 1}"'
Specifying Task or Actor Resource Requirements
Ray allows specifying a task or actor’s resource requirements (e.g., CPU, GPU, and custom resources).
The task or actor will only run on a node if there are enough required resources
available to execute the task or actor.
By default, Ray tasks use 1 CPU resource, and Ray actors use 1 CPU for scheduling and 0 CPU for running.
(This means that, by default, actors cannot get scheduled on a zero-CPU node, but an infinite number of them can run on any non-zero-CPU node.
The default resource requirements for actors were chosen for historical reasons.
It’s recommended to always explicitly set num_cpus for actors to avoid any surprises.
If resources are specified explicitly, they are required for both scheduling and running.)
You can also explicitly specify a task’s or actor’s resource requirements (for example, one task may require a GPU) instead of using default ones via ray.remote()
and task.options()/actor.options().
Python
# Specify the default resource requirements for this remote function.
@ray.remote(num_cpus=2, num_gpus=2, resources={"special_hardware": 1})
def func():
return 1
# You can override the default resource requirements.
func.options(num_cpus=3, num_gpus=1, resources={"special_hardware": 0}).remote()
@ray.remote(num_cpus=0, num_gpus=1)
class Actor:
pass
# You can override the default resource requirements for actors as well.
actor = Actor.options(num_cpus=1, num_gpus=0).remote()
Java
// Specify required resources.
Ray.task(MyRayApp::myFunction).setResource("CPU", 1.0).setResource("GPU", 1.0).setResource("special_hardware", 1.0).remote();
Ray.actor(Counter::new).setResource("CPU", 2.0).setResource("GPU", 1.0).remote();
C++
// Specify required resources.
ray::Task(MyFunction).SetResource("CPU", 1.0).SetResource("GPU", 1.0).SetResource("special_hardware", 1.0).Remote();
ray::Actor(CreateCounter).SetResource("CPU", 2.0).SetResource("GPU", 1.0).Remote();
Task and actor resource requirements have implications for Ray’s scheduling concurrency.
In particular, the sum of the resource requirements of all of the
concurrently executing tasks and actors on a given node cannot exceed the node’s total resources.
This property can be used to limit the number of concurrently running tasks or actors to avoid issues like OOM.
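For example, a sketch of using CPU requirements to cap concurrency on a single node:
import ray

# With 4 logical CPUs and num_cpus=2 per task, at most two copies of
# heavy_task execute concurrently; the rest queue until resources free up.
ray.init(num_cpus=4)

@ray.remote(num_cpus=2)
def heavy_task():
    return "done"

results = ray.get([heavy_task.remote() for _ in range(8)])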
Fractional Resource Requirements
Ray supports fractional resource requirements.
For example, if your task or actor is IO bound and has low CPU usage, you can specify fractional CPU num_cpus=0.5 or even zero CPU num_cpus=0.
The precision of the fractional resource requirement is 0.0001 so you should avoid specifying a double that’s beyond that precision.
@ray.remote(num_cpus=0.5)
def io_bound_task():
import time
time.sleep(1)
return 2
io_bound_task.remote()
@ray.remote(num_gpus=0.5)
class IOActor:
def ping(self):
import os
print(f"CUDA_VISIBLE_DEVICES: {os.environ['CUDA_VISIBLE_DEVICES']}")
# Two actors can share the same GPU.
io_actor1 = IOActor.remote()
io_actor2 = IOActor.remote()
ray.get(io_actor1.ping.remote())
ray.get(io_actor2.ping.remote())
# Output:
# (IOActor pid=96328) CUDA_VISIBLE_DEVICES: 1
# (IOActor pid=96329) CUDA_VISIBLE_DEVICES: 1
Besides resource requirements, you can also specify an environment for a task or actor to run in,
which can include Python packages, local files, environment variables, and more—see Runtime Environments for details.
GPU Support
GPUs are critical for many machine learning applications.
Ray natively supports GPU as a pre-defined resource type and allows tasks and actors to specify their GPU resource requirements.
Starting Ray Nodes with GPUs
By default, Ray will set the quantity of GPU resources of a node to the physical quantities of GPUs auto detected by Ray.
If you need to, you can override this.
There is nothing preventing you from specifying a larger value of
num_gpus than the true number of GPUs on the machine, given that Ray resources are logical.
In this case, Ray will act as if the machine has the number of GPUs you specified
for the purposes of scheduling tasks and actors that require GPUs.
Trouble will only occur if those tasks and actors
attempt to actually use GPUs that don’t exist.
You can set CUDA_VISIBLE_DEVICES environment variable before starting a Ray node
to limit the GPUs that are visible to Ray.
For example, CUDA_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2
will let Ray only see devices 1 and 3.
Using GPUs in Tasks and Actors
If a task or actor requires GPUs, you can specify the corresponding resource requirements (e.g. @ray.remote(num_gpus=1)).
Ray will then schedule the task or actor to a node that has enough free GPU resources
and assign GPUs to the task or actor by setting the CUDA_VISIBLE_DEVICES environment variable before running the task or actor code.
import os
import ray
ray.init(num_gpus=2)
@ray.remote(num_gpus=1)
class GPUActor:
def ping(self):
print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))
print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))
@ray.remote(num_gpus=1)
def use_gpu():
print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))
print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))
gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task will use the second one.
ray.get(use_gpu.remote())
# (GPUActor pid=52420) ray.get_gpu_ids(): [0]
# (GPUActor pid=52420) CUDA_VISIBLE_DEVICES: 0
# (use_gpu pid=51830) ray.get_gpu_ids(): [1]
# (use_gpu pid=51830) CUDA_VISIBLE_DEVICES: 1
Inside a task or actor, ray.get_gpu_ids() will return a
list of GPU IDs that are available to the task or actor.
Typically, it is not necessary to call ray.get_gpu_ids() because Ray will
automatically set the CUDA_VISIBLE_DEVICES environment variable,
which most ML frameworks will respect for purposes of GPU assignment.
Note: The function use_gpu defined above doesn’t actually use any
GPUs. Ray will schedule it on a node which has at least one GPU, and will
reserve one GPU for it while it is being executed, however it is up to the
function to actually make use of the GPU. This is typically done through an
external library like TensorFlow. Here is an example that actually uses GPUs.
In order for this example to work, you will need to install the GPU version of
TensorFlow.
@ray.remote(num_gpus=1)
def use_gpu():
import tensorflow as tf
# Create a TensorFlow session. TensorFlow will restrict itself to use the
# GPUs specified by the CUDA_VISIBLE_DEVICES environment variable.
tf.Session()
Note: It is certainly possible for the person implementing use_gpu to
ignore ray.get_gpu_ids() and to use all of the GPUs on the machine. Ray does
not prevent this from happening, and this can lead to too many tasks or actors using the
same GPU at the same time. However, Ray does automatically set the
CUDA_VISIBLE_DEVICES environment variable, which will restrict the GPUs used
by most deep learning frameworks assuming it’s not overridden by the user.
Fractional GPUs
Ray supports fractional resource requirements
so multiple tasks and actors can share the same GPU.
ray.init(num_cpus=4, num_gpus=1)
@ray.remote(num_gpus=0.25)
def f():
import time
time.sleep(1)
# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])
Note: It is the user’s responsibility to make sure that the individual tasks
don’t use more than their share of the GPU memory.
TensorFlow can be configured to limit its memory usage.
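For instance, a sketch of one way to do this with the TensorFlow 2.x API (this assumes a GPU build of TensorFlow is installed; see the TensorFlow documentation for the authoritative options):
import ray

@ray.remote(num_gpus=0.5)
def tf_task():
    import tensorflow as tf
    # Let TensorFlow allocate GPU memory on demand instead of claiming
    # the whole visible device up front.
    for gpu in tf.config.list_physical_devices("GPU"):
        tf.config.experimental.set_memory_growth(gpu, True)
    # ... build and run the model here ...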
When Ray assigns GPUs of a node to tasks or actors with fractional resource requirements,
it will pack one GPU before moving on to the next one to avoid fragmentation.
ray.init(num_gpus=3)
@ray.remote(num_gpus=0.5)
class FractionalGPUActor:
def ping(self):
print("ray.get_gpu_ids(): {}".format(ray.get_gpu_ids()))
fractional_gpu_actors = [FractionalGPUActor.remote() for _ in range(3)]
# Ray will try to pack GPUs if possible.
[ray.get(fractional_gpu_actors[i].ping.remote()) for i in range(3)]
# (FractionalGPUActor pid=57417) ray.get_gpu_ids(): [0]
# (FractionalGPUActor pid=57416) ray.get_gpu_ids(): [0]
# (FractionalGPUActor pid=57418) ray.get_gpu_ids(): [1]
Workers not Releasing GPU Resources
Currently, when a worker executes a task that uses a GPU (e.g.,
through TensorFlow), the task may allocate memory on the GPU and may not release
it when the task finishes executing. This can lead to problems the next time a
task tries to use the same GPU. To address the problem, Ray disables worker
process reuse between GPU tasks by default: GPU resources are released when
the task process exits. Since this adds overhead to GPU task scheduling,
you can re-enable worker reuse by setting max_calls=0
in the ray.remote decorator.
# By default, ray will not reuse workers for GPU tasks to prevent
# GPU resource leakage.
@ray.remote(num_gpus=1)
def leak_gpus():
import tensorflow as tf
# This task will allocate memory on the GPU and then never release it.
tf.Session()
Accelerator Types
Ray supports resource-specific accelerator types. The accelerator_type option can be used to force a task or actor to run on a node with a specific type of accelerator.
Under the hood, the accelerator type option is implemented as a custom resource requirement of "accelerator_type:<type>": 0.001.
This forces the task or actor to be placed on a node with that particular accelerator type available.
This also lets the multi-node-type autoscaler know that there is demand for that type of resource, potentially triggering the launch of new nodes providing that accelerator.
from ray.util.accelerators import NVIDIA_TESLA_V100
@ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
def train(data):
return "This function was run on a node with a Tesla V100 GPU"
ray.get(train.remote(1))
See ray.util.accelerators for available accelerator types. Currently, automatically detected accelerator types include Nvidia GPUs.
Placement Groups
Placement groups allow users to atomically reserve groups of resources across multiple nodes (i.e., gang scheduling).
They can then be used to schedule Ray tasks and actors packed as close as possible for locality (PACK), or spread apart
(SPREAD). Placement groups are generally used for gang-scheduling actors, but also support tasks.
Here are some real-world use cases:
Distributed Machine Learning Training: Distributed Training (e.g., Ray Train and Ray Tune) uses the placement group APIs to enable gang scheduling. In these settings, all resources for a trial must be available at the same time. Gang scheduling is a critical technique to enable all-or-nothing scheduling for deep learning training.
Fault tolerance in distributed training: Placement groups can be used to configure fault tolerance. In Ray Tune, it can be beneficial to pack related resources from a single trial together, so that a node failure impacts a low number of trials. In libraries that support elastic training (e.g., XGBoost-Ray), spreading the resources across multiple nodes can help to ensure that training continues even when a node dies.
Key Concepts
Bundles
A bundle is a collection of “resources”. It could be a single resource, {"CPU": 1}, or a group of resources, {"CPU": 1, "GPU": 4}.
A bundle is a unit of reservation for placement groups. “Scheduling a bundle” means we find a node that fits the bundle and reserve the resources specified by the bundle.
A bundle must be able to fit on a single node on the Ray cluster. For example, if you only have an 8 CPU node, and if you have a bundle that requires {"CPU": 9}, this bundle cannot be scheduled.
Placement Group
A placement group reserves the resources from the cluster. The reserved resources can only be used by tasks or actors that use the PlacementGroupSchedulingStrategy.
Placement groups are represented by a list of bundles. For example, {"CPU": 1} * 4 means you’d like to reserve 4 bundles of 1 CPU (i.e., it reserves 4 CPUs).
Bundles are then placed according to the placement strategies across nodes on the cluster.
After the placement group is created, tasks or actors can then be scheduled according to the placement group and even on individual bundles.
Create a Placement Group (Reserve Resources)
You can create a placement group using ray.util.placement_group().
Placement groups take in a list of bundles and a placement strategy.
Note that each bundle must be able to fit on a single node on the Ray cluster.
For example, if you only have an 8 CPU node, and if you have a bundle that requires {"CPU": 9},
this bundle cannot be scheduled.
Bundles are specified by a list of dictionaries, e.g., [{"CPU": 1}, {"CPU": 1, "GPU": 1}].
CPU corresponds to num_cpus as used in ray.remote.
GPU corresponds to num_gpus as used in ray.remote.
memory corresponds to memory as used in ray.remote.
Other resources correspond to resources as used in ray.remote (e.g., ray.init(resources={"disk": 1}) can have a bundle of {"disk": 1}).
Placement group scheduling is asynchronous: the ray.util.placement_group call returns immediately.
Python
from pprint import pprint
import time
# Import placement group APIs.
from ray.util.placement_group import (
placement_group,
placement_group_table,
remove_placement_group,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# Initialize Ray.
import ray
# Create a single node Ray cluster with 2 CPUs and 2 GPUs.
ray.init(num_cpus=2, num_gpus=2)
# Reserve a placement group of 1 bundle that reserves 1 CPU and 1 GPU.
pg = placement_group([{"CPU": 1, "GPU": 1}])
Java
// Initialize Ray.
Ray.init();
// Construct a list of bundles.
Map bundle = ImmutableMap.of("CPU", 1.0);
List