Following are some thoughts I have jotted down to help write Spark jobs that are faster and less prone to issues like Out of Memory errors.
Understand your data
Get a good understanding of your data before you set about writing any transformations and actions on it. By understanding your data, I mean the following:
- What is the size of your data?
- Is there any skew in your data? Does a small subset of keys have a large number of values associated with it? If yes, introducing a level of hierarchy in the values can help you get to a more even distribution of data. For example, if your data contains a user id which is mapped to a set of followers on a social networking site, a celebrity will have a huge number of followers compared to an ordinary user. If we split the set of followers into a fixed number of buckets, using a hash of each follower modulo the bucket count (hashmod), we avoid a scenario where one node in the cluster gets too much data whereas another node sits twiddling its thumbs.
- Are you dealing with a lot of strings in your data? Would it be possible to convert them into integers before doing any sort of processing on them? String operations are slower than Integer operations. You can hash your strings at the start to get Integer values, as long as occasional hash collisions are acceptable for your use case.
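The bucketing idea above can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: the names `bucket_of`, `salted_key` and `NUM_BUCKETS` are hypothetical, and `zlib.crc32` is swapped in as a hash that is stable across runs (Python's built-in `hash` for strings is not). The same helper also shows how a string can be reduced to an integer up front.

```python
import zlib
from collections import Counter

NUM_BUCKETS = 8  # hypothetical bucket count; tune it for your own data


def bucket_of(value: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Hash a string to an integer and reduce it modulo the bucket count."""
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def salted_key(user_id: str, follower_id: str) -> tuple:
    """Replace a hot key with (key, bucket) so its values spread over several buckets."""
    return (user_id, bucket_of(follower_id))


# Toy data: one "celebrity" with many followers, one ordinary user with two.
pairs = [("celebrity", f"follower-{i}") for i in range(10_000)]
pairs += [("ordinary", "follower-a"), ("ordinary", "follower-b")]

# Count how many values end up under each (user, bucket) key.
counts = Counter(salted_key(user, follower) for user, follower in pairs)
celebrity_counts = [n for (user, _), n in counts.items() if user == "celebrity"]
print(len(celebrity_counts), max(celebrity_counts))
```

Aggregating per `(user, bucket)` first and then combining the per-bucket results per user is the extra level of hierarchy: no single key carries all of the celebrity's followers.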
Be intentional about Spark configurations
Be explicit about the number of executors, executor cores, and executor memory. Have a good understanding of your cluster nodes.
- What is the number of cores available on each machine? Set the number of CPU cores associated with each executor by using the spark.executor.cores property.
- What is the amount of RAM available on each machine? What amount of RAM do you want to give each executor? Set this with spark.executor.memory.
- What is the number of partitions of your data? Doing a rough, back-of-the-envelope calculation, will each partition fit into a task?
- Do you want one executor per node? Do you want one executor per core? Do you want to be somewhere in the middle? With one executor per node, you will have too many concurrent tasks in a single JVM trying to read from / write to your underlying datastore. Can your datastore handle that level of parallelism? With one executor per core, you will not be able to take advantage of parallelism within a JVM. You usually want to strike a middle path.
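The back-of-the-envelope sizing described above could look something like the following. It is a rough sketch, not a recommendation: the function `plan_executors`, the node sizes, and the idea of reserving one core and 1 GB per node for the operating system and daemons are all assumptions made for illustration.

```python
def plan_executors(cores_per_node: int, ram_gb_per_node: int,
                   cores_per_executor: int,
                   reserved_cores: int = 1, reserved_ram_gb: int = 1) -> dict:
    """Rough per-node layout: how many executors fit, and their memory budget."""
    # Assumption: keep a little capacity back for the OS and cluster daemons.
    usable_cores = cores_per_node - reserved_cores
    usable_ram_gb = ram_gb_per_node - reserved_ram_gb
    executors_per_node = usable_cores // cores_per_executor
    if executors_per_node < 1:
        raise ValueError("cores_per_executor exceeds the usable cores on a node")
    return {
        "executors_per_node": executors_per_node,
        "spark.executor.cores": cores_per_executor,
        # Budget per executor container, to be split between heap and overhead.
        "container_gb_per_executor": usable_ram_gb // executors_per_node,
    }


# Hypothetical node: 16 cores and 64 GB of RAM, 5 cores per executor.
plan = plan_executors(cores_per_node=16, ram_gb_per_node=64, cores_per_executor=5)
print(plan)
```

Setting `cores_per_executor` to the usable core count gives the one-executor-per-node extreme, and setting it to 1 gives the one-executor-per-core extreme, so the same function can be used to compare the options.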
Explicitly set executor memory overhead to be greater than 7%
If you are running into OOM (Out of Memory) issues, it is possible that your code is making too many nested function calls. Thread stacks are given separate off-heap memory on a node, and the default value allocated is 7% of spark.executor.memory. This may not necessarily be enough.
Try increasing it gradually from 7% to 25%. Just make sure that the following inequality is not violated while you fiddle with these parameters:
executor memory + executor memory overhead < total memory provided by YARN to each executor
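A quick way to check the inequality before submitting a job is to do the arithmetic by hand. The sketch below uses hypothetical numbers (an 18 GB executor heap inside a 21 GB YARN container) and a hypothetical helper, `fits_in_container`. The overhead is commonly configured through spark.yarn.executor.memoryOverhead (spark.executor.memoryOverhead in newer releases), and the default fraction and minimum differ between Spark versions, so treat the 7% here as the figure quoted above rather than a guarantee.

```python
def fits_in_container(executor_memory_mb: int, overhead_fraction: float,
                      container_limit_mb: int) -> bool:
    """Check: executor memory + executor memory overhead < container limit."""
    overhead_mb = int(executor_memory_mb * overhead_fraction)
    return executor_memory_mb + overhead_mb < container_limit_mb


container_limit_mb = 21 * 1024   # hypothetical memory YARN gives each executor
executor_memory_mb = 18 * 1024   # hypothetical spark.executor.memory

# Walk the overhead up gradually and see where the budget is exceeded.
for percent in (7, 10, 15, 20, 25):
    ok = fits_in_container(executor_memory_mb, percent / 100, container_limit_mb)
    print(f"overhead {percent}%: {'fits' if ok else 'exceeds the container limit'}")
```

When a larger overhead no longer fits, the usual way out is to lower the executor memory by the same amount, since the container limit itself is fixed by YARN.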
Use partitioners to your advantage to reduce the amount of shuffling
Operations that cause shuffling, such as reduceByKey and aggregateByKey, benefit from data that is already partitioned by a partitioner. If Spark knows where a key resides, it can use that information to its advantage and reduce the inter-node network traffic.
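To see why pre-partitioned data helps, here is a small simulation in plain Python. It does not use Spark's API: `partition_of`, `partition_by_key` and `records_to_move` are hypothetical helpers that mimic a hash partitioner and count how many records a by-key aggregation would have to move between partitions. In Spark itself the equivalent step would be partitioning the pair RDD by key ahead of the aggregation.

```python
import zlib


def partition_of(key: str, num_partitions: int) -> int:
    """Hash partitioner: the partition index a key belongs to."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_by_key(records, num_partitions: int) -> list:
    """Place every (key, value) record on the partition its key hashes to."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[partition_of(key, num_partitions)].append((key, value))
    return partitions


def records_to_move(partitions: list, num_partitions: int) -> int:
    """Count records sitting on a partition other than the one their key hashes to."""
    return sum(
        1
        for index, records in enumerate(partitions)
        for key, _ in records
        if partition_of(key, num_partitions) != index
    )


num_partitions = 4
records = [(f"user-{i % 50}", i) for i in range(1000)]

# Layout that ignores keys: records are dealt out round-robin.
round_robin = [records[i::num_partitions] for i in range(num_partitions)]
# Layout produced by the partitioner: each key lives on exactly one partition.
pre_partitioned = partition_by_key(records, num_partitions)

print(records_to_move(round_robin, num_partitions))
print(records_to_move(pre_partitioned, num_partitions))
```

With the key-aware layout every value for a key is already in one place, so a per-key reduction can run locally on each partition.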
Be careful about where you use immutable Scala collections
Don’t use immutable collections if you are adding elements to a collection or concatenating collections. Every such operation on an immutable collection creates a new collection object, so you are better off using mutable collections in such places in your code to reduce the number of Java objects created on your Java heap. This helps to reduce JVM garbage collection pauses.
Fiddle around with partition sizes for your data
The number of partitions is appropriate when the approximate size of a partition fits into the JVM heap of an executor. The executor JVM heap size is in turn capped by the YARN container size limit. Too many partitions make a job overly slow because of the amount of context switching that your executor tasks have to do. Too few partitions can cause OOM (Out of Memory) errors in your tasks. Start out with a value equal to the number of cores in your cluster and keep increasing it to 1.5 times the previous value until you find what fits well in your situation.
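The search described above is easy to tabulate before running anything. The following sketch assumes a hypothetical cluster with 40 cores and a hypothetical 200 GB dataset; `candidate_partition_counts` and `approx_partition_size_mb` are illustrative helpers, and the even split ignores skew and in-memory expansion of the data.

```python
import math


def candidate_partition_counts(total_cores: int, steps: int,
                               growth: float = 1.5) -> list:
    """Start at the number of cores and grow by 1.5x at each step."""
    counts = [total_cores]
    for _ in range(steps - 1):
        counts.append(math.ceil(counts[-1] * growth))
    return counts


def approx_partition_size_mb(dataset_size_mb: float, num_partitions: int) -> float:
    """Rough partition size, assuming the data is spread evenly."""
    return dataset_size_mb / num_partitions


dataset_size_mb = 200 * 1024  # hypothetical 200 GB dataset
for count in candidate_partition_counts(total_cores=40, steps=5):
    size = approx_partition_size_mb(dataset_size_mb, count)
    print(f"{count} partitions -> about {size:.0f} MB each")
```

Comparing the per-partition size against the memory actually available to a task gives a first guess at where to start experimenting.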
Coalesce / broadcast the smaller RDD before a join / cartesian operation
To reduce the amount of shuffling when performing a shuffle-intensive operation such as a join or a cartesian product, if one of your RDDs is small enough to easily fit into the memory of a single node, it would be a good idea to broadcast it to all nodes.
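The effect of broadcasting can be illustrated without Spark. In the sketch below, `broadcast_join` is a hypothetical helper: the small dataset is turned into a lookup table that every partition of the large dataset can read locally, so the large dataset never has to be regrouped by key. It assumes the keys in the small dataset are unique and that an inner join is wanted; in Spark the lookup table would be shipped to the executors as a broadcast variable.

```python
def broadcast_join(large_partitions: list, small_table: list) -> list:
    """Inner-join each partition of the large dataset against a local lookup table."""
    # Stand-in for the broadcast copy: one read-only dict shared by every partition.
    lookup = dict(small_table)
    joined = []
    for partition in large_partitions:
        # Each partition is processed where it is; no records move between partitions.
        for key, value in partition:
            if key in lookup:
                joined.append((key, (value, lookup[key])))
    return joined


# Hypothetical data: page views split over two partitions, and a small country table.
views = [
    [("u1", "home"), ("u2", "search")],
    [("u1", "cart"), ("u3", "home")],
]
countries = [("u1", "IN"), ("u2", "US")]

print(broadcast_join(views, countries))
```

Records whose key is missing from the small table (here `u3`) are simply dropped, which matches inner-join behaviour.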