The notes on this page are fragmentary, immature thoughts of the author. Please read with your own judgement!
Symptoms¶
Symptom 1¶
```
16/04/17 11:17:36 ERROR scheduler.TaskSetManager: Total size of serialized results of 126 tasks (1137.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
16/04/17 11:17:36 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 114 tasks (1029.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 114 tasks (1029.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
```
Symptom 2¶
```
16/04/17 11:23:15 INFO executor.Executor: Finished task 196.0 in stage 6.0 (TID 610). 9455052 bytes result sent to driver
16/04/17 11:23:46 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(157,[Lscala.Tuple2;@56be55da,BlockManagerId(157, 45017))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
```
Cause¶
When a BroadcastHashJoin is executed, for some reason, the driver creates N copies of the broadcast table (N is the number of executors; in my case the table was 75 MB) and sends one copy to each executor. With N around 300, the driver needs at least 300 × 75 MB ≈ 22 GB of memory, far more than the memory allocated to the driver. This causes the driver to become unresponsive (OOM).
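A quick back-of-the-envelope check of that figure, using the executor count and table size quoted above:

```scala
// Rough driver-side memory estimate for the scenario above:
// N copies of the broadcast table, one per executor.
val numExecutors = 300   // N from the example above
val broadcastMB  = 75    // broadcast table size in MB
val totalMB = numExecutors * broadcastMB
println(f"~${totalMB / 1024.0}%.1f GB needed on the driver")  // ~22.0 GB
```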
Solution¶
Do not use BroadcastHashJoin:

- For Spark SQL (SQLContext): do not call the function `broadcast(obj)`.
- For Spark SQL (HiveContext): set `spark.sql.autoBroadcastJoinThreshold` to 10 MB or less (note that the value is specified in bytes, e.g. `10485760`).
- For RDDs: do not use broadcast variables.
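For the HiveContext case, a minimal sketch (assuming an existing `sqlContext`; the threshold value is in bytes, and `-1` disables automatic broadcast joins entirely):

```scala
// Lower the broadcast threshold to 10 MB (value is in bytes),
// or set it to -1 to disable automatic broadcast joins altogether.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)
// sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

// The same setting can also be passed at submission time:
//   spark-submit --conf spark.sql.autoBroadcastJoinThreshold=10485760 ...
```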
If one of the tables being joined has too many partitions (which results in too many tasks), repartition it to reduce the partition count before joining.
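A sketch of the repartition-before-join idea, with hypothetical DataFrame names (`large`, `other`) and an illustrative target partition count:

```scala
// `large` has far too many partitions; shrink it before joining so the
// join (and the number of result-bearing tasks) stays manageable.
// coalesce avoids a full shuffle when only reducing the partition count.
val shrunk = large.coalesce(200)       // 200 is an illustrative target
val joined = shrunk.join(other, "id")  // "id" is a hypothetical join key
```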