Spark jobs were failing. All of them. The data pipeline had stopped. This is a tale of high-pressure debugging.

We use Livy to submit Spark jobs. Well, we use Azure Data Factory (ADF) to schedule the data pipeline, and that uses Livy to submit Spark jobs.
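For context, a “job submission” here is just a REST call to Livy. The sketch below shows roughly what that call looks like, assuming Livy’s standard batch endpoint (POST /batches) and Java 11’s HTTP client; the host, jar path, and class name are placeholders, not our pipeline’s actual values.

// Sketch: submit a Spark batch job through Livy's REST API.
// The host, jar path, and class name are placeholders.
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object SubmitBatch {
  def main(args: Array[String]): Unit = {
    val livyUrl = "http://livy-host:8998"   // placeholder
    val payload =
      """{"file": "wasb:///jobs/etl-job.jar", "className": "com.example.EtlJob"}"""

    val request = HttpRequest.newBuilder()
      .uri(URI.create(s"$livyUrl/batches"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(payload))
      .build()

    // Livy answers with the new session's id and state,
    // e.g. {"id": 42, "state": "starting", ...}
    val response = HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
    println(response.body())
  }
}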

Livy was returning failures for all new Spark jobs because it had apparently run out of memory.

Why did it run out of memory? We were only submitting transient jobs: they certainly exited; they didn’t run forever!

Here today, and for the next 24 days

Livy has a model where it doesn’t delete information about Spark jobs right after they finish. Instead, the “session” is kept until (A) a timer expires, or (B) the user explicitly deletes the session. This is understandable because REST (and HTTP) is stateless, so the user may come back some time later to ask for the status of a completed Spark job.
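To make that concrete: because the session for a finished job is retained, a stateless client can come back at any later point and ask Livy what became of the job. A sketch, again with a placeholder host and batch id:

// Sketch: query the state of a previously submitted batch job.
// This works long after the job has finished, as long as the session
// has not been deleted or garbage-collected.
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object CheckBatchState {
  def main(args: Array[String]): Unit = {
    val livyUrl = "http://livy-host:8998"   // placeholder
    val batchId = 42                        // placeholder

    val request = HttpRequest.newBuilder()
      .uri(URI.create(s"$livyUrl/batches/$batchId/state"))
      .GET()
      .build()

    // Typical reply: {"id": 42, "state": "success"}
    val response = HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
    println(response.body())
  }
}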

In our case, the problem was caused by a nasty combination of:

  • A high default timeout value (24 days: the Azure team said they had optimized it for long-running, streaming Spark jobs)
  • A missing explicit DELETE call from ADF (the Azure team said they were working on it; more on this call below)

If either of the above had been absent, sessions would have been cleaned up and we would not have seen the memory build-up.
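For what it’s worth, the explicit cleanup ADF was missing is a single REST call: a DELETE on the session once its result has been consumed. A sketch, with a placeholder host and id:

// Sketch: explicitly delete a batch session so Livy frees it right away,
// instead of holding it until the timeout expires.
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object DeleteBatch {
  def main(args: Array[String]): Unit = {
    val livyUrl = "http://livy-host:8998"   // placeholder
    val batchId = 42                        // placeholder

    val request = HttpRequest.newBuilder()
      .uri(URI.create(s"$livyUrl/batches/$batchId"))
      .DELETE()
      .build()

    val response = HttpClient.newHttpClient()
      .send(request, HttpResponse.BodyHandlers.ofString())
    println(response.statusCode())   // 200 means the session is gone
  }
}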

Feature, not a bug

Usually, restarting the software helps in such situations, but Livy has a “feature” that prevented this from happening. It can save session information to Zookeeper, an external safekeeper of data, and read it back on restart. So when we restarted Livy, it would go right back into uselessness. I saw almost 40,000 sessions saved in Zookeeper, for jobs that were long gone.
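For reference, this persistence is driven by Livy’s own configuration. The snippet below is a sketch of the relevant livy.conf settings as I recall them from the open-source Livy configuration template; treat the exact keys and values as illustrative rather than a copy of our cluster’s config.

# Sketch: livy.conf settings that enable session recovery via Zookeeper
# (keys as in the open-source Livy config template; hosts are placeholders).
livy.server.recovery.mode = recovery
livy.server.recovery.state-store = zookeeper
livy.server.recovery.state-store.url = zk-host1:2181,zk-host2:2181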

So, we have two design features in Livy that, combined, can cause this problem:

  • Session cleanup only via explicit deletion or timer-based garbage collection
  • Session persistence and recovery via Zookeeper

The first is needed because Livy is based on REST, and the second is needed because Spark jobs can run for a long time and Livy may crash in the meantime. Certainly, I won’t blame Livy for either of those.

Going to the source

My explanation above hides the many days of emails and phone calls it took to figure it all out. Google was not very helpful, and the Azure team were not Livy experts: to them, it was third-party software. But because Livy is open source, I could look at its source code to hunt for the problem.

My main hunch was that a timeout was causing the problem, so I started to dig around in the codebase. Here’s the relevant code from Livy that I used to convince the Azure team that we really were dealing with a Livy problem, not one of ours.

The code that deletes expired sessions is an independent “garbage-collector” thread that wakes up once a minute and looks for sessions that are past the configured session timeout.

def collectGarbage(): Future[Iterable[Unit]] = {
  // A session is expired once it has been idle for longer than the
  // configured timeout (times are nanoseconds from System.nanoTime()).
  def expired(session: Session): Boolean = {
    val currentTime = System.nanoTime()
    currentTime - session.lastActivity >
      math.max(sessionTimeout, session.timeout)
  }
  // Delete every session that has expired.
  Future.sequence(all().filter(expired).map(delete))
}
// ...
private class GarbageCollector extends Thread("session gc thread") {
  setDaemon(true)
  override def run(): Unit = {
    while (true) {
      collectGarbage()
      // Check once every minute.
      Thread.sleep(60 * 1000)
    }
  }
}

(SessionManager.scala)

Situation normal

The actual fix was simple once we had gotten to the bottom of the problem. We cannot change how ADF submits jobs to Livy, but we could reduce the session timeout to a sane value. We did that, restarted Livy, and it’s looking good now.
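Concretely, the change is a single setting in Livy’s configuration. A sketch, assuming the standard livy.server.session.timeout key from the open-source Livy config; the value shown is just an example, not necessarily the one we picked.

# Sketch: retain sessions for hours, not weeks, before garbage collection.
# (The value here is illustrative.)
livy.server.session.timeout = 8h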

All of this goes to say: software is complicated. And sometimes, we take the hit through no fault of our own. :-)