You cannot access any of Spark’s “driver-side” abstractions (RDDs, DataFrames, Datasets, SparkSession…) from within a function passed on to one of Spark’s DataFrame/RDD transformations. You also cannot update driver-side mutable objects from within these functions.
In your case – you’re trying to use prodRows
and selection
(both are DataFrames) within a function passed to DataFrame.foreach
. You’re also trying to update listOfProducts
(a local driver-side variable) from within that same function.
Why?
- DataFrames, RDDs, and SparkSession only exist on your Driver application. They serve as a “handle” to access data distributed over the cluster of worker machines.
- Functions passed to RDD/DataFrame transformations get serialized and sent to that cluster, to be executed on the data partitions on each of the worker machines. When the serialized DataFrames/RDDs get deserialized on those machines – they are useless, they can’t still represent the data on the cluster as they are just hollow copies of the ones created on the driver application, which actually maintains a connection to the cluster machines
- For the same reason, attempting to update driver-side variables will fail: the variables (starting out as empty, in most cases) will be serialized, deserialized on each of the workers, get updated locally on the workers, and stay there… the original driver-side variable will remain unchanged
How can you solve this?
When working with Spark, especially with DataFrames, you should try to avoid “iteration” over the data, and use DataFrame’s declarative operations instead. In most cases, when you want to reference data of another DataFrame for each record in your DataFrame, you’d want to use join
to create a new DataFrame with records combining data from the two DataFrames.
In this specific case, here’s a roughly equivalent solution that does what you’re trying to do, if I managed to conclude it correctly. Try to use this and read the DataFrame documentation to figure out the details:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val numRecProducts = 10
val result = prodRows.as("left")
// self-join by gender:
.join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
// limit to 10 results per record:
.withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
.filter($"rn" <= numRecProducts).drop($"rn")
// group and collect_list to create products column:
.groupBy($"left.product_PK" as "product_PK")
.agg(collect_list(struct($"right.product_PK", lit(1))) as "products")