Partition & Bucketing
Partitioning and bucketing are two crucial optimization techniques in Apache Spark that are decided upon when writing data to ensure efficient data processing, especially for analytical workloads (OLAP) where data is written once and read many times (read many) . These techniques help improve performance by reducing the amount of data Spark needs to scan and process for queries .
Partitioning in Spark#
Partitioning in Spark involves organizing data into separate subdirectories (folders) based on the distinct values of one or more specified columns . Each distinct value of the partitioning column(s) gets its own folder .
For example, if you partition by address (which contains countries), Spark will create separate folders for India, USA, Japan, etc., each containing only the records corresponding to that country .
How it Works & Advantages#
Folder Creation: When you partition data, Spark creates a directory structure where each distinct value of the chosen column becomes a folder name. For instance, if you partition by address, you might see folders like address=India/, address=USA/, address=Japan/, etc., at the specified storage location .
Performance Optimization (Predicate Pushdown/Partition Pruning): This is the primary advantage. If you later query the data with a filter on the partitioned column (e.g., WHERE address = 'USA'), Spark can directly go to the address=USA folder and read only the data within that folder, completely ignoring data in other folders . This significantly reduces the amount of data read from disk, leading to faster query execution, especially for large datasets .
Logical Organization: It provides a logical way to organize your data on the file system, making it easier to manage and understand .
Persistence: The data written to these partitioned folders is permanently stored (persistent storage) and will not be deleted even if the Spark cluster is shut down .
When Partitioning Fails (Disadvantages)#
High Cardinality: Partitioning becomes inefficient and can even degrade performance if the chosen column has a very high number of distinct values (high cardinality) . If you partition by an ID column where every record has a unique ID, Spark will create a separate folder for each record .
Small File Problem: Creating too many small folders (and thus, small files within them) is detrimental to performance in distributed file systems like HDFS (which Spark typically works with). Managing a large number of tiny files introduces overhead and can make operations slower rather than faster . The source indicates that for 1 billion records, creating 1 billion small partitions is not feasible .
Examples for Partitioning:#
-
Partitioning by a Single Column: To partition your DataFrame (df) by the address column. Explanation: This code will save the DataFrame df to the specified path. Within that path, Spark will create subdirectories for each unique value in the Address column (e.g., address=India, address=USA, address=Japan) . Each subdirectory will contain the data rows where Address matches that value.
-
Partitioning by Multiple Columns: You can partition by multiple columns, which creates a nested folder structure. The order of columns in partitionBy matters . If you want folders for each country, and then inside each country folder, folders for Gender, you should specify Address first, then Gender. Explanation: The first example will create folders like address=India/gender=Female/, address=USA/gender=Male/, etc. . The second example will create gender=Female/address=India/, gender=Male/address=USA/, etc. . The order determines the hierarchy of folders on the file system and impacts how pruning works .
Bucketing in Spark#
Bucketing is a technique used to divide data into a fixed number of 'buckets' based on the hash value of one or more specified columns . Unlike partitioning, which creates a variable number of directories based on distinct values, bucketing creates a pre-defined, fixed number of files (buckets) . Bucketing is particularly useful when partitioning is not suitable due to high cardinality .
How it Works & Advantages#
Fixed Number of Buckets: You specify the exact number of buckets you want to create . Spark then hashes the values of the chosen bucketing column(s) and assigns rows to specific buckets based on these hash values . For instance, if you define 3 buckets, all records will be distributed among these 3 buckets .
Requires saveAsTable(): Bucketing cannot be used with a direct save() operation to a file path. It requires the data to be saved as a table using saveAsTable(), as it interacts with Spark's metastore to manage bucket metadata .
Optimizing Joins (Shuffle Avoidance): This is a key advantage of bucketing . If two tables are bucketed on the same columns and with the same number of buckets, when you join these two tables on those bucketing columns, Spark can perform a bucket-aware join . This means it can directly join corresponding buckets from each table without needing to shuffle the entire dataset across the network, which is an expensive operation . This significantly speeds up join operations .
Faster Lookups (Bucket Pruning): Similar to partition pruning, bucketing can also enable bucket pruning . If you query data with a filter on the bucketing column, Spark can use the hash function to determine exactly which bucket(s) contain the relevant data . This means it only needs to scan a subset of files within the table, rather than the entire table, making lookups faster .
Overcoming High Cardinality Issues: When partitioning fails due to high cardinality, bucketing provides a solution by distributing records with many unique values into a fixed, manageable number of buckets .
Important Considerations for Bucketing:#
-
repartition() before bucketBy(): A common issue with bucketing is that if you have many tasks running (e.g., 200 tasks), and you specify, say, 5 buckets, Spark might create 5 buckets per task, resulting in 200 * 5 = 1000 unwanted buckets . To avoid this, it's recommended to repartition your DataFrame to the desired number of buckets before applying bucketBy() . This ensures that only the specified number of buckets are created .
-
Same Bucketing Strategy for Joins: For join optimization, both tables involved in the join must be bucketed on the same column(s) and with the same number of buckets .
Code Example for Bucketing Explanation:
repartition(3): This step ensures that the data is first divided into 3 partitions before bucketing, preventing the creation of an excessive number of bucket files if your Spark tasks are more than your desired buckets .
bucketBy(3, "ID"): This specifies that the data should be divided into 3 buckets, using the ID column for hashing .
saveAsTable("bucket_by_id_table"): This command saves the DataFrame as a managed table in Spark's metastore, which is required for bucketing .
Verification: After execution, you would typically use dbutils.fs.ls() to see the bucket files generated within the table's directory, often named like part-00000_0_0_C000.snappy.parquet where the _0_0 part might indicate bucket ID.