pyspark: count distinct over a window

EDIT: as noleto mentions in his answer below, approx_count_distinct has been available since PySpark 2.1 and works over a window. Original answer – exact distinct count (not an approximation): we can use a combination of size and collect_set to mimic the functionality of countDistinct over a window: from pyspark.sql import functions as F, Window … Read more
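A minimal sketch of the size + collect_set approach, assuming a small DataFrame with illustrative column names "group" and "value":

```python
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", 1), ("a", 1), ("a", 2), ("b", 3)], ["group", "value"]
)

w = Window.partitionBy("group")

# collect_set gathers the distinct values in each window; size counts them.
result = df.withColumn("distinct_values", F.size(F.collect_set("value").over(w)))
result.show()
```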

Apache Spark — Assign the result of UDF to multiple dataframe columns

It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. It requires a UDF with a specified returnType: from pyspark.sql.functions import udf from pyspark.sql.types import StructType, StructField, FloatType schema = StructType([ StructField("foo", FloatType(), False), StructField("bar", FloatType(), False) ]) def udf_test(n): return (n / 2, … Read more
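A sketch of the struct-returning UDF pattern; the field names "foo" and "bar" and the input column "x" are placeholders:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, FloatType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0,), (2.0,)], ["x"])

# The UDF returns a single struct column with two float fields.
schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False),
])

@udf(returnType=schema)
def udf_test(n):
    return (n / 2, n * 2)

# The struct can then be expanded into separate top-level columns.
result = df.withColumn("out", udf_test(col("x"))).select("x", "out.foo", "out.bar")
result.show()
```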

More than one hour to execute pyspark.sql.DataFrame.take(4)

While Spark supports a limited predicate pushdown over JDBC, all other operations, like limit, group, and aggregations, are performed internally. Unfortunately this means that take(4) will fetch the data first and then apply the limit. In other words, your database will execute (assuming no projections and filters) something equivalent to: SELECT * FROM table and the rest … Read more
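A hedged sketch of one common workaround: push the limit into the query that the JDBC source runs, so the database applies it before any rows are shipped to Spark. The URL, table name, and credentials below are placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark treats the table argument as a subquery, so the LIMIT runs
# inside the database rather than after a full table scan.
limited = spark.read.jdbc(
    url="jdbc:postgresql://host:5432/db",
    table="(SELECT * FROM table LIMIT 4) AS tmp",
    properties={"user": "user", "password": "password"},
)
limited.show()
```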

Keep only duplicates from a DataFrame regarding some field

One way to do this is by using a pyspark.sql.Window to add a column that counts the number of duplicates for each row's ("ID", "ID2", "Number") combination. Then select only the rows where the number of duplicates is greater than 1. import pyspark.sql.functions as f from pyspark.sql import Window w = Window.partitionBy('ID', 'ID2', 'Number') df.select('*', … Read more
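A minimal sketch of the window-count approach, assuming a DataFrame with columns "ID", "ID2", and "Number":

```python
import pyspark.sql.functions as f
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 10, 5), (1, 10, 5), (2, 20, 7)], ["ID", "ID2", "Number"]
)

w = Window.partitionBy("ID", "ID2", "Number")

# Count how many rows share each (ID, ID2, Number) combination,
# then keep only the combinations that occur more than once.
duplicates = (
    df.select("*", f.count("ID").over(w).alias("dupe_count"))
      .where("dupe_count > 1")
      .drop("dupe_count")
)
duplicates.show()
```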

How to add a SparkListener from pySpark in Python?

It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all required classes. Directory structure: . ├── build.sbt └── src └── main └── scala └── net └── zero323 └── spark └── examples └── listener ├── Listener.scala ├── … Read more
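For orientation, the Python side of such a Py4j callback setup might look roughly like this. The Listener interface, its notify() method, and the Manager registration call are assumptions inferred from the directory layout above, not a verified recipe:

```python
from pyspark import SparkContext

class PythonListener(object):
    """Python object implementing the (assumed) Scala Listener interface."""

    def notify(self, obj):
        # Called back from the JVM whenever the Scala side fires an event.
        print("Received event: {}".format(obj))

    class Java:
        # Py4j reads this to know which Java interface the object implements;
        # the interface name here is hypothetical.
        implements = ["net.zero323.spark.examples.listener.Listener"]

sc = SparkContext.getOrCreate()

# The Py4j callback server must be running for JVM -> Python calls to work.
sc._gateway.start_callback_server()

# Hypothetical registration through a Scala-side Manager object.
manager = sc._jvm.net.zero323.spark.examples.listener.Manager
manager.register(PythonListener())
```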

Why does Spark think this is a cross / Cartesian join

This happens because you join structures sharing the same lineage, which leads to a trivially equal condition: res2.explain() == Physical Plan == org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L)) :- Filter isnotnull(idx#204L) : +- LogicalRDD [idx#204L, val#205] +- Filter ((isnotnull(key2#210L) && (key2#210L … Read more
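A hedged sketch of two common ways around this, not taken verbatim from the answer; the column names are placeholders:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).withColumn("val", F.rand())

# Option 1: if the plan really is equivalent to a Cartesian product and that
# is acceptable, allow it explicitly (Spark 2.x configuration).
spark.conf.set("spark.sql.crossJoin.enabled", "true")

# Option 2: break the shared lineage before joining, e.g. by rebuilding one
# side from its RDD so the two plans no longer share the same attributes.
df2 = spark.createDataFrame(df.rdd, df.schema)
joined = df.join(df2, df["id"] == df2["id"])
joined.explain()
```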