Skip to content

SparkExample

Spark Data Read Modes#

When reading data (e.g., from CSV, JSON, etc.) in Apache Spark using DataFrameReader, you can specify how Spark should handle malformed or corrupted records using the mode option. Below are the available modes:

Mode Description
failFast Terminates the query immediately if any malformed record is encountered. This is useful when data integrity is critical.
dropMalformed Drops all rows containing malformed records. This can be useful when you prefer to skip bad data instead of failing the entire job.
permissive (default) Tries to parse all records. If a record is corrupted or missing fields, Spark sets null values for corrupted fields and puts malformed data into a special column named _corrupt_record.

Schema in spark#

There are two primary methods: using StructType and StructField classes, and using a DDL (Data Definition Language) string.

These are classes in Spark used to define the schema structure.

StructField represents a single column within a DataFrame. It holds information such as the column's name, its data type (e.g., String, Integer, Timestamp), and whether it can contain null values (nullable: True/False). If nullable is set to False, the column cannot contain NULL values, and an error will be thrown if it does.

StructType defines the overall structure of a DataFrame. It is essentially a list or collection of StructField objects. When you combine ID, Name, and Age fields, for example, they define the structure of a record, and a collection of such records forms a DataFrame.

What happens if you set header=False when your data actually has a header? If you disable the header option (header=False) but your CSV file contains a header row, Spark will treat that header row as regular data. If this header row's values do not match the data types defined in your manual schema (e.g., a string "Count" being read into an Integer column), it can lead to null values in that column if the read mode is set to permissive, or an error if the mode is failfast.

Method 1: Creating Schema using StructType and StructField

This method involves importing specific classes from pyspark.sql.types to programmatically define the schema.

Required Imports: To use StructType and StructField, along with specific data types like StringType and IntegerType, you must import them:

Code

from pyspark.sql.types import StructType, StructField, StringType, IntegerType #

Conceptual Data (for explanation):

ID Name Age
1 Manish 30
2 John 25

Code Example for ID, Name, Age Schema:

Code

# Define the schema using StructType and StructField
my_schema = StructType([
    StructField("ID", IntegerType(), True),  # ID column, Integer type, nullable is True
    StructField("Name", StringType(), True),  # Name column, String type, nullable is True
    StructField("Age", IntegerType(), True)   # Age column, Integer type, nullable is True
])

Each StructField defines a column: its name (string), its data type (e.g., IntegerType()), and whether it's nullable (Boolean True or False). Setting nullable to False means the column cannot be null, otherwise an error will be thrown.

The StructType constructor takes a list of StructField objects.

Method 2: Creating Schema using DDL String

This method is simpler and involves passing a string that defines the column names and their data types, similar to SQL's Data Definition Language.

Code Example for ID, Name, Age Schema (DDL String):

Code

# Define the schema using a DDL string
ddl_my_schema = "ID INT, Name STRING, Age INT" #

The string specifies column names and their corresponding Spark SQL data types (e.g., INT for Integer, STRING for String) separated by commas.

Applying the Schema to Read Data

Actual Data Example (from a CSV file): The example uses a CSV file containing flight data with columns Destination Country, Origin Country, and Count.

Code Example for Reading Data with Manual Schema:

Code

# Assuming 'spark' session is available and the CSV file is at 'path/to/flight_data.csv'

# Define the schema for the flight data using StructType and StructField
flight_schema = StructType([
    StructField("Destination Country", StringType(), True), #
    StructField("Origin Country", StringType(), True),      #
    StructField("Count", IntegerType(), True)              #
])

# Read the CSV file applying the defined schema
# Note: The video uses 'flight_df' as the DataFrame name
flight_df = spark.read.format("csv") \
                .option("header", "false") \ # Initially set to false to demonstrate header issues
                .schema(flight_schema) \    # Apply the custom schema
                .load("dbfs:/FileStore/tables/flight_data.csv") # Example path

# To display the schema of the loaded DataFrame
# flight_df.printSchema()

Handling Headers and Skipping Rows

Problem Scenario: If header=False is set and the data includes a header row (e.g., "Count" as the first value in the Count column), Spark will attempt to parse "Count" as an Integer. This will result in null values for that row/column if the reading mode is permissive, or an error if the mode is failfast.

Solution 1: Using mode("permissive"): The mode("permissive") option allows Spark to continue reading the data even if there are type mismatches, converting problematic values to null instead of failing the job.

Code

flight_df = spark.read.format("csv") \
                .option("header", "false") \
                .schema(flight_schema) \
                .option("mode", "permissive") \ # Allows nulls for type mismatches
                .load("dbfs:/FileStore/tables/flight_data.csv")
# After this, the first row's 'Count' column would likely be null because 'Count' (string) cannot be an Integer.

Solution 2: Skipping Rows with option("skipRows", "N"): A more direct way to handle unwanted header rows (or any initial rows) is to use the skipRows option. This option allows you to skip a specified number of rows from the beginning of the file.

