How to Use Ray, a Distributed Python Framework, on Databricks


Kevin David

Ray is an open-source project first developed at RISELab that makes it simple to scale any compute-intensive Python workload. With a rich set of libraries and integrations built on a flexible distributed execution framework, Ray brings new use cases and simplifies the development of custom distributed Python functions that would normally be complicated to create.

Running Ray on top of an Apache Spark™ cluster creates the ability to distribute the internal code of PySpark UDFs as well as Python code that used to be only run on the driver node. It also adds the ability to use Ray’s scalable reinforcement learning RLlib out of the box. These abilities allow for a wide array of new applications.

Why is another distributed framework needed on top of Spark?

There are two ways to think about distributing a function across a cluster. In the first, the dataset is split into parts, the same function acts on each part, and the results are collected. This is called data parallelism, the most common form in big data, and Apache Spark is the best-known example. Modern data-parallel frameworks typically expose DataFrame operations and are not meant for low-level construction of distributed internals, such as hand-crafted functions beyond UDFs (user-defined functions).

Data parallelism is the most common way to distribute tasks across a cluster. Here, parts of the dataset are split up and a function acts on each part and collects the results.

Figure 1: Data Parallelism
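As a minimal, framework-free sketch of the idea (the function and partitioning below are illustrative, not Spark code), data parallelism splits a dataset, runs the same function on each part in parallel, and combines the partial results:

```python
from concurrent.futures import ThreadPoolExecutor

def part_sum(part):
    # The same function runs on every partition
    return sum(part)

data = list(range(100))
partitions = [data[i::4] for i in range(4)]  # split into 4 parts

# Run the function on each partition in parallel, then collect
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_sums = list(pool.map(part_sum, partitions))

total = sum(partial_sums)  # same answer as sum(data)
```

Spark applies this same pattern at cluster scale, with partitions spread across worker nodes.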

Another form of distribution arises when the dataset is small but the operations are complicated enough that simply running the same function on different partitions doesn't solve the problem. This is known as task parallelism or logical parallelism: many functions run concurrently, arranged in complicated pipelines that use parameter servers and schedulers to coordinate dependencies. This type of parallelism is mostly found in HPC (High Performance Computing) or in custom distributed jobs that aren't possible with DataFrame operations; these frameworks are often meant for designing distributed functions from scratch. Examples include physics simulations, financial trading algorithms and advanced mathematical computations.

Task parallelism is another way to distribute tasks across a cluster and is typically reserved for more complex use cases. Here, many tasks can be run concurrently within a complicated pipeline.

Figure 2: Task Parallelism

However, many task-parallel and traditional HPC libraries are written in C++ rather than Python, which many data science pipelines require, and don't generalize enough to accommodate custom job requirements such as advanced design patterns. They may also be built for hardware optimization on multi-core CPU architectures, such as improving the performance of linear algebra operations on a single machine, rather than for distributing functions across a cluster, or they may target specialized hardware instead of commodity cloud hardware. The main difficulty with the majority of task-parallel libraries is the level of complexity required to create dependencies between tasks and the amount of development time involved. To overcome these challenges, many open-source Python libraries have been developed that combine the simplicity of Python with the ability to scale custom tasks.

One of the best recent examples of task or logical parallelism in Python is Ray. Its simplicity, low-latency distributed scheduling and ability to quickly create very complicated dependencies between distributed functions solve the issues of generality, scalability and complexity. See A Gentle Introduction to Ray for more details.

A Simple Introduction to Ray Architecture

Ray Architecture

Figure 3: Ray Architecture

An important distinction of Ray's architecture is that there are two levels of abstraction for how to schedule jobs. Ray treats the local system as a cluster, where separate processes, or Raylets, function like the nodes of typical big data terminology. There is also a global scheduler, which can treat separate machines as nodes. This allows for efficient scaling from a single node or laptop for development all the way up to the massive scale of cloud computing. Because each node has its own local scheduler that can also communicate with the global scheduler, a task can be sent from any node to the rest of the cluster. This lets the developer create remote tasks that can trigger other remote tasks, bringing many design patterns of object-oriented programming to distributed systems, which is vital for a library designed for creating distributed applications from scratch. There is also a node that manages the global control store (GCS), which keeps track of tasks, functions, events and other system-level metadata.

Data flow diagram between worker nodes and the GCS

Figure 4: Data flow diagram between worker nodes and the GCS

The object store in Ray is a distributed object store built on Apache Arrow that manages the shared functions, objects and tasks used by the cluster. One of the most important aspects of Ray is that its object store is in-memory with a hierarchy of memory management for either evicting or persisting objects (in Ray v1.2+) that cause a memory spill. This high-speed in-memory system allows for high performance communication at large scale, but requires that the instances have large amounts of memory to avoid memory spills.

Take the following simple example of a remote task that calls another remote task within the function. The program’s dependencies are represented by the task graph and the physical execution shows how the object store holds common variables and results while functions are executed on separate worker nodes.

Example of the relation of the driver and worker nodes and object store in application

Figure 5: Example of the relation of the driver and worker nodes and object store in application.

Remote class objects (called remote actors in Ray) allow for parameter servers and more sophisticated design patterns such as nested trees of actors or functions. Using this simple API and architecture, complicated distributed tasks can be designed quickly without the need to create the underlying infrastructure. Examples of many design patterns can be found here.

import ray

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

counter_actor = Counter.remote()

# Method calls return object refs; ray.get retrieves the value
print(ray.get(counter_actor.increment.remote()))  # 1

For more details on the underlying architecture, see the Ray 1.0 Architecture whitepaper.

Starting Ray on a Databricks Cluster

Note: The official Ray documentation describes Spark integration via the RayDP project. This post, however, covers "Ray on Spark," since a Databricks cluster starts as a managed Spark cluster rather than initializing as a Ray cluster. Ray is also not officially supported by Databricks.

Some custom setup is needed before Ray can run on a Databricks cluster. An init script is a shell script that runs during startup of each cluster node before the Apache Spark driver or worker JVM starts. Instructions on how to configure an init script can be found here.

Run the following cell in a Databricks notebook to create the init script:

%python

kernel_gateway_init = """
#!/bin/bash

#RAY PORT
RAY_PORT=9339
REDIS_PASS="d4t4bricks"

# install ray
/databricks/python/bin/pip install ray

# Install additional ray libraries
/databricks/python/bin/pip install ray[debug,dashboard,tune,rllib,serve]

# If starting on the Spark driver node, initialize the Ray head node
# If starting on the Spark worker node, connect to the head Ray node
if [ ! -z $DB_IS_DRIVER ] && [ $DB_IS_DRIVER = TRUE ] ; then
  echo "Starting the head node"
  ray start  --min-worker-port=20000 --max-worker-port=25000 --temp-dir="/tmp/ray" --head --port=$RAY_PORT --redis-password="$REDIS_PASS"  --include-dashboard=false
else
  sleep 40
  echo "Starting the non-head node - connecting to $DB_DRIVER_IP:$RAY_PORT"
  ray start  --min-worker-port=20000 --max-worker-port=25000 --temp-dir="/tmp/ray" --address="$DB_DRIVER_IP:$RAY_PORT" --redis-password="$REDIS_PASS"
fi
""" 
# Change 'username' to your Databricks username in DBFS
# Example: username = "[email protected]"
username = ""
dbutils.fs.put("dbfs:/Users/{0}/init/ray.sh".format(username), kernel_gateway_init, True)

Configure the cluster to run the init script that the notebook created on startup of the cluster. The advanced options, if using the cluster UI, should look like this:

Advanced cluster configuration example

Figure 6: Advanced cluster configuration example
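Once the cluster is up, a notebook attached to the driver can connect to the running Ray head node. This is a sketch for Ray 1.x; the `_redis_password` keyword and the password value must match the init script above and may differ in other Ray versions:

```python
import ray

# "auto" locates the head node started by the init script on this driver
ray.init(address="auto", _redis_password="d4t4bricks")
print(ray.cluster_resources())  # CPUs/memory across all cluster nodes
```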

Distributing Python UDFs

User-Defined Functions (UDFs) can be difficult to optimize since the internals of the function still run serially. Options such as Pandas UDFs, which use Apache Arrow to transfer data and Pandas to operate on it, can improve UDF performance. These options provide hardware-level optimization, but Ray can be used for logical optimization to drastically reduce the runtime of complicated Python tasks that could not otherwise be distributed. The attached notebook includes an example of distributing an ML model within a UDF to achieve a 2x speedup.

Reinforcement Learning

Example diagram of how Ray can be used for reinforcement learning

Figure 7: Diagram of Reinforcement Learning

An important and growing application of machine learning is reinforcement learning, in which an ML agent learns actions in an environment to maximize a reward function. Its applications range from autonomous driving to power consumption optimization to state-of-the-art gameplay. Reinforcement learning is the third major category of machine learning, alongside supervised and unsupervised learning.

The challenges of creating reinforcement learning applications include the need for creating a learning environment or simulation in which the agent can train, the complexity of scaling, and the lack of open source standards. Each application requires an environment, which often is custom made and created through historical records or physics simulations that can provide the result of every action the agent can perform. Such simulation environment examples include OpenAI Gym (environments ranging from classic Atari games to robotics), CARLA (the open-source driving simulator), or Tensor Trade (for training stock market trading algorithms).

For these simulations to scale, they cannot simply run on partitions of a dataset. Some simulations finish before others, and each must communicate its copy of the machine learning model's weights back to a central server for model consolidation (in the simplest form of distributed model training). This is therefore a task-parallelism problem: the data is not big, but many simultaneous computations of high complexity must run. The last issue is the lack of open-source standards in reinforcement learning libraries. Whereas deep learning and traditional machine learning have had more time to establish standards and bridging libraries (such as MLflow), reinforcement learning is at a younger stage of development and does not yet have well-established model libraries, which vary widely. This adds development time when switching between algorithms or frameworks.

To solve these problems, Ray comes with a reinforcement learning library named RLlib for high scalability and a unified API. It can run OpenAI Gym and user-defined environments, can train on a very wide variety of algorithms and supports TensorFlow and PyTorch for the underlying neural networks. Combining RLlib with Databricks allows for the benefits of highly scalable streaming and data integration with Delta Lake along with the high performance of state-of-the-art reinforcement learning models.

RLlib uses Tune, a Ray library for scalable hyperparameter tuning that runs variations of the models to find the best one. In this code example, a PPO (Proximal Policy Optimization) agent trains on OpenAI Gym's CartPole environment while a grid search runs over three learning rates. Under the hood, the Ray processes on the Spark nodes run simulations of the environment and send batches back to a central Ray training process, which trains the model on those batches and then sends the updated model to the rollout workers to collect more training data. While the trainer process can use GPUs to speed up training, setting "num_gpus" to 0 trains on less expensive CPU nodes.

The Ray library Tune uses a Proximal Policy Optimization (PPO) architecture to accelerate model training.

Figure 8: PPO Architecture

from ray import tune

tune.run(
    "PPO",
    stop={"episode_reward_mean": 200},
    config={
        "env": "CartPole-v0",
        "num_gpus": 0,
        "num_workers": 3,
        "lr": tune.grid_search([0.01, 0.001, 0.0001]),
    },
)

Applications of reinforcement learning broadly consist of scenarios where a simulation can run, a cost function can be established, and the problem is complicated enough that hard-set logical rules or simpler heuristic models cannot be applied. The most famous cases of reinforcement learning are typically research-oriented with an emphasis on gameplay, such as AlphaGo, superhuman Atari agents, or simulated autonomous driving, but there are many real-world business use cases. Recent applications include robotic manipulation control for factories, power consumption optimization, and even marketing and advertising recommendations.

Get started

The benefits of Ray combined with the power of Spark expand the possible applications of the Databricks Lakehouse Platform by enabling scalable task parallelism as well as reinforcement learning. The integration pairs the reliability, security, distributed-compute performance, and wide array of partner integrations of Delta Lake with Ray's universal distributed-compute framework to support new streaming, ML and big data workloads.

Try the Notebook
