What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

If you’re running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001. Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000: private[spark] object MapStatus { def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > 2000) { HighlyCompressedMapStatus(loc, uncompressedSizes) } else { new … Read more
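For reference, a minimal Scala sketch of how this setting is usually applied in practice; the app name and the example aggregation are illustrative, not from the original answer:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-partitions-demo").getOrCreate()
import spark.implicits._

// Bump the shuffle partition count past 2000 so map statuses use the
// HighlyCompressedMapStatus representation described above.
spark.conf.set("spark.sql.shuffle.partitions", "2001")

// Any subsequent wide transformation (join, groupBy, ...) now produces 2001 shuffle partitions.
val counts = spark.range(0, 1000000).groupBy(($"id" % 100).as("bucket")).count()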

pyspark: count distinct over a window

EDIT: as noleto mentions in his answer below, approx_count_distinct has been available since PySpark 2.1 and works over a window. Original answer – exact distinct count (not an approximation): we can use a combination of size and collect_set to mimic the functionality of countDistinct over a window: from pyspark.sql import functions as F, Window … Read more
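A Scala sketch of the same two approaches on made-up data; the column names grp and value are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{approx_count_distinct, collect_set, size, col}

val spark = SparkSession.builder().appName("distinct-over-window").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("a", 2), ("b", 3)).toDF("grp", "value")
val w = Window.partitionBy("grp")

// Approximate distinct count over a window (Spark >= 2.1).
val approx = df.withColumn("distinct_approx", approx_count_distinct(col("value")).over(w))

// Exact distinct count: collect the distinct values into a set, then take its size.
val exact = df.withColumn("distinct_exact", size(collect_set(col("value")).over(w)))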

Spark specify multiple column conditions for dataframe join

There is a Spark column/expression API join for such a case: Leaddetails.join( Utm_Master, Leaddetails("LeadSource") <=> Utm_Master("LeadSource") && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source") && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium") && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"), "left" ) The <=> operator in the example means "equality test that is safe for null values". The main difference from the simple equality test (===) is that the … Read more
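A self-contained sketch of the null-safe join, using small made-up stand-ins for Leaddetails and Utm_Master with only two of the columns:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("null-safe-join").getOrCreate()
import spark.implicits._

// Illustrative stand-ins for Leaddetails and Utm_Master.
val leads = Seq(("google", null.asInstanceOf[String]), ("bing", "cpc")).toDF("LeadSource", "Utm_Medium")
val utm   = Seq(("google", null.asInstanceOf[String]), ("bing", "organic")).toDF("LeadSource", "Utm_Medium")

// <=> (null-safe equality) treats null <=> null as true, so the first pair of
// rows still matches; with === that comparison would evaluate to null and drop it.
val joined = leads.join(
  utm,
  leads("LeadSource") <=> utm("LeadSource") &&
  leads("Utm_Medium") <=> utm("Utm_Medium"),
  "left"
)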

Apache Spark — Assign the result of UDF to multiple dataframe columns

It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. It requires a UDF with a specified returnType: from pyspark.sql.functions import udf from pyspark.sql.types import StructType, StructField, FloatType schema = StructType([ StructField("foo", FloatType(), False), StructField("bar", FloatType(), False) ]) def udf_test(n): return (n / 2, … Read more
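The same idea sketched in Scala, where returning a tuple (or case class) from a UDF yields a struct column that can then be flattened; the column names foo and bar are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{udf, col}

val spark = SparkSession.builder().appName("udf-struct").getOrCreate()
import spark.implicits._

val df = Seq(1.0, 2.0, 3.0).toDF("n")

// Returning a tuple from a Scala UDF produces a struct column with fields _1, _2, ...
val udfTest = udf((n: Double) => (n / 2, n % 2))

// Keep the result in one struct column, then flatten it into top-level columns.
val result = df
  .withColumn("packed", udfTest(col("n")))
  .select(col("n"), col("packed._1").as("foo"), col("packed._2").as("bar"))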

How to sort by column in descending order in Spark SQL?

You can also sort the column by importing the Spark SQL functions: import org.apache.spark.sql.functions._ df.orderBy(asc("col1")) Or import org.apache.spark.sql.functions._ df.sort(desc("col1")) Or, importing sqlContext.implicits._: import sqlContext.implicits._ df.orderBy($"col1".desc) Or import sqlContext.implicits._ df.sort($"col1".desc)
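A runnable sketch combining the two styles on made-up data, including mixed sort directions per column:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

val spark = SparkSession.builder().appName("sort-desc").getOrCreate()
import spark.implicits._

val df = Seq(("a", 3), ("b", 1), ("c", 2)).toDF("col1", "col2")

// Descending on one column via the functions API...
df.orderBy(desc("col2")).show()

// ...or via the implicits / column syntax, mixing directions per column.
df.sort($"col2".desc, $"col1".asc).show()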

Spark DataFrames when udf functions do not accept large enough input variables

User-defined functions are defined for up to 22 parameters, but the udf helper is only defined for at most 10 arguments. To handle functions with a larger number of parameters you can use org.apache.spark.sql.UDFRegistration. For example val dummy = (( x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int, x6: Int, x7: Int, x8: … Read more
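A hedged sketch of the registration route using the Spark 2.x+ spark.udf syntax; the 12-argument function and the column names c0..c11 are made up for illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("wide-udf").getOrCreate()

// A 12-argument function: too many for the udf() helper, but fine for
// UDFRegistration, whose register overloads go up to 22 parameters.
val sum12 = (x0: Int, x1: Int, x2: Int, x3: Int, x4: Int, x5: Int,
             x6: Int, x7: Int, x8: Int, x9: Int, x10: Int, x11: Int) =>
  x0 + x1 + x2 + x3 + x4 + x5 + x6 + x7 + x8 + x9 + x10 + x11

spark.udf.register("sum12", sum12)

// Build 12 integer columns and call the registered UDF through SQL expressions.
val df = spark.range(3).selectExpr((0 until 12).map(i => s"CAST(id + $i AS INT) AS c$i"): _*)
df.selectExpr("sum12(c0, c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11)").show()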

How to calculate the size of dataframe in bytes in Spark?

Using spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes we can get the size of the actual DataFrame once it's loaded into memory. Check the code below. scala> val df = spark.read.format("orc").load("/tmp/srinivas/") df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string … 75 more fields] scala> import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes bytes: BigInt = 763275709 scala> FileUtils.byteCountToDisplaySize(bytes.toLong) res5: String = 727 MB … Read more
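On newer Spark versions (roughly 2.3+ and 3.x) the stats method no longer takes a SQLConf argument; a hedged sketch of the same measurement, with the input path purely illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.commons.io.FileUtils

val spark = SparkSession.builder().appName("df-size").getOrCreate()

val df = spark.read.format("orc").load("/tmp/srinivas/")

// Estimated in-memory size taken from the optimized logical plan's statistics.
val bytes = df.queryExecution.optimizedPlan.stats.sizeInBytes

// Human-readable form, e.g. "727 MB".
val readable = FileUtils.byteCountToDisplaySize(bytes.toLong)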

More than one hour to execute pyspark.sql.DataFrame.take(4)

While Spark supports limited predicate pushdown over JDBC, all other operations, like limit, group, and aggregations, are performed internally. Unfortunately, this means that take(4) will fetch the data first and then apply the limit. In other words, your database will execute (assuming no projections and filters) something equivalent to: SELECT * FROM table and the rest … Read more
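One common workaround is to push the limit down yourself by wrapping it in the dbtable subquery, so the database only returns a handful of rows. A sketch; the JDBC URL, table name and credentials are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-limit").getOrCreate()

val fourRows = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")
  // The LIMIT runs inside the database, not in Spark.
  .option("dbtable", "(SELECT * FROM some_table LIMIT 4) AS t")
  .option("user", "user")
  .option("password", "password")
  .load()

fourRows.show()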

Keep only duplicates from a DataFrame regarding some field

One way to do this is by using a pyspark.sql.Window to add a column that counts the number of duplicates for each row's ("ID", "ID2", "Number") combination. Then select only the rows where the number of duplicates is greater than 1. import pyspark.sql.functions as f from pyspark.sql import Window w = Window.partitionBy('ID', 'ID2', 'Number') df.select('*', … Read more
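The same approach sketched in Scala on made-up data:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, lit, col}

val spark = SparkSession.builder().appName("keep-duplicates").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "A", 10), (1, "A", 10), (2, "B", 20)
).toDF("ID", "ID2", "Number")

// Count how many rows share each (ID, ID2, Number) combination...
val w = Window.partitionBy("ID", "ID2", "Number")

// ...and keep only the rows whose combination appears more than once.
val duplicatesOnly = df
  .withColumn("dupe_count", count(lit(1)).over(w))
  .where(col("dupe_count") > 1)
  .drop("dupe_count")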