Code

# To skip the first row (e.g., a header row)
flight_df_skipped_header = spark.read.format("csv") \
                            .option("header", "false") \ # Header detection is off
                            .option("skipRows", "1") \   # Skips the first row (the actual header)
                            .schema(flight_schema) \
                            .load("dbfs:/FileStore/tables/flight_data.csv")

# To skip multiple initial rows (e.g., the first three rows)
flight_df_skipped_multiple = spark.read.format("csv") \
                                .option("header", "false") \
                                .option("skipRows", "3") \ # Skips the first three rows
                                .schema(flight_schema) \
                                .load("dbfs:/FileStore/tables/flight_data.csv")

This is a robust way to ensure that the schema aligns with the actual data, excluding any rows that are not part of the data records.

Handling corrupted record#

When reading data, Spark offers different modes to handle corrupted records, which influence how the DataFrame is populated.

In permissive mode, all records are allowed to enter the DataFrame. If a record is corrupted, Spark sets the malformed values to null and does not throw an error. For the example data with five total records (two corrupted), permissive mode will result in five records in the DataFrame, with nulls where data is bad.

In dropMalformed mode, Spark discards any record it identifies as corrupted. Given the example data has two corrupted records out of five, this mode will produce a DataFrame with three records

In failfast mode, Spark immediately throws an error and stops the job as soon as it encounters the first corrupted record. This mode will result in zero records in the DataFrame because the job will fail.

How to Print Bad Records#

To specifically identify and view the corrupted records, you need to define a manual schema that includes a special column named _corrupt_record. This column will capture the raw content of the corrupted record.

Where to store bad record For scenarios with a large volume of corrupted records (e.g., thousands), printing them is not practical. Spark provides the badRecordsPath option to store all corrupted records in a specified location. These records are saved in JSON format at the designated path

Read Json in Spark#

Reading JSON files in PySpark is straightforward. The general method involves using spark.read.format("json").load(). Basic Read Operation:

The simplest way to read a JSON file is as follows:

Code

df = spark.read.format("json").load("path/to/your/json_file.json")
df.show()

Datafram Write#

When working with Spark, after you have read data into a DataFrame and performed transformations, it is crucial to write the processed data back to disk to ensure its persistence. Currently, all the transformations and data processing occur in memory, so writing to disk makes the data permanent. Here's a detailed explanation with code examples and notes based on the provided sources:

Code

# Assuming 'df' is your DataFrame
# Define your base location where you want to save the output
base_location = "/user/hive/warehouse/your_database/output/"

# Construct the full path for the CSV output folder
output_path = base_location + "csv_write/"

# Write the DataFrame to disk
df.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(output_path)

General Structure for Writing a DataFrame to Disk

The general structure for writing a DataFrame using the Spark DataFrame Writer API is as follows:

DataFrame.write: This is the starting point, indicating that you intend to write a DataFrame.

.format(): Specifies the file format in which you want to save the data. Common formats include CSV or Parquet. If no format is explicitly passed, Spark will default to Parquet.

.option(): Allows you to pass multiple options. For example, you can specify whether to include a header for CSV files (e.g., header as True). You can also specify the output path using path option, though save() method usually handles the path directly.

.mode(): Defines how Spark should behave if files or directories already exist at the target location. This is a very important aspect of writing data.

.partitionBy(): (To be covered in a dedicated video mentioned in the source) This method allows you to partition the output data based on one or more columns, creating separate folders for each partition.

.bucketBy(): (To be covered in a dedicated video mentioned in the source) Similar to partitionBy, but it organizes data into a fixed number of buckets within partitions.

.save(): This is the final action that triggers the write operation and specifies the output path where the DataFrame will be written.

A typical flow looks like: df.write.format(...).option(...).mode(...).save(path).

Modes in DataFrame Writer API

The mode() method in the DataFrame Writer API is crucial as it dictates how Spark handles existing data at the target location. There are four primary modes:

append

Functionality: If files already exist at the specified location, the new data from the DataFrame will be added to the existing files.

Example: If there were three files previously, and a new output DataFrame comes, it will simply append its data to that list of files.

overwrite

Functionality: This mode deletes any existing files at the target location before writing the new DataFrame.

Example: If a previous file had records, overwrite will delete all old files and only the new file with its records (e.g., five new records) will be visible.

errorIfExists

Functionality: Spark will check if a file or location already exists at the target path. If it does, the write operation will fail and throw an error.

Use Case: Useful when you want to ensure that you do not accidentally overwrite or append to existing data.

ignore

Functionality: If a file or location already exists at the target path, Spark will skip the write operation entirely without throwing an error. The new file will not be written.

Use Case: This mode is suitable if you want to prevent new data from being written if data is already present, perhaps to avoid overwriting changes or to ensure data integrity