
Advanced Performance tuning in Spark

Many of you have probably worked with Spark for years and done a fair amount of general performance tuning. However, you may not have seen all the tuning guidelines collected in one place. This post gathers what I have learned into a single list:

Spark tuning guidelines

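  • Enable compression in the jobs – compressing shuffle outputs, spills, broadcasts and cached data trades a little CPU time for less disk and network I/O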
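A minimal sketch of the relevant settings on a SparkConf, assuming Spark 2.3 or later (needed for the zstd codec). spark.shuffle.compress, spark.shuffle.spill.compress and spark.broadcast.compress already default to true, spark.rdd.compress defaults to false, and the codec choices here are illustrative assumptions rather than tuned recommendations:

```scala
import org.apache.spark.SparkConf

object CompressionSettings {
  // Illustrative values only; measure before adopting them
  val conf: SparkConf = new SparkConf()
    .setAppName("compression-sketch")                      // hypothetical app name
    .set("spark.shuffle.compress", "true")                 // compress map output files (default true)
    .set("spark.shuffle.spill.compress", "true")           // compress data spilled during shuffles (default true)
    .set("spark.broadcast.compress", "true")               // compress broadcast variables (default true)
    .set("spark.rdd.compress", "true")                     // compress serialized cached partitions (default false)
    .set("spark.io.compression.codec", "zstd")             // codec for the internal data above (default lz4)
    .set("spark.sql.parquet.compression.codec", "snappy")  // codec for Parquet output files
}
```

The conf can be passed to SparkSession.builder().config(conf), or the same keys can be given with --conf on spark-submit.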


  • Enable Kryo serialization – Kryo is significantly faster and more compact than the default Java serializer for most data. You may already be using Kryo; check the Spark configuration parameter:

"spark.serializer" should be "org.apache.spark.serializer.KryoSerializer".

If it is not, you can set it on your SparkConf:

conf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )
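Registering the classes you serialize lets Kryo write a small numeric ID instead of the full class name with every object. A minimal sketch, where ClickEvent is a hypothetical record type standing in for your own classes. Note that spark.serializer governs RDD data (shuffles and serialized caching); DataFrame/Dataset rows are handled by Tungsten's own encoders.

```scala
import org.apache.spark.SparkConf

object KryoSettings {
  // Hypothetical record type standing in for the classes your job shuffles or caches
  case class ClickEvent(userId: Long, url: String)

  val conf: SparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registered classes are written as compact IDs instead of full class names
    .registerKryoClasses(Array(classOf[ClickEvent]))
}
```

While tuning, setting spark.kryo.registrationRequired=true makes Kryo throw an error for any unregistered class, which helps find classes you forgot to register.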

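  • Use the power of Tungsten – to make the most of Tungsten, pay attention to the following:

- Use Datasets/DataFrames (a DataFrame is simply a Dataset[Row]) rather than RDDs, so that data stays in Tungsten's compact binary format
- Avoid user-defined functions (UDFs) as much as possible and prefer Spark's built-in functions, which the Catalyst optimizer understands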
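To see why UDFs are discouraged, compare a Scala UDF with the equivalent built-in function. A minimal sketch, assuming a local SparkSession and a made-up two-row DataFrame: Catalyst treats the UDF as a black box and every value must be converted from Tungsten's internal format to a JVM String and back, while the built-in upper() works directly on the internal representation.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf, upper}

object UdfVsBuiltin {
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]") // local mode, for illustration only
    .appName("udf-vs-builtin")
    .getOrCreate()
  import spark.implicits._

  // Made-up sample data
  val names: DataFrame = Seq("alice", "bob").toDF("name")

  // Scala UDF: opaque to Catalyst; each value is converted to java.lang.String and back
  private val toUpperUdf = udf((s: String) => s.toUpperCase)
  val viaUdf: DataFrame = names.select(toUpperUdf(col("name")).as("name_upper"))

  // Built-in function: Catalyst can optimize it, and it operates on Tungsten's strings directly
  val viaBuiltin: DataFrame = names.select(upper(col("name")).as("name_upper"))
}
```

Both DataFrames return the same rows; calling explain() on each shows an opaque UDF(...) call in one plan and upper(name) in the other.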

  • Handling skew in the join – to handle skew in the join keys, you can add the hint `/*+ SKEW('<tableName>') */` to a join, optionally describing the columns and values on which skew is expected. Based on that information, the engine handles the skewed values automatically. Note that this hint is vendor-specific: it is offered by some managed Spark platforms and is not one of open-source Apache Spark's SQL hints.

You can specify the hint in the following formats:

Format 1:

/*+ SKEW('<tableName>') */

Format 2:

/*+ SKEW('<tableName>', (<COLUMN-HINT>), (<ANOTHER-COLUMN-HINT>)) */
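In open-source Apache Spark 3.x, a different mechanism handles skew without any hint: Adaptive Query Execution (AQE) can split oversized partitions of a sort-merge join at runtime. A minimal sketch of enabling it, assuming Spark 3.0 or later; the factor and threshold values are illustrative:

```scala
import org.apache.spark.SparkConf

object SkewJoinSettings {
  val conf: SparkConf = new SparkConf()
    .set("spark.sql.adaptive.enabled", "true")           // Adaptive Query Execution (Spark 3.0+)
    .set("spark.sql.adaptive.skewJoin.enabled", "true")  // let AQE split skewed sort-merge join partitions
    // A partition counts as skewed when it is larger than factor x the median partition size
    // AND larger than the byte threshold; both values here are illustrative
    .set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
    .set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
}
```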

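  • Configure the Spark external shuffle service – set spark.shuffle.service.enabled=true so that shuffle files are served by a long-running service on each node and remain available when executors are removed (for example, by dynamic allocation)
  • Continuously running Spark Streaming applications – set yarn.resourcemanager.app.timeout.minutes=-1 as a Hadoop override at the Spark cluster level
  • Tune the batch interval of streaming jobs – an interval long enough for each batch to finish processing before the next one arrives improves stability and keeps data from piling up across batches
  • Use accumulators – while reading and processing, accumulators can collect counts for intermediate steps of DataFrame jobs at no extra cost beyond the actions you already run. Spark guarantees that each task's update is applied exactly once only for updates made inside actions; updates made inside transformations may be counted more than once if a task or stage is re-executed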

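For example, during a read:

val readCounter = sc.longAccumulator("read-counter")
sc.textFile("data.tsv").foreach { _ =>
  readCounter.add(1)
}
// Accumulator values are only readable on the driver, once the action has finished
val count = readCounter.value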

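For example, during processing:

import org.apache.spark.sql.functions.{col, lit}

val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
// Dataset[Row].map would need an implicit Encoder[Row], so the pass-through map runs on the underlying RDD
df.filter(col("_c0") === lit(3)).rdd.map { x => filterCounter.add(1); x }.count()
println(s"count for filter = ${filterCounter.value}")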

  • Efficient resource utilization
    • Leave aside at least one or two cores per node for OS processes and Hadoop daemons
    • The YARN ApplicationMaster also needs resources (approximately 1024 MB and one executor)
    • HDFS clients reach full write throughput with about 5 concurrent tasks per executor, so the recommendation is to keep about 5 cores per executor, or sometimes fewer
    • Full memory requested from YARN per executor = spark.executor.memory + spark.yarn.executor.memoryOverhead, where spark.yarn.executor.memoryOverhead = max(384 MB, 7% of spark.executor.memory). Since Spark 2.3 the property is named spark.executor.memoryOverhead, and newer releases use 10% instead of 7%
    • Total cores requested = number of executors * cores per executor


    Total cluster capacity example:

    **Cluster Config:**
    10 Nodes
    16 cores per Node = 160 total cores
    64GB RAM per Node = 640 GB total RAM

    Cores available per node = 16 - 3 (OS, Hadoop daemons, etc.) = 13
    Total cores available to Spark in the cluster = 13 x 10 = 130

    For example, suppose a job requests 5 executors, each with 20 GB of memory and 5 cores.

    Memory reserved = 5 x (20 GB + max(384 MB, 7% of 20 GB)) = 5 x (20 GB + 1.4 GB) = ~107 GB
    Cores used = 5 executors x 5 cores = 25 cores

    The number of such jobs that can run in the cluster in parallel is therefore min(640 GB / 107 GB, 130 cores / 25 cores) = min(~5.98, 5.2) = ~5 (leaving one executor for the ApplicationMaster)

    Tune these numbers to the complexity of the computations and the volume of data to be processed.
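The sizing rules above can be wrapped in a few helper functions so that different executor layouts are easy to compare. A minimal sketch in plain Scala with no Spark dependency; the object and function names are hypothetical, and the overhead factor defaults to 7% to reproduce the worked example (pass 0.10 to model the 10% default of newer Spark releases):

```scala
object ExecutorSizing {
  // Overhead rule: max(384 MB, factor x executor memory)
  def memoryOverheadMb(executorMemoryMb: Long, factor: Double = 0.07): Long =
    math.max(384L, (executorMemoryMb * factor).toLong)

  // Memory YARN must grant for one executor container, in MB
  def containerMemoryMb(executorMemoryMb: Long, factor: Double = 0.07): Long =
    executorMemoryMb + memoryOverheadMb(executorMemoryMb, factor)

  // How many identical jobs fit side by side: limited by whichever of memory or cores runs out first
  def concurrentJobs(clusterMemoryMb: Long, clusterCores: Int,
                     executors: Int, executorMemoryMb: Long, coresPerExecutor: Int,
                     factor: Double = 0.07): Long = {
    val jobMemoryMb = executors * containerMemoryMb(executorMemoryMb, factor)
    val jobCores = executors * coresPerExecutor
    math.min(clusterMemoryMb / jobMemoryMb, (clusterCores / jobCores).toLong)
  }
}
```

For the example cluster, ExecutorSizing.concurrentJobs(640L * 1024, 130, 5, 20 * 1024, 5) returns 5: memory alone would allow 5 jobs (about 5.98, rounded down) and cores alone 5 (5.2, rounded down).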

  • Use Spark listeners to get write counts – this takes no extra time beyond the write itself
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    var recordsWrittenCount = 0L
    sc.addSparkListener(new SparkListener() {
      // Invoked asynchronously on the listener bus after each task finishes
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
        if (taskEnd.taskMetrics != null) // metrics can be null for some failed tasks
          recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
      }
    })
  • Use multi-threading for concurrent writes

Spark runs the tasks of a single job in parallel across the cluster, but actions called from one driver thread run one after another. The sample code snippet below shows how to use multi-threading to write the same DataFrame to two separate locations in parallel.

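// Parallel writes using threads and lambda functions (finalDF must be effectively final)
// Cache first so the two writes can reuse computed data instead of each recomputing finalDF
finalDF.cache();
Thread write1 = new Thread(() -> finalDF.write().mode("overwrite").orc("/loudacre/top_devices"));
Thread write2 = new Thread(() -> finalDF.write().mode("overwrite").orc("/loudacre/top_devices2"));
// Start threads: each one submits its own Spark job
write1.start(); write2.start();
// Wait for both writes to complete (join() throws InterruptedException)
write1.join(); write2.join();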
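The same idea in Scala, the language of most snippets in this post: a minimal sketch that submits each write from a Future and waits for all of them. writeInParallel is a hypothetical helper name. Spark accepts jobs from multiple threads within one application; under the default FIFO scheduler the first job gets priority on free resources, while spark.scheduler.mode=FAIR shares them more evenly between concurrent jobs.

```scala
import org.apache.spark.sql.DataFrame
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ParallelWrites {
  // Hypothetical helper: writes the same DataFrame to every path concurrently as ORC
  def writeInParallel(df: DataFrame, paths: Seq[String]): Unit = {
    // Cache so the concurrent writes can share computed partitions instead of each recomputing df
    val cached = df.cache()
    // Each Future submits its write as a separate Spark job; blocking{} lets the
    // global pool add threads while a write waits on the cluster
    val writes: Seq[Future[Unit]] = paths.map { path =>
      Future(blocking(cached.write.mode("overwrite").orc(path)))
    }
    // Wait until every write has finished; a failed write rethrows its exception here
    Await.result(Future.sequence(writes), Duration.Inf)
  }
}
```

Wrapping the writes in Futures keeps error handling in one place: if either write fails, Await.result throws on the driver instead of the failure being lost inside a bare thread.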

  • Use a performance-testing framework such as Gatling or JMeter to load-test Livy

Gatling is a capable load-testing tool designed for ease of use, maintainability and high performance. It provides the following benefits:

  • Powerful scripting using Scala
  • Built on Akka and Netty
  • Run multiple scenarios in one simulation
  • Scenarios = code + DSL
  • Graphical reports with clear & concise graphs


import io.gatling.core.Predef._
import io.gatling.http.Predef._
import scala.concurrent.duration._

val theScenarioBuilder = scenario("Interactive Spark Command Scenario Using LIVY Rest Services")
  .exec(
    // The name passed to http(...) labels this request in the Gatling report
    http("Interactive Spark Command Simulation")
      .get("/insrun?sessionId=${sessionId}&statement=sparkSession.sql(  select event.site_id from siteexposure_event as event where st_intersects(st_makeBBOX(${bbox})%2C geom) limit 5  ).show")
      .check(status.is(200))
  )
  .pause(4.seconds)


    © BeingDatum. All rights reserved.