What are the main advantages of using PySpark compared to traditional Python for Big Data processing?

PySpark, the Python API for Apache Spark, provides significant advantages over using traditional Python libraries like pandas for large-scale data processing. The primary advantage is scalability. Traditional Python data analysis tools operate on a single machine, meaning they are constrained by the memory (RAM) and processing power of that one computer. When a dataset grows too large to fit into memory, these tools fail. PySpark, however, is built on Apache Spark, a distributed processing engine. It can distribute data and computations across a cluster of many machines, allowing it to scale horizontally. This means that to process more data, you simply add more machines to the cluster, enabling the processing of datasets that are terabytes or even petabytes in size.

Beyond just scalability, PySpark offers high performance through in-memory parallel processing. Spark’s engine attempts to keep data in memory as much as possible, avoiding slow disk I/O, and runs operations in parallel across all the nodes in the cluster. This is substantially faster than the single-threaded or limited multi-threaded nature of most traditional Python tools. Another key advantage is fault tolerance. PySpark achieves this through its core data abstraction, the Resilient Distributed Dataset (RDD), which tracks the lineage of data transformations. If a node in the cluster fails, Spark can automatically recompute the lost data partition using this lineage, ensuring data reliability without user intervention. Finally, PySpark integrates seamlessly with the broader big data ecosystem, including tools like Hadoop HDFS, Apache Hive, and Apache Kafka, making it a comprehensive solution for end-to-end data pipelines.

How do you create a SparkSession in PySpark? What are its main uses?

In modern PySpark (Spark 2.0 and later), the SparkSession is the single, unified entry point for all Spark functionality. It consolidates the various contexts that existed in earlier versions, such as the SparkContext, SQLContext, and HiveContext, into one convenient object. You create a SparkSession using the SparkSession.builder pattern. This pattern allows you to chain configuration methods to set up your session before finally calling .getOrCreate(). The .getOrCreate() method is intelligent; it will either create a new SparkSession based on your configuration or, if one already exists with the same configuration, it will return the existing session. This prevents conflicts and ensures a single entry point per application.

The main uses of the SparkSession are comprehensive. First, it is used to configure your Spark application’s properties, such as the application name (.appName()), the master URL (.master(), which can be “local[*]” for local testing or a cluster manager’s URL), and other Spark configuration options via .config(). Second, it is the primary way you create DataFrames, either by reading from external data sources like CSV, Parquet, or JSON, or by converting existing RDDs or Python lists. Third, it provides access to the Spark SQL engine, allowing you to run SQL queries directly against your data. Finally, the SparkSession object also contains the sparkContext attribute, so you can still access the lower-level RDD API if needed.

Here is the standard code example for creating a SparkSession:

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .getOrCreate()

 

Describe the different ways to read data in PySpark.

PySpark provides a powerful and versatile reader interface, accessed through the spark.read attribute of your SparkSession object. This interface is designed to handle a wide variety of data sources in a unified way. The most common method is to specify the format of the data using methods like spark.read.csv(), spark.read.parquet(), and spark.read.json(). Each of these methods offers format-specific options. For example, when reading a CSV, you can specify whether the file has a header, what the delimiter is, and whether to infer the schema. Parquet, a preferred format in the big data world, is often more efficient as it is a columnar format and stores the schema within the file, requiring less configuration.

You can also use the more general spark.read.format(“format_name”) method, where “format_name” could be “csv”, “json”, “parquet”, “orc”, “jdbc”, and so on. After specifying the format, you can chain .option() or .options() calls to set multiple configuration parameters before finally calling .load(“path_to_data”) to initiate the read. This generic approach is particularly useful for less common formats or when connecting to databases via JDBC, where you need to provide details like the URL, driver, and database table. PySpark’s lazy evaluation means that the data is not actually read until you perform an action on the resulting DataFrame, such as .show() or .count().

Here is an example demonstrating reads from multiple formats:

Python

# Reading a CSV file with options
df_from_csv = spark.read.csv("my_file.csv", header=True, inferSchema=True)

# Reading a Parquet file (often requires no options)
df_from_parquet = spark.read.parquet("my_file.parquet")

# Reading a JSON file
df_from_json = spark.read.json("my_file.json")

# A more general way using .format() and .load()
df_from_orc = spark.read.format("orc").load("my_file.orc")
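
Building on the generic .format()/.option()/.load() pattern described above, here is a minimal sketch of a JDBC read; the URL, table name, and credentials are placeholders, and the appropriate JDBC driver is assumed to be available on the cluster.

Python

# Hypothetical JDBC read using the generic reader with chained options
df_from_jdbc = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/mydb")  # placeholder URL
    .option("dbtable", "public.sales")                     # placeholder table
    .option("user", "my_user")                             # placeholder credentials
    .option("password", "my_password")
    .load()
)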

 

How do you handle missing data in PySpark?

Handling missing data, which typically appears as null or NaN (Not a Number) values in a DataFrame, is a critical step in any data cleaning process. PySpark’s DataFrame API provides a dedicated DataFrameNaFunctions object, accessible via the .na attribute, which contains all the necessary methods. The most common methods are .dropna() and .fillna(). The .dropna() method is used to eliminate rows or columns that contain missing values. You can control its behavior with parameters like how=”any” (drop a row if any of its values are null) or how=”all” (drop a row only if all of its values are null). You can also provide a subset parameter to specify that the check for nulls should only be performed on a specific subset of columns.

The .fillna() method is used to replace missing data with a specific value. You can fill all nulls with a single constant value, or you can provide a dictionary to specify different replacement values for different columns. For more advanced use cases, such as imputation, PySpark’s machine learning library (pyspark.ml) offers the Imputer transformer. The Imputer can fill missing values using statistical methods, such as the mean, median, or mode of the column. This is generally a more robust approach than filling with a simple constant, as it helps preserve the statistical properties of the data. You fit the Imputer on your data to calculate the statistics and then use the transform method to apply the imputation.

Here are code examples for these techniques:

Python

# Drop rows with any null values
df_cleaned = df_from_csv.dropna(how="any")

# Fill missing values with a constant (e.g., 0)
df_filled = df_from_parquet.fillna(value=0)

# Fill missing values differently for specific columns
df_filled_subset = df_from_json.fillna(value={"price": 0, "rooms": "Unknown"})

# Impute values using the median
from pyspark.ml.feature import Imputer

imputer = Imputer(
    strategy="median",
    inputCols=["price", "rooms"],
    outputCols=["price_imputed", "rooms_imputed"]
)

model = imputer.fit(df_from_json)
df_imputed = model.transform(df_from_json)

 

How can you cache data in PySpark to improve performance?

Caching is one of the most fundamental optimization techniques in PySpark. Its purpose is to store the results of an intermediate DataFrame or RDD in memory so that it can be accessed quickly in subsequent operations without being recomputed. This is extremely useful because of Spark’s lazy evaluation. By default, every time you call an action, Spark re-evaluates the entire chain of transformations (the DAG) from the original source data. If you have a “hot” DataFrame that you plan to query multiple times (for example, in a machine learning training loop or an interactive analysis session), recomputing it each time is incredibly wasteful. Caching breaks this cycle by saving the computed data.

PySpark provides two main methods for this: .cache() and .persist(). The .cache() method is the simpler of the two; it is a shortcut for .persist() with the default storage level, which is MEMORY_ONLY for RDDs and MEMORY_AND_DISK for DataFrames. With MEMORY_ONLY, the data is stored in the executors’ memory as deserialized objects, and any partitions that do not fit are dropped and recomputed when they are needed again. The .persist() method provides more granular control. It allows you to specify a StorageLevel to define where and how the data is stored. For example, you can choose StorageLevel.MEMORY_AND_DISK (spill to disk if it does not fit in memory), StorageLevel.DISK_ONLY (store on disk only), or StorageLevel.MEMORY_ONLY_SER (store in memory as serialized objects, which is more space-efficient but slower to access). Choosing the right storage level is a trade-off between memory usage, CPU overhead, and the risk of recomputation.

Here is an example of caching:

Python

from pyspark.storagelevel import StorageLevel

# Simple caching using the default storage level
df_from_csv.cache()

# Perform an action to trigger the caching
df_from_csv.count()

# Now, subsequent operations on df_from_csv will be much faster
df_from_csv.show()
df_from_csv.groupBy("some_column").count()

# To use a different storage level, unpersist first, then persist
# (changing the storage level of an already-cached DataFrame raises an error)
df_from_csv.unpersist()
df_from_csv.persist(StorageLevel.DISK_ONLY)
df_from_csv.count()

 

Describe how to perform joins in PySpark.

Performing joins to combine two DataFrames is a cornerstone operation in data analysis, and PySpark’s DataFrame API provides a simple yet powerful .join() method. This method is called on one DataFrame and takes the second DataFrame as its first argument. The most critical parameters are on and how. The on parameter specifies the join condition. It can be a simple string if the column name is the same in both DataFrames, a list of strings for a composite key, or a more complex SQL-like expression for advanced conditions, such as non-equi joins or joins with additional filters.

The how parameter defines the type of join to be performed. PySpark supports all standard SQL join types. The default is “inner”, which returns only the rows that have matching keys in both DataFrames. Other common types include “outer” (or “full”, “full_outer”), which returns all rows from both DataFrames, filling in nulls for non-matching keys; “left” (or “left_outer”), which returns all rows from the left DataFrame and the matched rows from the right, filling with nulls on the right if no match is found; and “right” (or “right_outer”), which is the reverse of a left join. PySpark also supports “left_semi” joins, which return only the rows from the left DataFrame that have a match in the right DataFrame, and “left_anti” joins, which return only the rows from the left DataFrame that do not have a match in the right DataFrame.

Here are examples of different join types:

Python

# Assuming df_a and df_b are two DataFrames

# Inner join on a single column named "id"
df_a.join(df_b, on="id", how="inner")

# Outer join on a composite key
df_a.join(df_b, on=["product_id", "customer_id"], how="outer")

# Left join with a join condition where column names differ
df_a.join(df_b, on=df_a.user_id == df_b.u_id, how="left")
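
The semi and anti join types mentioned above follow the same pattern; a short sketch:

Python

# Left semi join: rows from df_a that have a match in df_b (df_b's columns are not returned)
df_a.join(df_b, on="id", how="left_semi")

# Left anti join: rows from df_a that have no match in df_b
df_a.join(df_b, on="id", how="left_anti")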

 

What are the main differences between RDDs, DataFrames, and Datasets in PySpark?

RDDs, DataFrames, and Datasets are the three core data abstractions in Spark, and understanding their evolution is key to understanding modern PySpark. The Resilient Distributed Dataset (RDD) was the original and lowest-level API in Spark. RDDs are immutable, partitioned collections of objects. They offer complete control, as you can define transformations using lambda functions, but they are “schema-unaware.” This means Spark does not know what is inside your RDD; it just sees a collection of Python objects. Because of this, it cannot apply performance optimizations, and operations often suffer from high serialization and deserialization overhead as data is moved between the Python interpreter and the Java Virtual Machine (JVM). RDDs are not type-safe, meaning errors are only caught at runtime.

DataFrames, introduced in Spark 1.3, are the high-level API that solved these problems. A DataFrame is a distributed collection of data organized into named columns, much like a table in a relational database or a pandas DataFrame. DataFrames are built on top of RDDs but, crucially, they have a “schema.” This schema allows Spark to understand the data’s structure and apply powerful optimizations through its Catalyst optimizer and Tungsten execution engine. These optimizations include logical and physical plan generation, predicate pushdown, and code generation, resulting in performance that is orders of magnitude faster than RDDs for structured data. In PySpark, DataFrames are the standard and recommended API for most use cases.

Datasets, introduced in Spark 1.6, are an attempt to merge the best of both worlds: the performance optimizations of DataFrames and the compile-time type-safety of RDDs. In Scala (Spark’s primary language), a DataFrame is simply an alias for Dataset[Row]. Datasets provide a type-safe API, meaning the compiler can check your data types, which catches many errors before you even run your code. However, Python is a dynamically typed language, so the full benefits of the Dataset API are not available. In PySpark, the DataFrame remains the primary abstraction. While you can technically use Dataset-like features, the API is not as clean as in Scala, and for all practical purposes, DataFrame is the abstraction you will and should be using in Python.
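
To make the contrast concrete, here is a minimal sketch (assuming an existing SparkSession named spark) of the same filter expressed first with the schema-unaware RDD API and then with the DataFrame API, where Catalyst can optimize the query:

Python

# RDD API: plain Python objects and lambdas, no schema
rdd = spark.sparkContext.parallelize([("Alice", 34), ("Bob", 19)])
adults_rdd = rdd.filter(lambda row: row[1] >= 21)
print(adults_rdd.collect())

# DataFrame API: named columns with a schema, so Spark can optimize the plan
df = spark.createDataFrame([("Alice", 34), ("Bob", 19)], ["name", "age"])
adults_df = df.filter(df.age >= 21)
adults_df.show()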

Explain the concept of lazy evaluation in PySpark. How does this affect performance?

Lazy evaluation is a fundamental design principle of Apache Spark and is one of its most powerful features. It means that when you apply transformations to a distributed dataset (like a DataFrame or RDD), Spark does not execute them immediately. Instead, Spark constructs a “lineage,” which is a logical plan of the operations you have requested. This plan is represented internally as a Directed Acyclic Graph (DAG). A transformation is an operation like .select(), .filter(), .groupBy(), or .join() that returns a new DataFrame. These operations are merely recorded in the DAG; no data is processed. The computation is deferred and only “triggered” when you call an “action.” An action is an operation that returns a value to the driver program or writes data to an external system. Examples include .show(), .count(), .collect(), and .write().

This lazy evaluation strategy has profound effects on performance and optimization. First, it allows the Catalyst optimizer to analyze the entire logical plan at once. It can look at your chain of transformations and perform sophisticated optimizations before any code is executed. For example, if you filter a DataFrame and then select a few columns, the optimizer is smart enough to “push” the filter down to the data source, meaning it will read only the data it needs, and then it will “prune” the columns, reading only the columns you selected. This dramatically reduces the amount of data that needs to be read from disk and processed. Second, it allows Spark to “pipeline” or “fuse” transformations together into a single “stage,” minimizing the number of times data needs to be passed over, serialized, and deserialized. This deferred execution is the key to Spark’s high performance.
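
A small illustration of the difference between transformations and actions, assuming a DataFrame df with hypothetical country and amount columns:

Python

# Transformations: nothing runs yet, Spark only records the logical plan
filtered = df.filter(df.country == "DE")
selected = filtered.select("country", "amount")

# Action: triggers execution of the whole plan (read, filter, select)
selected.show()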

What is the function of partitioning in PySpark? How can it improve performance?

Partitioning is the central concept that enables parallel processing in Spark. A partition is a smaller, logical chunk of your total dataset. A DataFrame or RDD is distributed across the cluster by being broken up into many partitions. Each executor in the Spark cluster is assigned one or more partitions to process, and it performs the computation for its assigned partitions in parallel with other executors. The number of partitions determines the maximum level of parallelism you can achieve. For example, if your DataFrame has 100 partitions, Spark can run up to 100 tasks in parallel (assuming you have at least 100 cores available in your cluster). Finding the right number of partitions is a critical performance tuning task. Too few partitions, and you will not be fully utilizing your cluster’s resources. Too many partitions, and the overhead of managing each small task can become a bottleneck.

Partitioning can be explicitly controlled using two main methods: .repartition() and .coalesce(). The .repartition() method is used to increase or decrease the number of partitions. It performs a “full shuffle,” which means it redistributes the data evenly across the cluster. This is an expensive operation as it involves sending all the data over the network, but it is necessary if you need to increase parallelism or to rebalance data that has become “skewed” (unevenly distributed). The .coalesce() method is a more optimized way to decrease the number of partitions. It avoids a full shuffle by “merging” existing partitions on the same executor, resulting in much less network I/O. Proper partitioning is crucial for performance, especially before expensive operations like joins or aggregations, as it ensures the workload is evenly distributed across all nodes.
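
A brief sketch of inspecting and controlling partitioning, assuming a DataFrame df with a hypothetical customer_id column:

Python

# Check the current number of partitions
print(df.rdd.getNumPartitions())

# Increase parallelism or rebalance skewed data (full shuffle)
df_repartitioned = df.repartition(200)

# Repartition by a key so rows with the same key are co-located
df_by_key = df.repartition(200, "customer_id")

# Reduce the number of partitions without a full shuffle (e.g., before writing)
df_small = df_repartitioned.coalesce(10)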

Explain the concept of broadcast variables in PySpark and provide a use case.

Broadcast variables are a fundamental feature of Spark’s distributed computing framework designed to optimize a very common and specific scenario: joining a large dataset with a small dataset. By default, when you perform a join between two DataFrames, Spark will perform a “shuffle,” redistributing both DataFrames across the cluster based on their join keys so that matching keys end up on the same executor. This network-heavy shuffle operation is the most expensive part of a join. However, if one of the DataFrames is small enough to fit into the memory of a single executor (e.g., a few megabytes or even a gigabyte), a shuffle is extremely inefficient.

This is where broadcast variables come in. A broadcast variable is a read-only shared variable that is cached on each node in the cluster, rather than being sharded across them. You “broadcast” the small DataFrame, which means the Spark driver sends a copy of the entire small dataset to every single executor. Then, Spark performs a “broadcast join.” The large DataFrame is not shuffled. Instead, each executor processes its local partitions of the large DataFrame and joins them against the local copy of the small DataFrame it already has in memory. This completely eliminates the network shuffle for the large DataFrame, leading to a massive performance improvement. A classic use case is joining a large “facts” table (like a list of user transactions) with a small “dimension” table (like a list of store locations or product categories).
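
A minimal sketch of a broadcast join for this facts/dimension scenario; the file paths and the store_id join key are assumptions for illustration:

Python

from pyspark.sql import functions as F

# Large fact table and small dimension table (placeholder paths)
transactions = spark.read.parquet("transactions.parquet")
stores = spark.read.parquet("stores.parquet")

# Broadcast the small DataFrame so the large one is never shuffled
joined = transactions.join(F.broadcast(stores), on="store_id", how="left")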

What are the differences between PySpark and pandas?

PySpark and pandas are two of the most popular data manipulation libraries in the Python ecosystem, but they are designed for fundamentally different use cases and architectures. The primary difference is scalability. Pandas is a single-node tool; it loads the entire dataset into the memory (RAM) of a single machine. This makes it incredibly fast and easy to use for small to medium-sized datasets, but it will fail if the dataset is larger than the available RAM. PySpark, as the Python API for Apache Spark, is a distributed processing tool. It processes data across a cluster of many machines, allowing it to handle datasets of virtually any size. Pandas operates on one machine; PySpark operates on many.

This architectural difference leads to other key distinctions. Pandas has a very rich, flexible, and intuitive API for in-depth exploratory data analysis (EDA) and data wrangling. PySpark’s API, while powerful, is more complex and built for distributed computing, which introduces concepts like lazy evaluation, partitioning, and immutable data structures. Pandas operations are “eagerly” executed (they run immediately), while PySpark’s are “lazily” evaluated (they run only when an action is called). In terms of performance, for small datasets that fit in memory, pandas is often faster than PySpark due to the overhead of setting up a Spark job. However, as soon as the dataset size grows, PySpark’s parallel processing capabilities make it exponentially faster. The common workflow is often to use PySpark for large-scale data cleaning and aggregation, and then convert a smaller, aggregated result into a pandas DataFrame using .toPandas() for local analysis or visualization.

How can you convert a Pandas DataFrame to a PySpark DataFrame and vice versa?

Converting between pandas DataFrames and PySpark DataFrames is a very common operation, especially for bridging the gap between large-scale data processing and local analysis or visualization. To convert a pandas DataFrame to a PySpark DataFrame, you use the spark.createDataFrame() method from your SparkSession. You simply pass the existing pandas DataFrame as the argument. Spark will automatically infer the schema from the pandas DataFrame’s data types and create a distributed PySpark DataFrame. This operation requires the driver to serialize the entire pandas DataFrame and send it to the executors, so it should only be done if the pandas DataFrame is small enough to fit in the driver’s memory.

To convert a PySpark DataFrame back to a pandas DataFrame, you use the .toPandas() method on the PySpark DataFrame. This operation performs the opposite action: it collects all the distributed data from all the partitions across the cluster and brings it back to the driver node as a single pandas DataFrame in memory. This is a very convenient operation, but it is also one of the most dangerous. If the PySpark DataFrame is very large (which it often is), calling .toPandas() will attempt to load all of that data into the driver’s memory. If the data is larger than the driver’s available RAM, it will cause an OutOfMemoryError and crash your application. This method should only be used after you have filtered, aggregated, or sampled your PySpark DataFrame down to a small, manageable size.

Here is a code example of the conversion:

Python

import pandas as pd
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()

# Create a sample Pandas DataFrame
pdf = pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})

# 1. Convert Pandas DataFrame to PySpark DataFrame
df_spark = spark.createDataFrame(pdf)
df_spark.show()

# 2. Convert PySpark DataFrame back to Pandas DataFrame
# (Ensure df_spark is small enough first!)
pdf_new = df_spark.toPandas()
print(pdf_new)

 

What is a Spark Driver and what are its responsibilities?

The Spark Driver, or driver program, is the central coordinator and “brain” of every Spark application. When you submit a PySpark script, you are launching the driver process. This process can run on a client machine outside the cluster or on a dedicated node within the cluster (in YARN’s cluster mode, for example). The driver has several critical responsibilities. First, it is responsible for running the main() function of your application and creating the SparkSession. Second, it communicates with the cluster manager (like YARN or Kubernetes) to request and allocate resources, specifically the “executors” which are the processes that will do the actual work.

Once the executors are running, the driver’s most important job begins: it analyzes your code, translates your transformations and actions into a logical and physical execution plan (the DAG), and then breaks that plan into smaller “tasks.” The driver then schedules these tasks and sends them to the executors for execution. As the executors run the tasks, they report their status and results back to the driver. The driver monitors the health of all executors and reschedules tasks if an executor fails. Finally, the driver is responsible for collecting the final results of an action (like collect() or count()) and either returning them to the user or writing them to storage. Because the driver is the single point of orchestration, its health is critical; if the driver process fails, the entire application fails.

What is Spark DAG?

The Directed Acyclic Graph (DAG) is the logical execution model at the heart of Apache Spark. It is a graph where each “node” represents an RDD (or the RDD underlying a DataFrame) and each “edge” represents a transformation applied to that RDD. It is “directed” because operations flow in one direction, from the source data to the final result. It is “acyclic” because there are no loops or cycles; you cannot transform a dataset and have it link back to itself. This graph is known as the “lineage” of the data. Spark’s lazy evaluation is what makes this possible. When you call transformations, Spark does not execute anything; it simply adds another node or edge to this graph.

When you call an action, the driver passes this logical DAG to the “DAGScheduler.” The DAGScheduler’s job is to optimize this logical plan and convert it into a physical execution plan. It does this by collapsing transformations that can be done together (pipelining) into “stages.” A stage is a collection of tasks that can be executed together without a “shuffle” of data. The DAGScheduler creates a new stage at each “wide” transformation (a shuffle boundary, like groupBy or join). It then generates the “tasks” (the smallest unit of work) for each stage and passes them to the “TaskScheduler” to be run on the executors. This DAG-based model is what allows Spark to optimize queries, pipeline work efficiently, and, crucially, recover from failures by recomputing lost partitions using the registered lineage.

What are the different types of cluster managers available in Spark?

A cluster manager is the component responsible for acquiring and managing the cluster resources (CPU, memory, nodes) that a Spark application needs to run. Spark itself is agnostic to the cluster manager and is designed to plug into several different types. The most common cluster manager found in production environments is Hadoop YARN (Yet Another Resource Negotiator). YARN is the resource management layer of the Hadoop ecosystem. When you submit a Spark application to a YARN cluster, YARN is responsible for allocating containers for your Spark executors and (optionally) your driver, and it manages the lifecycle of these resources. This is the standard for on-premises Hadoop clusters.

Another increasingly popular cluster manager is Kubernetes. Kubernetes is an open-source container orchestration platform. Spark has native support for Kubernetes, allowing you to run your Spark applications inside Docker containers managed by a Kubernetes cluster. This provides excellent resource isolation, portability, and the ability to run Spark alongside other microservices in a unified, container-based environment. Spark also comes with its own “Standalone” cluster manager. This is a simple, lightweight cluster manager included with Spark itself. While it is easy to set up for testing or small clusters, it is less feature-rich than YARN or Kubernetes and lacks robust security and resource management capabilities. Finally, Spark also supports Apache Mesos, a general-purpose cluster resource manager, although its popularity has waned in favor of Kubernetes.

Describe how you can implement a custom transformation in PySpark.

While PySpark’s DataFrame API provides hundreds of built-in functions, there are times when you need to apply a complex piece of logic that is not easily expressed with standard functions. There are a few ways to implement custom transformations. The most idiomatic and recommended approach for applying a reusable, complex transformation is to use the .transform() method. This method allows you to chain a user-defined function that takes a DataFrame as input and returns a new DataFrame. This is not about creating a UDF (User-Defined Function) for row-level operations, but rather about creating a function that encapsulates a set of DataFrame transformations (like adding columns, filtering, and joining) into a single, logical step. This makes your code much cleaner, more modular, and easier to test.

You define a simple Python function that accepts a DataFrame. Inside this function, you perform all your desired transformations and then return the final, transformed DataFrame. To apply it, you call .transform(your_function_name) on your source DataFrame. This is purely syntactic sugar; it does not change the execution plan, but it dramatically improves code readability by allowing you to chain custom logic in the same way you chain built-in transformations. This is preferable to the alternative of writing a long, hard-to-read sequence of DataFrame assignments.

Here is an example of implementing and using a custom transformation:

Python

# Define a custom transformation function
# This function will add a discounted price column
def get_discounted_price(df, discount_percentage):
    """
    Takes a DataFrame and a discount percentage,
    returns a new DataFrame with a 'discounted_price' column.
    """
    return df.withColumn(
        "discounted_price",
        df.price * (1 - discount_percentage / 100)
    )

# Assume df_from_csv has a 'price' column
# We can't pass the second argument directly to .transform(),
# so we use a lambda or functools.partial. A lambda is common.

# Invoke the transformation using .transform() and a lambda
df_discounted = df_from_csv.transform(lambda df: get_discounted_price(df, 10))

df_discounted.show()

 

Explain the concept of window functions in PySpark and provide an example.

Window functions in PySpark are a powerful feature that allows you to perform calculations across a “window,” or a set of rows, that are related to the current row. This is similar to aggregation functions (groupBy), but with a critical difference: a groupBy operation collapses the rows, returning a single row for each group, whereas a window function returns a value for every row. This makes them perfect for tasks like calculating running totals, moving averages, or ranking items within a group. A window function requires two components: the function itself (like rank(), row_number(), sum(), or avg()) and a “Window Specification.”

The Window Specification, defined using the Window class, defines the “window” of rows for the function. It has three main parts. partitionBy() is used to define the groups; it is similar to groupBy and tells Spark to perform the calculation within each specified group (e.g., “per department”). orderBy() is used to order the rows within each partition. This is essential for ranking functions (like rank()) and for functions that depend on order (like lag() or lead(), which get values from previous or next rows). Finally, rowsBetween() or rangeBetween() can be used to define a “frame” for sliding windows, such as “the current row and the 2 preceding rows” for a 3-point moving average.

Here is an example that ranks employees by salary within each department:

Python

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank

# Assume we have a DataFrame df_employees with "department" and "salary"

# 1. Define the window specification
# We partition by department and order by salary in descending order
window_spec = Window.partitionBy("department").orderBy(df_employees.salary.desc())

# 2. Apply the window function (e.g., rank())
df_ranked = df_employees.withColumn("rank_in_department", rank().over(window_spec))

# We can add other functions over the same window
df_ranked = df_ranked.withColumn("row_number_in_department", row_number().over(window_spec))

df_ranked.show()

 

How do you handle errors and exceptions in PySpark?

Handling errors and exceptions in a distributed system like PySpark is more complex than in a standard single-node Python script. Errors can occur in two main places: on the driver or on the executors. Errors on the driver, such as a syntax error or an issue with setting up the SparkSession, are straightforward. The script will fail, and you will see the exception in your main log, just as you would with any Python program. You can handle these using standard try-except blocks. For example, you might wrap a data-reading operation in a try-except block to catch a FileNotFoundException.

Errors on the executors are more complex. These occur when a task fails on a worker node, for example, due to a NullPointerException, a division-by-zero error in a User-Defined Function (UDF), or a node running out of memory. When a task fails, Spark will automatically retry it a few times (this is configurable). If the task fails repeatedly, the entire stage will fail, and the job will crash, bubbling the executor-side exception up to the driver. To handle potential “bad data” that might cause errors in transformations, you can use try-except blocks within your UDFs and return a null or a default value on failure. For RDDs, you can use the .foreach() operation, which iterates over elements, and place try-except logic inside the loop to handle exceptions on a per-element basis, though this is less common with the modern DataFrame API.
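
As a sketch of the UDF pattern described above, the function below guards against bad input and returns null instead of failing the task; the DataFrame and column names are hypothetical:

Python

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

@F.udf(returnType=DoubleType())
def safe_ratio(numerator, denominator):
    # Catch bad data inside the UDF instead of letting the task fail
    try:
        return float(numerator) / float(denominator)
    except (TypeError, ValueError, ZeroDivisionError):
        return None  # becomes null in the resulting column

df_with_ratio = df.withColumn("ratio", safe_ratio(df.sales, df.visits))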

What is the purpose of checkpoints in PySpark?

Checkpointing is an advanced fault-tolerance mechanism in PySpark used to save the state of an RDD or DataFrame to persistent storage, typically HDFS or another distributed filesystem. This is different from caching. Caching stores data in memory (or on local disk) for performance, but it does not sever the RDD’s lineage, the logical plan of transformations. If a cached partition is lost (e.g., an executor fails), Spark can still recompute it using the lineage. Checkpointing, on the other hand, saves the actual data to a reliable file system and truncates the lineage. Once an RDD is checkpointed, Spark “forgets” how it was created and will read from the checkpoint file as if it were a new data source.

The primary purpose of checkpointing is to provide robustness for very long and complex computations. If your application has a DAG that is thousands of stages long, a failure deep in the computation could trigger a massive cascade of re-computation all the way back to the original source. This can be extremely time-consuming. By checkpointing an intermediate DataFrame, you create a “save point.” If any subsequent part of the job fails, the driver can restart the computation from the last checkpoint, rather than from the very beginning. This is particularly crucial in stateful streaming applications, where checkpoints are used to save the running state, but it can also be used in batch jobs to recover from failures in long-running, non-deterministic, or exceptionally complex transformations.
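
A minimal sketch of batch checkpointing, assuming an existing SparkSession named spark and an intermediate DataFrame df_intermediate; the checkpoint directory is a placeholder and should point to reliable storage such as HDFS:

Python

# Set a reliable location for checkpoint files (placeholder path)
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

# Checkpoint the intermediate result: its data is written out and its lineage truncated
df_checkpointed = df_intermediate.checkpoint()  # eager by default

# Later stages read from the checkpoint instead of recomputing the full DAG
df_result = df_checkpointed.groupBy("key").count()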

How does PySpark handle schema inference, and how can you explicitly define a schema?

Schema handling is a critical aspect of working with DataFrames. PySpark can handle schemas in two ways: inference or explicit definition. Schema inference is a convenience feature, enabled by setting inferSchema=True when reading data (like a CSV or JSON). When this option is enabled, Spark will make an extra pass over the data to “guess” the data type of each column. For example, it will examine the values in a column and decide if it is an IntegerType, DoubleType, StringType, or TimestampType. While this is very convenient for quick exploration, it is strongly discouraged for production jobs. Schema inference can be slow, as it requires an extra read, and it can be incorrect. For example, it might infer a column as IntegerType when it should be a LongType, or it might misinterpret a string-formatted date.

The best practice is to always explicitly define your schema. You do this by creating a StructType object, which is a collection of StructField objects. Each StructField defines the name of a column, its data type (e.g., StringType(), IntegerType(), BooleanType()), and whether that column is “nullable” (can contain null values). You then pass this custom schema object to the spark.read command. Explicitly defining a schema provides three major benefits: First, it is much faster, as Spark can skip the inference pass and read the data directly. Second, it is robust and prevents data type errors, as Spark will enforce the schema on read, flagging any data that does not conform. Third, it serves as a form of documentation for your data pipeline.

Here is an example of defining and using an explicit schema:

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

 

# 1. Define the schema explicitly

explicit_schema = StructType([

    StructField(“id”, IntegerType(), True),

    StructField(“name”, StringType(), True),

    StructField(“age”, IntegerType(), True),

    StructField(“price”, DoubleType(), True)

])

 

# 2. Use the schema during the read operation

df = spark.read.csv(

    “data.csv”, 

    schema=explicit_schema, 

    header=True  # Assumes header is present, schema is applied by name

)

 

df.printSchema()

 

Explain the differences between narrow and wide transformations in PySpark.

Understanding the difference between narrow and wide transformations is the most important concept for diagnosing performance issues in PySpark. This distinction is all about data movement. A “narrow” transformation is one where each input partition contributes to at most one output partition. In other words, all the data needed to compute a single output partition lives on a single input partition. This means narrow transformations are extremely fast and efficient, as they can be “pipelined” (run together) on a single executor without any data being moved across the network. Examples of narrow transformations include .map(), .filter(), .select(), and .union(). Spark loves narrow transformations.

A “wide” transformation, also known as a shuffle, is one where each input partition can contribute to multiple output partitions. To compute a single output partition, data from all input partitions may be required. This means Spark must perform a “shuffle”: it has to read the data from all partitions, find the relevant pieces, and send them over the network to be re-grouped on new executors. Shuffles are the most expensive operations in Spark. They are slow, I/O intensive, and network-bound. Any operation that needs to “group” data by a key that is not already the basis of partitioning will cause a shuffle. The most common examples of wide transformations include .groupBy(), .join(), .sortBy(), and .repartition(). A key goal of Spark optimization is to minimize the number of shuffles in a job.
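
A short sketch that contrasts the two, assuming a DataFrame df with hypothetical customer_id and amount columns; the physical plan marks the shuffle with an Exchange operator:

Python

from pyspark.sql import functions as F

# Narrow transformations: computed within each partition, no data movement
narrow = df.filter(df.amount > 0).select("customer_id", "amount")

# Wide transformation: grouping by key requires a shuffle across the cluster
wide = narrow.groupBy("customer_id").agg(F.sum("amount").alias("total"))

# Inspect the physical plan and look for the Exchange (shuffle) step
wide.explain()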

What is a Catalyst optimizer in Spark and how does it work?

The Catalyst optimizer is the “secret sauce” behind the high performance of Spark SQL and the DataFrame API. It is a rules-based optimization engine that translates the code you write (whether it is a DataFrame operation or a SQL query) into a highly efficient physical execution plan. The process is complex but can be broken down into a few key phases. First, Catalyst takes your query and converts it into an “unresolved logical plan.” This plan is “unresolved” because it knows what you want to do (e.g., “select ‘name'”) but it has not yet verified if the column “name” actually exists. Next, it uses the “catalog” (which contains your schema information) to “resolve” the plan, checking that all column and table names are correct. This creates a “logical plan.”

This is where the magic happens. Catalyst applies a series of rules to this logical plan to optimize it. These “logical optimizations” include things like “predicate pushdown” (moving a .filter() operation as close to the data source as possible), “projection pruning” (only reading the columns you actually need), and simplifying expressions. After creating an optimized logical plan, Catalyst generates multiple potential “physical plans” for executing it. It then uses a cost-based model to choose the “best” physical plan. For example, it will decide whether to use a “broadcast join” or a “shuffle join” based on the size of your tables. Finally, Catalyst takes this chosen physical plan and generates highly optimized Java bytecode (using a feature called Tungsten) that can be run directly on the executors. This entire process is what allows you to write simple, high-level code while Spark handles the complex, low-level optimization.
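
You can watch Catalyst at work by asking Spark to print its plans; a quick sketch, assuming a Parquet file with hypothetical name and age columns:

Python

# Catalyst can push the filter down to the Parquet scan and prune unused columns
df = spark.read.parquet("people.parquet")  # placeholder path
query = df.filter(df.age > 30).select("name")

# extended=True prints the parsed, analyzed, and optimized logical plans
# along with the physical plan chosen by Catalyst
query.explain(extended=True)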

Describe how you can implement custom aggregations in PySpark.

While PySpark’s pyspark.sql.functions module provides a vast library of common aggregation functions like sum(), avg(), count(), min(), and max(), you may sometimes need to implement a custom aggregation. The most common approach is to use the .groupBy() method followed by the .agg() method. The .agg() method can take a variety of functions from the pyspark.sql.functions module, allowing you to perform multiple aggregations at once.

For truly custom logic that is not available as a built-in, you have a couple of options. The most performant and modern approach, for operations that can be vectorized, is to use the .applyInPandas() method on a GroupedData object. This function, which leverages Apache Arrow, splits your PySpark DataFrame into groups, converts each group into a small pandas DataFrame, applies your custom Python function (which uses pandas syntax) to that group, and then combines the results back into a PySpark DataFrame. This is extremely powerful for applying complex logic, like a custom normalization or a small machine learning model, to each group. A much older and slower method, not generally recommended, is to define a User-Defined Aggregate Function (UDAF). This is complex to implement correctly and has significant performance overhead, so .applyInPandas is preferred.

Here are examples of both standard and custom aggregations:

Python

from pyspark.sql import functions as F

 

# Standard aggregations using groupBy and agg

df_agg = df_from_csv.groupBy(“house_id”) \

    .agg(

        F.mean(“price”).alias(“avg_price”),

        F.count(“id”).alias(“num_houses”)

    )

df_agg.show()

 

# Custom aggregation using applyInPandas

# Let’s normalize the price within each group

def normalize_price(df):

    # ‘df’ here is a pandas DataFrame for a single group

    disc_price = df[“discounted_price”]

    df[“normalized_price”] = (disc_price – disc_price.mean()) / disc_price.std()

    return df  # Return the modified pandas DataFrame

 

# The schema must be defined for the output

schema = df_from_csv.schema.add(“normalized_price”, DoubleType())

 

df_normalized = df_from_csv.groupBy(“house_id”) \

    .applyInPandas(normalize_price, schema=schema)

 

df_normalized.show()

 

What challenges did you face when working with large datasets in PySpark? How did you overcome them?

This is a common behavioral and technical question designed to assess your real-world experience. A great answer will describe a specific problem, the symptoms, how you diagnosed it, and how you fixed it. One of the most common challenges is “data skew.” This happens when your data is not evenly distributed across partitions. For example, if you group by user_id, and one “power user” has 90% of the data, one partition will be massive while all others are tiny. This leads to a situation where 99 out of 100 tasks finish in seconds, but the entire job is stuck waiting for one massive task to complete. You can diagnose this by looking at the Spark UI and seeing the vast difference in task completion times.

To overcome data skew, one technique is “salting.” This involves modifying the join or group-by key to spread the data more evenly. For the “power user” example, you could append a random number (a “salt”) to the user_id, for example, ‘user_123’ might become ‘user_123_1’, ‘user_123_2’, etc. You do this on both sides of a join, which breaks the single large partition into multiple smaller, more manageable partitions. Other common challenges include memory management (like OutOfMemoryErrors), which are often solved by increasing executor memory or, more smartly, by optimizing the code to use less memory (e.g., caching, broadcasting). Slow shuffles are another challenge, often fixed by better partitioning, using broadcast joins, or reformulating the query to avoid the shuffle altogether.
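
A minimal sketch of salting a skewed join key; skewed_df and small_df are hypothetical DataFrames sharing a user_id key, and the number of salts is an assumption you would tune:

Python

from pyspark.sql import functions as F

NUM_SALTS = 10  # assumed degree of salting for the hot key

# Salt the skewed side: append a random suffix to the key
skewed_salted = skewed_df.withColumn(
    "salt", (F.rand() * NUM_SALTS).cast("int")
).withColumn(
    "salted_key",
    F.concat_ws("_", F.col("user_id").cast("string"), F.col("salt").cast("string"))
)

# Replicate the other side once per salt value so every salted key can match
salts = spark.range(NUM_SALTS).withColumnRenamed("id", "salt")
small_salted = small_df.crossJoin(salts).withColumn(
    "salted_key",
    F.concat_ws("_", F.col("user_id").cast("string"), F.col("salt").cast("string"))
)

# Join on the salted key, which spreads the hot key across NUM_SALTS partitions
joined = skewed_salted.join(small_salted, on="salted_key", how="inner")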

How do you integrate PySpark with other tools and technologies in the Big Data ecosystem?

PySpark’s strength lies not just in its processing power but also in its deep integration with the wider big data ecosystem. It is designed to be the “glue” in a modern data stack. This integration is achieved through a rich set of “connectors.” For storage, PySpark has native, high-performance connectors for Hadoop Distributed File System (HDFS), as well as cloud-based storage like Amazon S3 and Google Cloud Storage. These connectors allow you to read and write data directly from these storage systems as if they were a local filesystem. For data warehousing, PySpark integrates with Apache Hive. It can read from and write to Hive tables and even execute HiveQL queries directly from a SparkSession.

For real-time data, the Spark Structured Streaming module has built-in connectors for Apache Kafka, allowing you to process continuous streams of data. You can read from a Kafka topic, perform transformations, and write the results to another Kafka topic or a database. For databases, PySpark provides a generic JDBC connector (spark.read.jdbc and df.write.jdbc) that allows it to connect to any relational database like PostgreSQL, MySQL, or SQL Server. Additionally, there are many third-party, high-performance connectors for NoSQL databases like Apache HBase, Cassandra, and Elasticsearch, which are provided as separate libraries. This “connector-based” architecture makes PySpark a versatile engine that can sit at the center of a complex data pipeline, pulling data from diverse sources and pushing it to various destinations.
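
As an illustration of the Kafka integration, here is a minimal Structured Streaming sketch; the broker address and topic name are placeholders, and the Kafka connector package is assumed to be available on the cluster:

Python

# Read a Kafka topic as a streaming DataFrame
stream_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
    .option("subscribe", "events")                     # placeholder topic
    .load()
)

# Kafka delivers key/value as binary; cast them to strings before processing
events = stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Write the stream to the console for inspection
query = events.writeStream.format("console").start()
# query.awaitTermination()  # block here in a standalone script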

What are the best practices for testing and debugging PySpark applications?

Testing and debugging PySpark applications are crucial for production readiness. For testing, it is important to write unit tests. You should not need to spin up a full Spark cluster to test your transformation logic. A common practice is to write unit tests for your custom transformation functions using a testing library like pytest. You can create small, local pandas DataFrames as your “input” data, and a pandas DataFrame for your “expected” output. You can then test your transformation logic in isolation. For integration testing, where you need to test the PySpark code itself, libraries exist that can help create a local SparkSession within your test environment, allowing you to run assertions on PySpark DataFrames.

Debugging is often done by “failing fast” and “inspecting data.” Since PySpark is lazy, your code might run for 20 minutes before failing at the very end. To debug, it is a good practice to insert “actions” like .show(), .printSchema(), or .count() at various stages of your transformation pipeline. This forces Spark to execute the DAG up to that point and lets you inspect the intermediate state of your DataFrame to see where the data is going wrong. The most powerful debugging tool, however, is the Spark UI. This web interface (typically available at port 4040 on the driver) provides a detailed, visual representation of your jobs, stages, and tasks. You can see your execution DAG, check for tasks that are failing, analyze which stages are taking the most time, and inspect error logs and stack traces from the executors. Learning to read the Spark UI is the most essential skill for troubleshooting performance issues.
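
A minimal pytest sketch along these lines, assuming the get_discounted_price function from the earlier custom-transformation example lives in a hypothetical project module:

Python

import pytest
from pyspark.sql import SparkSession

from my_project.transformations import get_discounted_price  # hypothetical module

@pytest.fixture(scope="session")
def spark():
    # Local SparkSession for tests; no cluster required
    return (
        SparkSession.builder
        .master("local[2]")
        .appName("unit-tests")
        .getOrCreate()
    )

def test_discount_adds_column(spark):
    df = spark.createDataFrame([(1, 100.0)], ["id", "price"])
    result = get_discounted_price(df, 10)
    assert "discounted_price" in result.columns
    assert result.collect()[0]["discounted_price"] == pytest.approx(90.0)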

How would you handle data security and privacy issues in a PySpark environment?

Data security and privacy are critical concerns in any data processing environment, especially with large, sensitive datasets. In PySpark, security is a multi-layered problem. First, at the infrastructure level, you rely on the cluster manager (like YARN or Kubernetes) for authentication and authorization. This often involves integrating with systems like Kerberos for secure authentication between the driver, executors, and data sources. Data encryption “at rest” is another key component, where data stored in HDFS, S3, or GCS is encrypted on the disk. Data encryption “in transit” is also essential, ensuring that all data sent over the network (between executors, or between the driver and executors) is encrypted, typically using SSL/TLS.

Within PySpark itself, you can implement column-level security. If a DataFrame contains sensitive information like a social security number or a credit card number, you should not display it in plain text. PySpark provides built-in functions in its pyspark.sql.functions module, such as aes_encrypt() and aes_decrypt(), which allow you to encrypt and decrypt specific columns in your DataFrame using a shared key. This ensures that the sensitive data is unreadable, even in intermediate storage or logs. Another common practice is data “tokenization” or “masking,” where you replace a sensitive value with a non-sensitive one, either an irreversible hash or a format-preserving token. This allows analysts to work with the data for aggregations without ever being exposed to the raw, private information.
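
A sketch of column-level masking and encryption; the DataFrame and column names are hypothetical, the key is a placeholder (real keys belong in a secrets manager), and aes_encrypt() in pyspark.sql.functions requires a recent Spark release (3.5+):

Python

from pyspark.sql import functions as F

# Irreversible masking: replace the raw value with a SHA-256 hash
df_masked = df_customers.withColumn("ssn_hash", F.sha2(F.col("ssn"), 256)).drop("ssn")

# Column-level encryption with a shared key (placeholder 16-byte key)
df_encrypted = df_customers.withColumn(
    "ssn_encrypted", F.aes_encrypt(F.col("ssn"), F.lit("0123456789abcdef"))
).drop("ssn")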

Describe how to use PySpark to create and deploy a machine learning model.

PySpark comes with a scalable machine learning library called MLlib, which is designed to run ML algorithms on large datasets in a distributed manner. The pyspark.ml package, which is built on DataFrames, is the modern and recommended API. The process follows a “pipeline” metaphor. First, you perform data preprocessing and feature engineering. This involves using “Transformers” (like StringIndexer to convert string labels to numbers, VectorAssembler to combine feature columns into a single vector) and “Estimators” (like Imputer or StandardScaler, which must be fit on the data). The VectorAssembler is critical, as MLlib expects all features to be in a single column of type Vector.

Once your data is preprocessed, you choose an “Estimator,” which is the machine learning algorithm itself, such as LinearRegression, LogisticRegression, or RandomForestClassifier. You “fit” this estimator on your feature-engineered data, which trains the algorithm and produces a “Model,” which is a “Transformer.” This model can then be used to .transform() new, unseen data to generate predictions. The real power of pyspark.ml is the Pipeline object. You can chain all your steps (indexing, scaling, assembling, and the model itself) into a single Pipeline object. This Pipeline is an “Estimator” that you can fit once on your training data. The resulting PipelineModel can be saved to disk and reloaded later for batch or streaming inference, encapsulating your entire preprocessing and modeling logic in a single, deployable artifact.
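
A condensed sketch of such a pipeline, assuming hypothetical train_df and test_df DataFrames with a string label column and two numeric feature columns; the model path is a placeholder:

Python

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# Preprocessing stages: index the label, assemble and scale the features
indexer = StringIndexer(inputCol="label_str", outputCol="label")
assembler = VectorAssembler(inputCols=["feature_a", "feature_b"], outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Chain everything into a single Pipeline and train it once
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])
model = pipeline.fit(train_df)

# Save the fitted PipelineModel and reuse it for inference later
model.write().overwrite().save("/models/example_pipeline")
predictions = model.transform(test_df)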

How can you optimize shuffling operations in PySpark?

Shuffling is the most expensive operation in PySpark, so optimizing it is key to good performance. The goal is to either avoid shuffles altogether or to make them as efficient as possible. The best way to avoid a shuffle is to use a “broadcast join.” If you are joining a large DataFrame to a small one, you can call broadcast() on the small DataFrame. This will send a copy of the small DataFrame to every executor, allowing the join to be completed without shuffling the large DataFrame, as discussed earlier. You should always prefer this when possible.

If a shuffle is unavoidable (e.g., in a groupBy or a join of two large tables), the key is to ensure the data is partitioned correctly before the shuffle. If your data is skewed, one executor will do all the work. You can “repartition” your DataFrame by the join key before the join. This pre-shuffles the data, and if you repartition both DataFrames on the same key with the same number of partitions, the subsequent join may be much faster as it knows the data is already co-located. Another critical setting is spark.sql.shuffle.partitions. This configuration determines how many partitions are created after a shuffle. The default is 200, which might be too high for small datasets (creating too much task overhead) or too low for massive datasets (creating partitions that are too large and spill to disk). Tuning this number to an appropriate value (e.g., 1-2 times the number of cores in your cluster) is a common optimization.
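
A brief sketch of these techniques, assuming hypothetical large_df, other_large_df, and small_df DataFrames joined on a key column:

Python

from pyspark.sql import functions as F

# Prefer a broadcast join when one side is small, as discussed above
result = large_df.join(F.broadcast(small_df), on="key")

# Pre-partition both large DataFrames on the join key before a big join
left = large_df.repartition(400, "key")
right = other_large_df.repartition(400, "key")
joined = left.join(right, on="key")

# Tune the number of post-shuffle partitions (the default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "400")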

Describe how you would optimize a PySpark job that is running slowly. What are the main factors you would consider?

When a PySpark job is running slowly, I follow a systematic process of diagnosis and optimization. The first and most important step is to check the Spark UI. The UI is the primary tool for understanding why a job is slow. I would look at the “Stages” tab to identify the bottleneck. Is there one stage that is taking 90% of the time? If so, what operation does it correspond to? (e.g., a join or groupBy). I would then look at the task metrics for that stage. Are all tasks taking a long time, or is it just one or two “straggler” tasks? Straggler tasks are a classic sign of data skew.

Once I have identified the bottleneck, the optimization strategy depends on the cause. If the problem is data skew, I would implement a salting strategy. If the slow stage is a join, I would check the size of the tables. Is one of them small enough to broadcast? If so, I would implement a broadcast join. If not, I would ensure both DataFrames are partitioned on the join key before the join. If the job is slow due to disk spills, I would look at the “Storage” tab in the UI. This means my executors do not have enough memory. I could either increase the executor memory or, preferably, optimize the code by caching intermediate DataFrames that are reused, or by explicitly defining a schema to reduce memory footprint. Finally, I would check the number of partitions. I might need to repartition to increase parallelism or coalesce to reduce overhead.

How do you ensure fault tolerance in PySpark applications?

Fault tolerance, the ability of a system to continue operating despite failures, is a core design principle of Apache Spark. The primary mechanism for this is the Resilient Distributed Dataset (RDD) and its “lineage.” Every RDD (and the RDD underlying every DataFrame) keeps a “genealogical” record of how it was created from other RDDs, all the way back to the original data source. This lineage is a logical plan, not the data itself. If an executor fails and a partition of data is lost, Spark does not panic. The driver simply looks at the lineage graph and instructs another executor to recompute the lost partition by re-running the same transformation steps. This allows Spark to recover from failures automatically without any data loss.

For more complex scenarios, Spark provides additional fault-tolerance strategies. One is “checkpointing.” As discussed earlier, for jobs with extremely long lineages, recomputing from scratch can be too slow. Checkpointing allows you to truncate the lineage by saving an intermediate DataFrame to persistent storage. If a failure occurs after the checkpoint, Spark only needs to recompute from that save point. Another strategy is “speculative execution.” Sometimes a task is not “failed” but is just running very slowly (a “straggler,” perhaps due to a faulty hardware node). Spark can be configured to “speculatively” launch a copy of that same task on another, healthy node. Whichever task finishes first “wins,” and the other is killed. This helps mitigate the impact of slow nodes and improves overall job robustness.

What are the different ways to implement and manage PySpark applications?

Implementing and managing PySpark applications in a production environment involves more than just writing a script. You need a robust system for submission, resource management, and scheduling. The most common way to manage applications, especially in on-premises Hadoop environments, is to use YARN (Yet Another Resource Negotiator). You use the spark-submit command and specify YARN as the “master.” YARN then handles allocating containers for your driver and executors, managing resources, and logging. In this setup, you can run in “client mode” (where the driver runs on the machine you submitted from) or “cluster mode” (where YARN allocates a new container for the driver to run inside the cluster, which is more robust for long-running jobs).

In recent years, Kubernetes has become an extremely popular way to manage Spark applications. Spark has native support for Kubernetes, allowing you to spark-submit your application to a Kubernetes cluster. Kubernetes will schedule your driver and executor pods just like any other containerized workload. This is a very flexible approach, as it allows for fine-grained resource and dependency management (by packaging dependencies into your Docker image) and co-locating Spark with other microservices. Finally, there are fully managed platforms. These are commercial cloud-based services that abstract away all the complexity of cluster management. You simply provide your code (e.g., in a notebook), and the platform handles the creation, scaling, and termination of clusters on your behalf, often providing additional features for collaboration, scheduling, and monitoring.

How would you monitor and troubleshoot PySpark jobs running in a production environment?

Monitoring and troubleshooting a production PySpark job is a multi-layered process. The first and most important tool is the Spark UI. Even for a completed or failed job, you should use the Spark History Server, which saves the Spark UI for later analysis. In the UI, I look at the “Jobs” tab to see the overall status, the “Stages” tab to find bottlenecks, the “Executors” tab to check for memory usage and GC (garbage collection) time, and the “Environment” tab to confirm my configurations were applied. The “Storage” tab shows whether data is being cached as intended, while the task metrics in the “Stages” tab reveal whether data is “spilling” to disk, which is a major performance killer.
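As a hedged sketch (the log directory is hypothetical), the event-log settings below are what allow the Spark History Server to reconstruct the UI for applications that have already finished.

Python

from pyspark.sql import SparkSession

# Event logs let the Spark History Server rebuild the UI for completed
# or failed applications; the log directory here is hypothetical.
spark = (
    SparkSession.builder
    .appName("MonitoredJob")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///spark-logs")
    .getOrCreate()
)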

Beyond the Spark UI, the next step is detailed logging. You should configure your PySpark application with a proper logging level (e.g., INFO or WARN) to capture detailed information about its execution. When a job fails, the executor logs are the most important place to find the root cause, as this is where you will see the actual exception or error message. These logs are managed by the cluster manager (YARN or Kubernetes) and can be accessed through their respective UIs. Finally, for long-term production monitoring, you use a dedicated monitoring system. Spark exposes a rich set of metrics (like tasks running, shuffle size, memory usage) via various “sinks” (e.g., JMX, Prometheus). You can collect these metrics in a time-series database and build dashboards to monitor the health and performance of your Spark jobs over time, set up alerts for failures, and identify performance regressions.
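As a minimal sketch of the logging side (the logger name, path, and message are hypothetical), the driver can reduce Spark's own log verbosity and emit its own application-level messages, while executor logs remain with the cluster manager.

Python

import logging

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LoggingSketch").getOrCreate()

# Reduce Spark's own console verbosity.
spark.sparkContext.setLogLevel("WARN")

# Driver-side application logging; executor logs are collected by YARN/Kubernetes.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("my_pipeline")  # hypothetical logger name

row_count = spark.read.parquet("/data/events").count()  # hypothetical path
log.info("Processed %d rows", row_count)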

Explain the difference between dynamic and static allocation in Spark and when you might choose one over the other.

Resource allocation is a critical configuration choice that dictates how your application uses cluster resources. “Static allocation” is the default and simplest model. When you submit your application, you request a fixed number of executors, each with a fixed amount of memory and cores (e.g., spark-submit --num-executors 20). The cluster manager grants you these 20 executors, and your application holds them for its entire duration, whether it is using them or not. This is a good choice for dedicated, high-priority, production-level batch jobs where you know the resource requirements in advance and want to guarantee that resources are available, ensuring predictable performance. The downside is that it is inefficient; if your job has idle periods, those resources are “wasted” and cannot be used by other applications.

“Dynamic allocation” is a more flexible and efficient model. With dynamic allocation, you do not request a fixed number of executors. Instead, you give Spark a range (a minimum and maximum). Spark will start with a small number of executors and then dynamically “scale up” by requesting more from the cluster manager as the workload demand increases (e.g., a large shuffle begins). Crucially, it will also “scale down” by releasing executors that have been idle for a certain period. This is the preferred model for environments where many users or applications are sharing a single cluster. It improves overall resource utilization and reduces costs. You would choose dynamic allocation for interactive (notebook) sessions and for most jobs where the workload is variable, as long as you are willing to accept a small latency hit as new executors are provisioned.
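As a sketch under the assumption of a Spark 3.x cluster, the configuration below enables dynamic allocation with an illustrative executor range; depending on the cluster manager, either an external shuffle service or shuffle tracking is also required so that executors can be released safely.

Python

from pyspark.sql import SparkSession

# Illustrative bounds only; tune minExecutors/maxExecutors for your cluster.
spark = (
    SparkSession.builder
    .appName("DynamicAllocationSketch")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "20")
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")  # Spark 3.0+
    .getOrCreate()
)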

How do you decide between using DataFrames and RDDs in PySpark?

This is a critical architectural decision. In modern PySpark (Spark 2.0 and later), the default and strongly recommended choice for almost all use cases is the DataFrame API. The primary reason is performance. DataFrames are built on the Catalyst optimizer and Tungsten execution engine. Because they have a schema, Spark can understand your data’s structure and perform massive optimizations, such as predicate pushdown, projection pruning, and code generation. This makes DataFrame operations orders of magnitude faster than RDD operations for structured data. DataFrames also provide a much richer, higher-level API with a wide range of SQL-like functions, making complex transformations easier to write and read.
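To make the optimizer's work visible, here is a small sketch (hypothetical path and column names) where a filter and a narrow select on a Parquet source can appear in the physical plan as pushed filters and pruned columns; explain() is how you verify what Catalyst actually did.

Python

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("CatalystSketch").getOrCreate()

# Hypothetical Parquet source with 'country' and 'revenue' columns.
sales_df = spark.read.parquet("/data/sales")

result = (
    sales_df
    .filter(F.col("country") == "DE")   # candidate for predicate pushdown
    .select("country", "revenue")       # projection pruning: read only two columns
    .groupBy("country")
    .agg(F.sum("revenue").alias("total_revenue"))
)

# The physical plan reveals the optimizations (e.g., pushed filters on the scan).
result.explain(True)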

You should only “drop down” to the RDD API in a few specific situations. The most common reason is when you are working with completely unstructured data, such as lines of free-form text or complex binary data, where a rigid schema does not apply. The RDD API gives you fine-grained, low-level control to map and filter these individual objects using pure Python functions. Another reason might be if you need to perform a very specific, low-level partitioning or data placement task that is not exposed through the DataFrame API. However, these cases are rare. The general rule is: always start with DataFrames. Only use RDDs when you have a specific, well-justified reason that the DataFrame API cannot handle, and be aware that you are likely sacrificing performance.
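As an illustration of when dropping down is reasonable (the file path is hypothetical), the sketch below uses the RDD API to run plain Python functions over free-form text and then converts the result back to a DataFrame once it has structure.

Python

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RddTextSketch").getOrCreate()

# Free-form log lines with no fixed schema: the RDD API applies ordinary
# Python functions to each record.
lines = spark.sparkContext.textFile("/data/raw_logs")  # hypothetical path

word_counts = (
    lines
    .flatMap(lambda line: line.split())
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
)

# Once the data has structure, return to the DataFrame API to regain
# the Catalyst optimizer.
words_df = word_counts.toDF(["word", "count"])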

How would you implement incremental data processing in PySpark?

Incremental data processing, or processing only the new or changed data since the last run, is essential for efficient data pipelines that operate on continuously growing datasets. The most modern and robust way to implement this in PySpark is by using Spark Structured Streaming. Despite its name, Structured Streaming can be run in “trigger-once” mode, which makes it perfect for incremental batch jobs. You define your source (e.g., a directory of Parquet files) and your transformations. Spark will automatically manage “checkpoints” to keep track of which files it has already processed. When you run the job, it will only read the new files that have arrived since the last run, apply the transformations, and update the output. This is a very powerful and resilient pattern.
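Here is a minimal sketch of that pattern (all paths, the schema, and the filter are hypothetical): the checkpoint location records which files have already been processed, and trigger(once=True) makes the stream behave like an incremental batch job.

Python

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("IncrementalBatchSketch").getOrCreate()

# Hypothetical schema for Parquet files arriving in a landing directory.
schema = StructType([
    StructField("order_id", StringType()),
    StructField("amount", DoubleType()),
])

incoming = spark.readStream.schema(schema).parquet("/data/landing")
cleaned = incoming.filter("amount > 0")  # hypothetical transformation

# trigger(once=True) processes only the files not yet recorded in the
# checkpoint, then stops, which makes this an incremental batch run.
query = (
    cleaned.writeStream
    .format("parquet")
    .option("path", "/data/curated")
    .option("checkpointLocation", "/checkpoints/curated")
    .outputMode("append")
    .trigger(once=True)
    .start()
)
query.awaitTermination()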

To make this pattern even more robust, it is often combined with a transactional table format. Such formats typically provide features like “time travel” (reading the state of a table at a previous point in time) and “upserts” (atomically merging new data into an existing table). When you use Structured Streaming to write to one of these formats, you can efficiently handle incremental updates, inserts, and deletes. For example, you can process a change-data-capture (CDC) log from a database and use a merge command to apply those changes to your large target table. This combination of Structured Streaming and a transactional table format provides an end-to-end, fault-tolerant, and highly efficient solution for incremental data processing.
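The text above does not name a particular format, so purely as an assumption for illustration, the sketch below uses Delta Lake's MERGE INTO syntax through Spark SQL to apply a batch of CDC changes; the table name, column names, and the 'op' flag are all hypothetical, and Delta Lake must be installed and configured for this to run.

Python

from pyspark.sql import SparkSession

# Assumption: Delta Lake is available and 'customers' already exists as a
# Delta table; every name below is hypothetical.
spark = SparkSession.builder.appName("CdcMergeSketch").getOrCreate()

# Hypothetical micro-batch of CDC changes with an 'op' column (INSERT/UPDATE/DELETE).
changes_df = spark.read.parquet("/data/cdc_batch")
changes_df.createOrReplaceTempView("changes")

spark.sql("""
    MERGE INTO customers AS t
    USING changes AS s
    ON t.customer_id = s.customer_id
    WHEN MATCHED AND s.op = 'DELETE' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.name = s.name, t.email = s.email
    WHEN NOT MATCHED THEN INSERT (customer_id, name, email)
        VALUES (s.customer_id, s.name, s.email)
""")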