SPARK-23365: DynamicAllocation with failure in a straggler task can lead to a hung Spark job


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.2, 2.2.1, 2.3.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: Scheduler, Spark Core
    • Labels: None

    Description

      Dynamic Allocation can lead to a Spark app getting stuck with 0 executors requested when the executors running the last tasks of a taskset fail (e.g. with an OOM).

      This happens when the ExecutorAllocationManager's internal target number of executors gets out of sync with the CoarseGrainedSchedulerBackend's target number. The EAM updates the CGSB in two ways: (1) it tracks how many tasks are active or pending in submitted stages, and computes how many executors would be needed for them; as tasks finish, it actively decreases that count, informing the CGSB along the way. (2) When it decides executors have been idle for long enough, it requests that the CGSB kill them – this also tells the CGSB to update its target number of executors: https://github.com/apache/spark/blob/4df84c3f818aa536515729b442601e08c253ed35/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L622
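
      To make the desync concrete, here is a toy model of the two counters and the two update paths described above (an illustrative sketch only – the class and method names below are made up and are not Spark's actual internals):

      // Stand-in for the CoarseGrainedSchedulerBackend's side of the target count.
      class FakeSchedulerBackend {
        var targetNumExecutors = 0

        // Path (1): the EAM pushes its computed target down.
        def requestTotalExecutors(n: Int): Unit = {
          targetNumExecutors = n
        }

        // Path (2): killing idle executors also lowers the backend's target,
        // without the EAM adjusting its own count.
        def killExecutors(ids: Seq[String]): Unit = {
          targetNumExecutors = math.max(0, targetNumExecutors - ids.size)
        }
      }

      // Stand-in for the ExecutorAllocationManager's side.
      class FakeAllocationManager(backend: FakeSchedulerBackend) {
        var target = 0

        // Recompute the target from pending + running tasks (pretend 1 task per executor)
        // and sync the backend -- but only when the EAM's *own* target changes.
        def updateTarget(pendingAndRunningTasks: Int): Unit = {
          val newTarget = pendingAndRunningTasks
          if (newTarget != target) {
            target = newTarget
            backend.requestTotalExecutors(newTarget)
          }
        }

        // Idle timeout: ask the backend to kill executors; note `target` is untouched.
        def onIdleTimeout(idleExecutors: Seq[String]): Unit = {
          backend.killExecutors(idleExecutors)
        }
      }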

      So when there is just one task left, you could have the following sequence of events:
      (1) the EAM sets the desired number of executors to 1, and updates the CGSB too
      (2) while that final task is still running, the other executors cross the idle timeout, and the EAM requests the CGSB kill them
      (3) now the EAM has a target of 1 executor, and the CGSB has a target of 0 executors

      If the final task completed normally now, everything would be OK; the next taskset would get submitted, the EAM would increase the target number of executors and it would update the CGSB.

      But if the executor for that final task failed (e.g. with an OOM), then the EAM thinks it doesn't need to update anything, because its target is already 1, which is all it needs for that final task; and the CGSB doesn't update anything either, since its target is 0.
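
      Replaying the sequence above against that toy model shows how the two counters end up stuck at 1 and 0, with no further sync ever triggered (again, just the illustrative sketch, not real Spark code):

      val backend = new FakeSchedulerBackend
      val eam = new FakeAllocationManager(backend)

      eam.updateTarget(pendingAndRunningTasks = 1)  // (1) one straggler left: both targets become 1
      eam.onIdleTimeout(Seq("2", "3", "4", "5"))    // (2) idle executors killed: backend target drops to 0
      // (3) EAM target is 1, backend target is 0.
      // The straggler's executor dies (OOM) and the task is resubmitted: still 1 pending task,
      eam.updateTarget(pendingAndRunningTasks = 1)  // ...so the EAM sees no change and never re-syncs the backend.
      assert(eam.target == 1 && backend.targetNumExecutors == 0)  // stuck: 0 executors requested from the cluster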

      I think you can determine if this is the cause of a stuck app by looking for

      yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
      

      in the logs of the ApplicationMaster (at least on YARN).

      You can reproduce this with the following test app, run with --conf "spark.dynamicAllocation.minExecutors=1" --conf "spark.dynamicAllocation.maxExecutors=5" --conf "spark.dynamicAllocation.executorIdleTimeout=5s":

      import org.apache.spark.SparkEnv

      sc.setLogLevel("INFO")

      // run a big stage first so dynamic allocation scales up to maxExecutors
      sc.parallelize(1 to 10000, 1000).count()

      // grab the executor ids and pick one to OOM later
      val execs = sc.parallelize(1 to 1000, 1000).map { _ => SparkEnv.get.executorId }.collect().toSet
      val badExec = execs.head
      println("will kill exec " + badExec)

      sc.parallelize(1 to 5, 5).mapPartitions { itr =>
        val exec = SparkEnv.get.executorId
        if (exec == badExec) {
          Thread.sleep(20000) // long enough that all the other tasks finish, and the executors cross the idle timeout
          // now cause the executor to OOM
          var buffers = Seq[Array[Byte]]()
          while (true) {
            buffers :+= new Array[Byte](1e8.toInt)
          }
          itr
        } else {
          itr
        }
      }.collect()
      

      EDIT: I adjusted the repro to cause an OOM on the bad executor, since sc.killExecutor doesn't play nice with dynamic allocation in other ways.

            People

              Assignee: Imran Rashid (irashid)
              Reporter: Imran Rashid (irashid)
              Votes: 0
              Watchers: 9
