How to log using log4j to local file system inside a Spark application that runs on YARN?

It looks like you’ll need to append to the JVM arguments used when launching your tasks/jobs. Try editing conf/spark-defaults.conf as described here:

spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/apps/spark-1.2.0/conf/log4j.properties
spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/apps/spark-1.2.0/conf/log4j.properties

Alternatively, try editing conf/spark-env.sh as described here to add the same JVM argument, although the entries in conf/spark-defaults.conf should work. If you are still not getting any joy, you can explicitly … Read more
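
If you would rather set the executor-side option from code instead of editing the conf files, here is a minimal PySpark sketch. The log4j.properties path is the one quoted above and is assumed to exist on every node; the driver option still has to be in place before the driver JVM starts, so it belongs in spark-defaults.conf or on the spark-submit command line rather than here.

from pyspark import SparkConf, SparkContext

# Point executor JVMs at a custom log4j.properties; the path below is the one
# from the answer above and must exist on every node in the cluster.
conf = SparkConf().set(
    "spark.executor.extraJavaOptions",
    "-Dlog4j.configuration=file:/apps/spark-1.2.0/conf/log4j.properties",
)
sc = SparkContext(conf=conf)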

Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python

Now a much better way to do this is to use the rdd.aggregateByKey() method. Because this method is so poorly documented in the Apache Spark with Python documentation (which is why I wrote this Q&A), until recently I had been using the code sequence above. But again, it’s less efficient, so avoid doing … Read more
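
For reference, a minimal PySpark sketch of the aggregateByKey() approach the answer recommends (the sample data and keys are made up): keep a (sum, count) pair per key, combine locally within each partition, then merge across partitions and divide.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([("a", 1.0), ("a", 3.0), ("b", 5.0)])

# Accumulate (sum, count) per key; local combining happens before the shuffle,
# which is what makes this cheaper than groupByKey plus a Python-side average.
sum_counts = rdd.aggregateByKey(
    (0.0, 0),                                  # zero value: (sum, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # fold a value into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]),   # merge accumulators across partitions
)

averages = sum_counts.mapValues(lambda p: p[0] / p[1])
print(averages.collectAsMap())                 # {'a': 2.0, 'b': 5.0}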

Mind blown: RDD.zip() method

It is not true that RDDs are always unordered. An RDD has a guaranteed order if it is the result of a sortBy operation, for example. An RDD is not a set; it can contain duplicates. Partitioning is not opaque to the caller, and can be controlled and queried. Many operations do preserve both partitioning … Read more
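
A small PySpark sketch of the points above (the data is made up): an RDD that comes out of sortBy has a well-defined order, duplicates are allowed, and partitioning can be both queried and controlled.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([("b", 2), ("a", 1), ("a", 1), ("c", 3)], 3)  # duplicates are fine

sorted_rdd = rdd.sortBy(lambda kv: kv[0])      # result has a guaranteed order
print(sorted_rdd.collect())                    # [('a', 1), ('a', 1), ('b', 2), ('c', 3)]

print(rdd.getNumPartitions())                  # partitioning can be queried ...
print(rdd.partitionBy(2).glom().collect())     # ... and controlled by the caller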

What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?

If you’re running out of memory on the shuffle, try setting spark.sql.shuffle.partitions to 2001. Spark uses a different data structure for shuffle book-keeping when the number of partitions is greater than 2000:

private[spark] object MapStatus {
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new … Read more
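
For completeness, a minimal sketch of actually setting the value from PySpark; the 2000 threshold comes from the MapStatus code above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Anything above 2000 makes Spark use HighlyCompressedMapStatus, which keeps a
# compressed summary of block sizes instead of one entry per block.
spark.conf.set("spark.sql.shuffle.partitions", 2001)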

pyspark: count distinct over a window

EDIT: as noleto mentions in his answer below, approx_count_distinct has been available since PySpark 2.1 and works over a window.

Original answer (exact distinct count, not an approximation): we can use a combination of size and collect_set to mimic the functionality of countDistinct over a window:

from pyspark.sql import functions as F, Window … Read more
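
A minimal runnable sketch of the size + collect_set trick described above (the DataFrame and column names are invented for illustration):

from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 2), ("b", 3)],
    ["grp", "value"],
)

w = Window.partitionBy("grp")

# countDistinct is not supported as a window function, but the size of the set
# of values collected over the window gives the same exact result.
df.withColumn("distinct_values", F.size(F.collect_set("value").over(w))).show()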

Understanding Spark’s caching

It would seem that Option B is required. The reason is related to how persist/cache and unpersist are executed by Spark. Since RDD transformations merely build DAG descriptions without execution, in Option A by the time you call unpersist, you still only have job descriptions and not a running execution. This is relevant because a … Read more
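
A small sketch of the ordering I read Option B to be, namely cache, run the action, then unpersist; the RDD here is made up.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

cached = sc.parallelize(range(1000)).map(lambda x: x * x).cache()  # lazy: nothing runs yet

total = cached.reduce(lambda a, b: a + b)  # the action triggers execution and fills the cache

# Only now does unpersisting make sense; calling it before any action would
# drop the cache request while the DAG is still just a description.
cached.unpersist()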

Spark iterate HDFS directory

You can use org.apache.hadoop.fs.FileSystem. Specifically:

FileSystem.listFiles([path], true)

And with Spark…

FileSystem.get(sc.hadoopConfiguration).listFiles(…, true)

Edit: it’s worth noting that good practice is to get the FileSystem that is associated with the Path’s scheme:

path.getFileSystem(sc.hadoopConfiguration).listFiles(path, true)
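
The same idea can be reached from PySpark through the JVM gateway. This is only a sketch, assuming that going through the private _jvm and _jsc handles is acceptable; the directory path is hypothetical.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Build a Hadoop Path and ask it for the FileSystem that matches its scheme,
# as the edit above recommends, then walk the recursive listing.
jvm = sc._jvm
path = jvm.org.apache.hadoop.fs.Path("hdfs:///some/dir")   # hypothetical directory
fs = path.getFileSystem(sc._jsc.hadoopConfiguration())

it = fs.listFiles(path, True)   # True = recursive
while it.hasNext():
    print(it.next().getPath().toString())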

What’s the difference between Spark ML and MLLIB packages

o.a.s.mllib contains the old RDD-based API, while o.a.s.ml contains the new API built around Datasets and ML Pipelines. ml and mllib reached feature parity in 2.0.0, and mllib is slowly being deprecated (this has already happened for linear regression) and will most likely be removed in the next major release. So unless your goal is backward … Read more
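
To make the distinction concrete, a small PySpark sketch using the DataFrame-based linear regression from the new ml package (the toy data is invented); its RDD-based counterpart in pyspark.mllib is the one that has already been deprecated.

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression   # new DataFrame/Pipeline API (o.a.s.ml)

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)], ["x", "y"])

# The ml API works on DataFrames with a vector "features" column, unlike the
# old mllib API, which works on RDDs of LabeledPoint.
train = VectorAssembler(inputCols=["x"], outputCol="features").transform(df)
model = LinearRegression(featuresCol="features", labelCol="y").fit(train)
print(model.coefficients)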

When are accumulators truly reliable?

To answer the question “When are accumulators truly reliable?”: they are reliable when the update happens inside an action. As per the documentation, when an accumulator is updated in an action, even a restarted task will update it only once. For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the … Read more
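
A minimal sketch of the difference (data made up): the update inside foreach, an action, is applied exactly once per task, while the update inside map, a transformation, can be re-applied whenever the lineage is re-evaluated.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(range(100))

reliable = sc.accumulator(0)
rdd.foreach(lambda x: reliable.add(1))   # inside an action: counted exactly once per element
print(reliable.value)                    # 100, even if tasks were retried

unreliable = sc.accumulator(0)
mapped = rdd.map(lambda x: unreliable.add(1) or x)  # inside a transformation: no guarantee
mapped.count()
mapped.count()                           # lineage re-runs, updates applied again
print(unreliable.value)                  # likely 200 here, not 100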

Spark specify multiple column conditions for dataframe join

There is a Spark column/expression API for such a join:

Leaddetails.join(
  Utm_Master,
  Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
    && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
    && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
    && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
  "left"
)

The <=> operator in the example means “equality test that is safe for null values”. The main difference from the simple equality test (===) is that the … Read more
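
For a PySpark equivalent sketch, Column.eqNullSafe (available in recent PySpark versions) plays the role of <=>; the DataFrames and data below are invented stand-ins for Leaddetails and Utm_Master.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

lead = spark.createDataFrame(
    [("web", "google"), ("web", None)], ["LeadSource", "Utm_Source"])
utm = spark.createDataFrame(
    [("web", "google"), ("web", None)], ["LeadSource", "Utm_Source"])

# eqNullSafe matches NULL with NULL, whereas == (the === of the Scala API)
# treats any comparison involving NULL as not matching.
joined = lead.join(
    utm,
    lead["LeadSource"].eqNullSafe(utm["LeadSource"])
    & lead["Utm_Source"].eqNullSafe(utm["Utm_Source"]),
    "left",
)
joined.show()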