What is Apache Ballista?
Apache Ballista is a distributed SQL query engine that extends Apache DataFusion across multiple nodes for horizontal scaling. It adds distributed scheduling, shuffle, and exchange operators on top of DataFusion, enabling analytical queries over datasets that exceed the capacity of a single machine.
Apache DataFusion is a powerful single-node SQL query engine, but some workloads exceed what a single machine can handle. A dataset might be too large to fit in memory, a query might need to scan terabytes of data within a latency budget, or the computational cost of a complex join might benefit from parallelism across multiple cores on multiple nodes.
Apache Ballista solves this by extending DataFusion's query engine across a cluster of machines. It takes DataFusion's SQL parsing, logical planning, and optimization capabilities and adds the distributed execution layer needed to partition work, shuffle data between nodes, and merge results. Each node in a Ballista cluster runs DataFusion as its local query engine, and Ballista coordinates them.
Architecture
Ballista uses a scheduler-executor architecture, similar in concept to Spark's driver-executor model but implemented in Rust with Apache Arrow as the native data format.
Scheduler
The scheduler is the coordinator of a Ballista cluster. When a SQL query arrives, the scheduler:
- Parses and optimizes the query using DataFusion's standard SQL parser and optimizer
- Creates a distributed execution plan by analyzing the logical plan and inserting exchange operators where data needs to move between nodes (e.g., for joins and aggregations that require data from multiple partitions)
- Partitions the work into stages and tasks. A stage is a sequence of operations that can execute on a single partition without data exchange. Tasks are individual units of work assigned to executor nodes.
- Assigns tasks to executors based on data locality, executor capacity, and load balancing
- Tracks progress and handles retries if an executor fails or a task times out
The scheduler maintains a global view of the cluster state, including which executors are available, which tasks are running, and which stages are complete.
Executors
Executors are the worker nodes that perform the actual computation. Each executor:
- Receives task assignments from the scheduler
- Executes tasks using its local DataFusion engine. The executor reads data from its assigned partitions, applies the operators in the physical plan (scan, filter, project, aggregate, etc.), and produces Arrow record batches as output.
- Writes intermediate results to local storage (or exchanges them with other executors) for downstream stages
- Reports status back to the scheduler, including completion, failure, and performance metrics
Because each executor runs a full DataFusion engine, all of DataFusion's single-node optimizations -- predicate pushdown, projection pruning, vectorized execution on Arrow arrays -- apply at the per-node level. Ballista adds the coordination and data exchange layer on top.
How Distributed Query Execution Works
Distributed query execution introduces several concepts that don't exist in single-node engines. Understanding these is key to understanding how Ballista (and distributed query engines in general) operate.
Partitioning
Data is divided into partitions -- subsets of the full dataset that can be processed independently. Partitioning can be based on:
- Hash partitioning: Rows are assigned to partitions based on a hash of one or more columns. This ensures that all rows with the same key end up in the same partition, which is necessary for hash joins and group-by aggregations.
- Range partitioning: Rows are assigned to partitions based on value ranges. This is useful for ordered scans and range queries.
- Round-robin partitioning: Rows are distributed evenly across partitions without regard to content. This maximizes parallelism for operations that don't require co-located keys.
The choice of partitioning strategy affects both performance and correctness. A hash join, for example, requires that both sides of the join are hash-partitioned on the join key so that matching rows are co-located on the same executor.
Shuffles and Exchanges
When a query requires data to be repartitioned -- for example, when a hash join needs data partitioned by the join key, but the data is currently range-partitioned -- a shuffle (or exchange) occurs. During a shuffle:
- Each executor reads its local partitions and computes the target partition for each row based on the new partitioning scheme
- Rows are serialized as Arrow record batches and sent over the network to the appropriate executor
- The receiving executor collects incoming batches and makes them available for the next stage of execution
Shuffles are the most expensive operation in distributed query execution because they involve network I/O and serialization. Minimizing unnecessary shuffles is a key optimization goal for distributed query planners.
Stages
Ballista breaks a distributed query plan into stages separated by exchange boundaries. Within a stage, all operations can execute on a single partition without data exchange. Between stages, shuffles repartition the data as needed.
For example, a query that joins two tables and then aggregates the result might be broken into three stages:
- Stage 1: Scan and filter table A, hash-partition by join key
- Stage 2: Scan and filter table B, hash-partition by join key
- Stage 3: Perform the hash join on co-located partitions, then aggregate
Stages 1 and 2 can execute in parallel across different executors. Stage 3 depends on both Stage 1 and Stage 2 completing, because it needs the shuffled output from both.
Ballista vs. Other Distributed Query Engines
Ballista vs. Apache Spark
Spark is the most widely deployed distributed data processing framework. It runs on the JVM, supports multiple languages (Scala, Python, Java, R), and has a mature ecosystem of libraries for batch processing, streaming, machine learning, and graph processing.
Ballista differs in several ways:
- Language and runtime: Ballista is written in Rust with no JVM dependency. This means lower memory overhead, more predictable performance (no garbage collection pauses), and faster startup times.
- Data format: Ballista uses Apache Arrow as its native in-memory format. Spark uses its own internal row format for many operations and converts to/from Arrow when interfacing with external systems. Ballista's native Arrow integration eliminates this conversion overhead.
- Footprint: Ballista is a lightweight distributed query engine. Spark is a comprehensive data processing framework that includes batch, streaming, ML, and graph libraries. Ballista is smaller and more focused.
- Extensibility: Both are extensible, but Ballista inherits DataFusion's Rust trait-based extension model, while Spark uses JVM-based plugin interfaces.
Choose Spark when you need a mature, full-featured distributed data processing platform with a large ecosystem. Choose Ballista when you need a lightweight, Rust-native distributed SQL engine with native Arrow integration and lower operational overhead.
Ballista vs. Trino
Trino (formerly Presto) is a distributed SQL query engine designed for interactive analytics and SQL federation across heterogeneous data sources. Trino has a mature production track record and a rich connector ecosystem.
Ballista and Trino share the same high-level architecture (scheduler + workers), but differ in implementation:
- Language: Trino is written in Java. Ballista is written in Rust.
- Data format: Trino uses its own internal page format. Ballista uses Apache Arrow natively.
- Embeddability: Trino is designed to be deployed as a standalone cluster. Ballista, like DataFusion, is designed to be embeddable -- it can be integrated into a larger application rather than requiring standalone deployment.
- Maturity: Trino has years of production deployment at major companies. Ballista is newer and under active development.
Choose Trino when you need a production-proven distributed SQL engine with a broad connector ecosystem. Choose Ballista when you need a Rust-native, Arrow-native distributed engine that can be embedded into a custom system.
Ballista and DataFusion: The Relationship
Ballista is built directly on top of DataFusion. This relationship is fundamental to understanding both projects:
- DataFusion provides SQL parsing, logical planning, query optimization, and single-node physical execution. It is a library that runs in a single process.
- Ballista adds distributed scheduling, partitioning, shuffles, and inter-node coordination. It uses DataFusion as the per-node execution engine.
When Ballista executes a query, each executor node runs DataFusion locally. DataFusion handles all the per-partition computation -- scanning, filtering, projecting, joining, aggregating. Ballista handles the coordination between nodes -- deciding which executor processes which partition, managing shuffles, and collecting final results.
This separation means that improvements to DataFusion's optimizer or execution engine automatically benefit Ballista deployments. And DataFusion extensions -- custom table providers, UDFs, optimizer rules -- work in Ballista without modification.
Current Status and Development
Ballista is an incubating project within the Apache Arrow ecosystem. It is under active development, with contributions from multiple organizations. Key areas of ongoing work include:
- Fault tolerance: Improving task retry logic and executor failure recovery
- Resource management: Better scheduling based on executor memory and CPU availability
- Performance: Reducing shuffle overhead and improving exchange operator efficiency
- Integration: Expanding the set of data sources and file formats supported in distributed mode
Ballista is suitable for experimental and early production workloads. For mission-critical production deployments that require mature fault tolerance and operations tooling, teams should evaluate Ballista alongside established alternatives like Trino and Spark.
How Spice Uses Distributed Query Concepts
Spice builds on the distributed query concepts pioneered by Ballista and other distributed engines. Spice's distributed architecture enables SQL federation and data acceleration across multiple nodes:
- Distributed federation: Queries are federated across data sources from any node in a Spice cluster. The query planner determines the optimal execution strategy, including which sources to query from which nodes.
- Distributed acceleration: Accelerated datasets can be partitioned across nodes, with each node caching a subset of the data. Queries are routed to the nodes that hold the relevant partitions.
- Arrow-native transport: Like Ballista, Spice uses Apache Arrow as its native data format for inter-node communication, eliminating serialization overhead.
By combining DataFusion's single-node query engine with distributed execution capabilities, Spice delivers sub-second federated queries across distributed data sources and acceleration caches.
Advanced Topics
Scheduler-Executor Architecture in Depth
The scheduler and executors communicate through a combination of gRPC services and Arrow Flight endpoints. The scheduler exposes a planning API that accepts SQL or pre-built logical plans and returns a job identifier. It then decomposes the job into a directed acyclic graph (DAG) of stages and tasks.
Each executor registers with the scheduler at startup, reporting its available resources (CPU cores, memory). The scheduler uses this information to make placement decisions. When a task completes, the executor reports back with metrics -- execution time, rows processed, bytes shuffled -- which the scheduler uses to refine future scheduling decisions within the same job.
Shuffle Strategies
Shuffles are the most performance-critical aspect of distributed query execution. Ballista supports several shuffle strategies, each suited to different workload patterns:
Hash shuffle is the default for joins and group-by aggregations. Each executor hashes each row's partition key and writes it to one of N output partitions. The receiving executors pull their assigned partitions. This ensures co-location of matching keys but can create hot partitions if the key distribution is skewed.
Sort-merge shuffle is used when the downstream stage requires sorted input -- for example, a sort-merge join or a global ORDER BY. Each executor sorts its local partition and writes sorted runs. The downstream stage merges these sorted runs without needing to buffer the full dataset.
Broadcast shuffle is an optimization for small tables. When one side of a join is small enough to fit in executor memory, the scheduler broadcasts the entire small table to all executors rather than hash-partitioning both sides. This eliminates one full shuffle and is a significant performance win for star-schema queries with small dimension tables.
The query planner selects shuffle strategies based on the physical plan operators, available statistics, and configurable thresholds (e.g., the broadcast size limit).
Fault Tolerance
Distributed query execution must handle executor failures gracefully. Ballista's fault tolerance model operates at the task level:
- Heartbeat monitoring: The scheduler expects periodic heartbeats from each executor. If an executor misses consecutive heartbeats, the scheduler marks it as lost and reassigns its in-flight tasks to other executors.
- Task retries: When a task fails -- whether due to executor failure, out-of-memory errors, or data source errors -- the scheduler retries the task on a different executor up to a configurable retry limit. If the task depends on intermediate shuffle data that was stored on the failed executor, the scheduler re-executes the upstream stage that produced that data.
- Stage-level recovery: If a shuffle output is lost because the executor that stored it has failed, the scheduler must re-execute the entire upstream stage to regenerate the shuffle data. This is the most expensive failure mode and is the primary motivation for persisting shuffle data to durable storage in production deployments.
Resource Scheduling
Resource-aware scheduling is essential for stable cluster operation. Ballista's scheduler tracks each executor's resource utilization and enforces constraints:
- Memory-based admission control: The scheduler estimates the memory requirements of each task based on the physical plan operators (e.g., hash joins require memory proportional to the build side). Tasks are assigned to executors that have sufficient free memory.
- Slot-based concurrency: Each executor advertises a fixed number of task slots (typically equal to the number of CPU cores). The scheduler does not assign more tasks than an executor has slots, preventing CPU oversubscription.
- Data locality preferences: When a task reads data from a specific storage location, the scheduler prefers executors that are co-located with that data. This reduces network I/O for the initial scan stage. If no co-located executor has available capacity, the scheduler falls back to a remote executor.
These mechanisms work together to keep executor utilization high while avoiding overload conditions that would cause task failures or performance degradation.
Apache Ballista FAQ
How does Ballista compare to Apache Spark?
Spark is a comprehensive, JVM-based distributed data processing framework with mature libraries for batch, streaming, ML, and graph processing. Ballista is a focused, Rust-native distributed SQL query engine built on Apache DataFusion and Arrow. Ballista has lower memory overhead, faster startup, and native Arrow integration, but Spark has a larger ecosystem, broader language support, and years of production hardening at scale.
How does Ballista compare to Trino?
Both are distributed SQL query engines with scheduler-worker architectures. Trino is written in Java, uses its own internal data format, and is designed for standalone cluster deployment with a mature connector ecosystem. Ballista is written in Rust, uses Apache Arrow natively, and is designed to be embeddable. Trino is more production-proven; Ballista offers Rust-native performance and tighter Arrow integration.
Is Ballista production ready?
Ballista is under active development as an incubating Apache project. It is suitable for experimental and early production workloads. For mission-critical production deployments that require mature fault tolerance, operational tooling, and SLA guarantees, teams should evaluate Ballista alongside established alternatives like Trino and Spark, considering their specific requirements for maturity versus native Arrow and Rust integration.
What is the relationship between Ballista and DataFusion?
Ballista is built on top of DataFusion. DataFusion provides the single-node SQL query engine -- parsing, planning, optimization, and execution. Ballista adds the distributed layer -- scheduling, partitioning, shuffles, and inter-node coordination. Each executor node in a Ballista cluster runs DataFusion locally. DataFusion extensions (custom table providers, UDFs, optimizer rules) work in Ballista without modification.
What are common use cases for distributed SQL query engines?
Distributed query engines are used when single-node capacity is insufficient: scanning terabytes of data within latency budgets, running complex joins across very large tables, parallelizing expensive aggregations, and serving concurrent analytical queries that exceed single-machine throughput. They are also used for federated queries across geographically distributed data sources where pushing computation to the data is more efficient than centralizing it.
Learn more about distributed queries and Spice
Technical resources on distributed SQL execution, DataFusion, and how Spice handles distributed workloads.
Spice.ai OSS Documentation
Learn how Spice uses distributed query execution to federate and accelerate SQL across multiple nodes and data sources.

How we use Apache DataFusion at Spice AI
A technical overview of how Spice extends Apache DataFusion with custom table providers, optimizer rules, and UDFs.

A Developer's Guide to Understanding Spice.ai
Learn what Spice.ai is, when to use it, and how it solves enterprise data challenges.

See Spice in action
Get a guided walkthrough of how development teams use Spice to query, accelerate, and integrate AI for mission-critical workloads.
Get a demo