How to control partition size in Spark SQL

Spark < 2.0:

You can use the Hadoop configuration options mapred.min.split.size and mapred.max.split.size, as well as the HDFS block size, to control partition size for filesystem-based formats*.

    val minSplit: Int = ???
    val maxSplit: Int = ???

    sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
    sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)

Spark 2.0+:

You can use the spark.sql.files.maxPartitionBytes configuration:

    spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)

In both cases these values may not … Read more
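As a quick illustration of the 2.0+ option (the path and the 32 MB figure are placeholders, not from the original answer), lowering spark.sql.files.maxPartitionBytes makes a scan of the same files produce more, smaller partitions:

    // hedged usage sketch; "/data/events" is a placeholder path
    spark.conf.set("spark.sql.files.maxPartitionBytes", 32 * 1024 * 1024) // 32 MB
    val df = spark.read.parquet("/data/events")
    println(df.rdd.getNumPartitions) // typically larger than with the 128 MB default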

How to find all partitions of a set

I’ve found a straightforward recursive solution. First, let’s solve a simpler problem: how to find all partitions consisting of exactly two parts. For an n-element set, we can count an int from 0 to (2^n)-1. This creates every n-bit pattern, with each bit corresponding to one input element. If the bit is 0, we place … Read more
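A minimal Scala sketch of that two-part step (the function name and the de-duplication choice are mine; the original answer continues past this excerpt): restricting the mask below 2^(n-1) pins the last element's bit to 0, so each two-part split appears exactly once, and mask 0 is skipped because its "ones" part would be empty.

    def twoPartSplits[A](elems: Vector[A]): Seq[(Vector[A], Vector[A])] =
      for (mask <- 1 until (1 << (elems.length - 1))) yield {
        // element i goes to the "ones" part when bit i of mask is set
        val (ones, zeros) = elems.zipWithIndex.partition { case (_, i) => ((mask >> i) & 1) == 1 }
        (zeros.map(_._1), ones.map(_._1))
      }

    twoPartSplits(Vector(1, 2, 3)).foreach(println)
    // (Vector(2, 3),Vector(1))
    // (Vector(1, 3),Vector(2))
    // (Vector(3),Vector(1, 2))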

Partitioning in spark while reading from RDBMS via JDBC

If you don’t specify either {partitionColumn, lowerBound, upperBound, numPartitions} or {predicates}, Spark will use a single executor and create a single non-empty partition. All data will be processed using a single transaction, and reads will be neither distributed nor parallelized (both options are sketched below).

See also:
How to optimize partitioning when migrating data from JDBC source?
How to improve … Read more
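A hedged sketch of the two ways to get parallel JDBC reads (the URL, table, and column names are placeholders, not from the original answer):

    // Option 1: numeric range partitioning. Spark issues numPartitions queries,
    // each covering a slice of partitionColumn between lowerBound and upperBound.
    val byRange = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://db.example.com/mydb") // placeholder URL
      .option("dbtable", "events")                            // placeholder table
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "16")
      .load()

    // Option 2: explicit predicates, one partition per WHERE clause.
    import java.util.Properties
    val predicates = Array("region = 'EU'", "region = 'US'", "region = 'APAC'") // placeholders
    val byPredicates = spark.read.jdbc(
      "jdbc:postgresql://db.example.com/mydb", "events", predicates, new Properties())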

How to define partitioning of DataFrame?

Spark >= 2.3.0:

SPARK-22614 exposes range partitioning.

    val partitionedByRange = df.repartitionByRange(42, $"k")

    partitionedByRange.explain(true)
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k ASC NULLS FIRST], 42
    // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
    //
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42 … Read more
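One way (not from the original answer) to see where range partitioning placed the rows is to tag each row with its partition id:

    import org.apache.spark.sql.functions.spark_partition_id

    partitionedByRange
      .withColumn("pid", spark_partition_id())
      .groupBy("pid").count()
      .orderBy("pid")
      .show()
    // rows with nearby values of k end up in the same or adjacent partitions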

How does HashPartitioner work?

Well, let’s make your dataset marginally more interesting:

    val rdd = sc.parallelize(for {
      x <- 1 to 3
      y <- 1 to 2
    } yield (x, None), 8)

We have six elements:

    rdd.count
    // Long = 6

no partitioner:

    rdd.partitioner
    // Option[org.apache.spark.Partitioner] = None

and eight partitions:

    rdd.partitions.length
    // Int = 8

Now let’s define a small helper to … Read more
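A minimal sketch (mirroring Spark’s documented behavior, not its source verbatim) of the rule HashPartitioner applies: a key goes to nonNegativeMod(key.hashCode, numPartitions), and null keys go to partition 0.

    // hypothetical helper; HashPartitioner.getPartition does the equivalent
    def hashPartition(key: Any, numPartitions: Int): Int = key match {
      case null => 0
      case k =>
        val raw = k.hashCode % numPartitions
        if (raw < 0) raw + numPartitions else raw // Scala's % can be negative
    }

    // For the pairs above, the keys are the Ints 1, 2 and 3, and Int.hashCode
    // is the value itself, so new HashPartitioner(8) sends them to
    // partitions 1, 2 and 3.
    (1 to 3).map(hashPartition(_, 8)) // Vector(1, 2, 3)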