Compare commits

...

2 Commits

Author SHA1 Message Date
David Kunzmann 6916af074b Create rule S7194: PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame. 2025-01-30 15:40:05 +01:00
joke1196 f9500f5c0e Create rule S7194 2025-01-30 13:21:41 +00:00
3 changed files with 97 additions and 0 deletions

@@ -0,0 +1,2 @@
{
}

@@ -0,0 +1,25 @@
{
  "title": "PySpark broadcasting should be used when joining a small DataFrame to a larger DataFrame",
  "type": "CODE_SMELL",
  "status": "ready",
  "remediation": {
    "func": "Constant\/Issue",
    "constantCost": "5min"
  },
  "tags": [
    "data-science",
    "pyspark"
  ],
  "defaultSeverity": "Major",
  "ruleSpecification": "RSPEC-7194",
  "sqKey": "S7194",
  "scope": "All",
  "defaultQualityProfiles": ["Sonar way"],
  "quickfix": "unknown",
  "code": {
    "impacts": {
      "RELIABILITY": "LOW"
    },
    "attribute": "EFFICIENT"
  }
}

@@ -0,0 +1,70 @@
This rule raises an issue when a small DataFrame is joined to a larger DataFrame without the use of the `broadcast` function.

== Why is this an issue?
In PySpark, shuffling refers to the process of transferring data between worker nodes within a cluster.
While necessary for operations such as joins and aggregations on DataFrames, shuffling can be resource-intensive.
Although Spark handles shuffling automatically, there are strategies to minimize it and thereby improve the performance of these operations.
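
The shuffle caused by a join can be observed in the physical plan with `explain()`. The sketch below uses hypothetical DataFrame names and paths; whether an `Exchange` (shuffle) node appears depends on table statistics and on the `spark.sql.autoBroadcastJoinThreshold` setting.

[source,python]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myspark').getOrCreate()

# Hypothetical tables: a large fact table and a smaller lookup table.
orders_df = spark.read.parquet("/data/orders")
departments_df = spark.read.parquet("/data/departments")

joined_df = orders_df.join(departments_df, on="department_id", how="left")

# Shuffle-based joins typically appear as "Exchange hashpartitioning(...)"
# nodes feeding a "SortMergeJoin" in the printed plan.
joined_df.explain()
----
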
When performing join operations with multiple DataFrames in PySpark, it is crucial to consider the size of the DataFrames involved.
If a small DataFrame is being joined to a larger one, utilizing the `broadcast` function to distribute the small DataFrame across all worker nodes can be beneficial.
This approach significantly reduces the volume of data shuffled between nodes, thereby improving the efficiency of the join operation.
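
Note that Spark may also broadcast a join side on its own when its estimated size is below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), while the explicit `broadcast` hint requests the broadcast regardless of size estimates. The snippet below is a minimal sketch of adjusting this setting; the appropriate value depends on the available executor memory.

[source,python]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myspark').getOrCreate()

# Raise the automatic broadcast threshold to 50 MB (value in bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# A value of -1 disables automatic broadcasting;
# an explicit broadcast() hint can still be applied in that case.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
----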

== How to fix it

To fix this issue, use the `broadcast` function on the small DataFrame before performing the join operation.

=== Code examples

==== Noncompliant code example

[source,python,diff-id=1,diff-type=noncompliant]
----
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('myspark').getOrCreate()

data = [
    (1, "Alice"),
    (2, "Bob"),
    (2, "Charlie"),
    (1, "Dan"),
    (2, "Elsa")
]

large_df = spark.createDataFrame(data, ["department_id", "name"])
small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])

joined_df = large_df.join(small_df, on="department_id", how="left") # Noncompliant: the small DataFrame is not broadcast
----

==== Compliant solution

[source,python,diff-id=1,diff-type=compliant]
----
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName('myspark').getOrCreate()

data = [
    (1, "Alice"),
    (2, "Bob"),
    (2, "Charlie"),
    (1, "Dan"),
    (2, "Elsa")
]

large_df = spark.createDataFrame(data, ["department_id", "name"])
small_df = spark.createDataFrame([(1, 'HR'), (2, 'Finance')], ["department_id", "department"])

joined_df = large_df.join(broadcast(small_df), on="department_id", how="left") # Compliant
----
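
Whether the hint was applied can be checked by inspecting the physical plan; the exact output depends on the Spark version, but a broadcast join typically appears as a `BroadcastHashJoin` node instead of a `SortMergeJoin` preceded by an `Exchange` (shuffle).

[source,python]
----
# Print the physical plan of the compliant join above.
joined_df.explain()
----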

== Resources

=== Documentation

* PySpark Documentation - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.broadcast.html[pyspark.sql.functions.broadcast]

=== Articles & blog posts

* Medium Article - https://aspinfo.medium.com/what-is-broadcast-join-how-to-perform-broadcast-in-pyspark-699aef2eff5a[What is broadcast join, how to perform broadcast in pyspark?]