
Connect Spark data pipelines to Gemini and other AI models with Dataproc ML library

Many data science teams rely on Apache Spark running on Dataproc managed clusters for powerful, large-scale data preparation. As these teams look to connect their data pipelines directly to machine learning models, there’s a clear opportunity to simplify the integration. But running inference on a Spark DataFrame using a model from Vertex AI typically requires custom development, making it complex to build a single, end-to-end workflow.

To solve this problem, we are developing a new open-source Python library designed to simplify AI/ML inference for Dataproc. This library connects your Apache Spark jobs to popular ML frameworks and Vertex AI features, starting with model inference. Because the library is open source, you can use it directly in your application code with full transparency into its operation.

How it works

Dataproc ML is built to feel familiar to Spark users, following a SparkML-style builder pattern. You configure the model you want to use, and then call .transform() on your DataFrame. Let’s look at a few common inference use cases.

  1. Apply Gemini models to your Spark data

You can apply generative AI models, like Gemini, to columns in your Spark DataFrame. This is useful for tasks like classification, extraction, or summarization at scale. In this example, we take a DataFrame with "city" and "country" columns and use Gemini to create a new column by providing a simple prompt.

You can test in your local environment by installing from PyPI:

```
pip install dataproc-ml
```

To deploy/test at scale, create a Dataproc version 2.3-ml cluster:

```
gcloud dataproc clusters create my-ml-cluster \
    --project="YOUR_PROJECT_ID" \
    --region="YOUR_REGION" \
    --image-version=2.3-ml-ubuntu \
    --properties='dataproc:pip.packages=dataproc-ml==0.1'
```

Copy this example to a file named gemini_spark.py.

```py
from pyspark.sql import SparkSession
from google.cloud.dataproc_ml.inference import GenAiModelHandler

spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
df = spark.createDataFrame([
    ("London", "UK"),
    ("Bengaluru", "India"),
    ("Paris", "France"),
    ("Tokyo", "Japan")
], ["city", "country"])

# Configure the model handler. It uses gemini-2.5-flash by default.
genai_handler = GenAiModelHandler().prompt(
    "Write a short, one-line rhyming poem about the experience of visiting {city} in {country}."
)

# Apply the model, which will output to a new `predictions` column
genai_handler.transform(df).show(truncate=False)

# Output
# +---------+-------+----------------------------------------------+
# |city     |country|predictions                                   |
# +---------+-------+----------------------------------------------+
# |London   |UK     |Big Ben's loud chime, a magical time!         |
# |Bengaluru|India  |Bengaluru's green, a vibrant tech scene.      |
# |Paris    |France |In Paris, I fell for romance at first glance. |
# |Tokyo    |Japan  |In Tokyo's vibrant pace, a smile upon my face.|
# +---------+-------+----------------------------------------------+
```

The handler supports customized options, as explained in the documentation.

```py
from google.cloud.dataproc_ml.inference import GenAiModelHandler
from vertexai.generative_models import GenerationConfig

# Configure the model handler
genai_handler = (
    GenAiModelHandler()
    .prompt("Write a short, one-line rhyming poem about the experience of visiting {city} in {country}.")
    .model("gemini-2.5-pro")
    .output_col("city_poem")
    .generation_config(GenerationConfig(temperature=0.7))
)
```

Submit this job to your Dataproc cluster:

```
gcloud dataproc jobs submit pyspark gemini_spark.py \
    --cluster=my-ml-cluster \
    --region="YOUR_REGION"
```

  2. Run inference with PyTorch and TensorFlow models

In addition to calling Gemini endpoints, the library also allows you to run inference with model files loaded directly from Google Cloud Storage. You can use the PyTorchModelHandler (and a similar handler for TensorFlow) to load your model weights, define a pre-processor, and run inference directly on your worker nodes. This is useful when you want to run batch inference at scale without managing a separate model serving endpoint.

```py
import io

import pandas as pd
from PIL import Image
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType
from torchvision.models import ResNet50_Weights

from google.cloud.dataproc_ml.inference import PyTorchModelHandler

spark = SparkSession.builder.getOrCreate()

# Get weights and transforms for the model
weights = ResNet50_Weights.DEFAULT

image_df = spark.read.format("binaryFile").load(
    "gs://cloud-samples-data/generative-ai/image/"
)

def vectorized_preprocessor(image_bytes_series: pd.Series) -> pd.Series:
    """Applies ResNet50 transforms to a series of image bytes."""
    return image_bytes_series.apply(
        lambda b: weights.transforms()(Image.open(io.BytesIO(b)).convert("RGB"))
    )

pytorch_handler = (
    PyTorchModelHandler()
    .model_path("gs://<bucket>/resnet50_full_model.pt")
    .input_cols("content")
    .pre_processor(vectorized_preprocessor)
    .set_return_type(ArrayType(FloatType()))
)

pytorch_handler.transform(image_df).show()
```
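For a ResNet50 classifier like the one above, the output vector per image would be raw ImageNet logits. A minimal sketch, in plain Python, of how you might reduce such a vector to a top-1 class prediction (the column layout and this post-processing step are assumptions for illustration, not part of the library's API):

```python
import math

def softmax(logits):
    """Convert a logit vector to probabilities (numerically stable)."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def top1(logits):
    """Return (index, probability) of the highest-scoring class."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=lambda i: probs[i])
    return best, probs[best]
```

On Spark, the same logic could run inside a UDF over the handler's output column to attach a class index or label to each row.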

Built for performance

This library isn’t just a simple wrapper. It’s designed for running inference on large Dataproc clusters and includes several optimizations:

  • Vectorized data transfer: We use pandas_udf to efficiently move data between Spark and the Python worker processes.
  • Connection re-use: Connections to the endpoint are re-used across partitions to reduce overhead.
  • Retry logic: The library automatically handles errors like HTTP 429 (resource exhausted) with exponential backoff and retries.
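The retry behavior in the last bullet follows a standard exponential-backoff pattern. A minimal sketch of that pattern in plain Python, where the names are illustrative rather than the library's actual internals:

```python
import random
import time

class ResourceExhausted(Exception):
    """Stand-in for a transient HTTP 429 (resource exhausted) response."""

def call_with_backoff(fn, max_retries=5, base_delay=1.0):
    """Retry fn() on transient errors with exponential backoff and jitter.

    Waits roughly base_delay * 2**attempt between attempts, plus random
    jitter so many Spark workers do not retry in lockstep.
    """
    for attempt in range(max_retries):
        try:
            return fn()
        except ResourceExhausted:
            if attempt == max_retries - 1:
                raise  # Out of retries; surface the error to the caller.
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
```

Spreading retries out this way keeps a busy cluster from hammering a rate-limited endpoint with synchronized retry storms.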

Get started

You can start using it today by checking out the open-source repository and reading our documentation.

Looking ahead, we plan to add the following features to this library in the coming months.

  1. Spark Connect support: This will also enable the above functionality from within BigQuery Studio notebooks.
  2. Vertex AI integrations: To further simplify inference, we plan to add more ModelHandlers that:
    1. Directly call a Vertex AI model endpoint for online inference
    2. Reference Vertex AI models and localize them to Spark workers
    3. Reference models hosted in Vertex AI Model Garden, including embedding models
  3. More optimizations: Auto-repartition input DataFrames to improve inference runtime
  4. Third-party integrations: Reference open-source models on Hugging Face

We are actively working on including this library by default in Dataproc on Google Compute Engine ML images and Google Cloud Serverless for Apache Spark runtimes.

We look forward to seeing what you build! Have feedback or feature requests to further simplify your AI/ML experience on Spark? Reach us at dataproc-feedback@google.com.
