Notes on Spark Streaming app development
This post contains various notes from the second half of this year. There was a lot of learning involved in getting a streaming model working and production-ready. We used Spark Structured Streaming and wrote the code in Scala. Our model was stateful, and our source and sink were both Kafka.
Match Spark parallelism with Kafka
If your Kafka topic has N partitions, use N tasks in Spark. Spark Structured Streaming reads Kafka with a 1:1 correspondence between Kafka partitions and Spark tasks. Set these options while building the Spark session, so that they match the number of Kafka partitions:
spark.sql.shuffle.partitions
spark.default.parallelism
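As a minimal sketch, here is how these could be wired up while building the session; the app name and partition count are illustrative:

import org.apache.spark.sql.SparkSession

val kafkaPartitions = 8  // assumption: the source topic has 8 partitions

val spark = SparkSession.builder()
  .appName("my-streaming-app")
  .config("spark.sql.shuffle.partitions", kafkaPartitions.toString)
  .config("spark.default.parallelism", kafkaPartitions.toString)
  .getOrCreate()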
Know your parallelism
Spark allows you to run multiple queries within your application, and each query runs independently. If you have 2 queries that can run concurrently, you will need 2N cores; with M queries, you'll need M x N cores.
In fact, it's more than that. If you write to Kafka, there is a KafkaProducer thread shared among all cores within an executor, so you'll need P extra cores, where P is the number of executors. Spark uses a different Kafka consumer group for each query.
Suppose we had an application that ran 5 queries, and our source Kafka topic had 8 partitions. Say we decided to have 4 executors. We would then need (8 x 5) + 4 = 44 cores in all.
If there is not enough parallelism, the streaming job will sometimes show excessive delays. In our case, we saw a delay of 40 seconds, or multiples thereof. This corresponds to the Kafka read.timeout.ms.
What happened was: the Spark task made a request to fetch data from Kafka, but after that it never got a chance to run. (There were many other threads contending to run, because we were severely under-provisioned.) Eventually the read times out, the connection is reset, and another request is made. If the task gets scheduled soon enough after that, you're lucky; otherwise you wait another 40 seconds, and so on.
You can run only one action, and that’s the streaming action
Yes, most developers are aware of this, but in practice it means you can't do things like count(), or even take(). How do you get visibility into what's going on? You'll have to use map(), with log statements interspersed at key places. Note that these logs are written on the executors, not on the driver.
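A minimal sketch of that pattern, assuming a hypothetical Dataset[Event] called events; the logger name and the Event fields are illustrative:

import org.slf4j.LoggerFactory

case class Event(key: String, payload: String)  // hypothetical schema

// import spark.implicits._ is assumed, so mapping back to Event has an encoder
val inspected = events.map { e =>
  // Create (or look up) the logger on the executor; loggers are generally
  // not serializable, so don't capture one from the driver.
  val log = LoggerFactory.getLogger("stream-debug")
  log.info(s"saw event with key=${e.key}")
  e
}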
Use accumulators for counting
If you do need to count, you can use Spark accumulators for this purpose. You'd need a map() call in which you increment the accumulator; once the data has flowed through, you can read its value to get the count.
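A sketch of the idea, again assuming the hypothetical events Dataset from above; badRecords and the null check are just for illustration:

val badRecords = spark.sparkContext.longAccumulator("badRecords")

val checked = events.map { e =>
  if (e.payload == null) badRecords.add(1)  // count whatever you care about
  e
}

// After (or while) the streaming query runs, badRecords.value holds the
// count accumulated so far across all executors.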
Testing can be a pain, even if it uses a SparkContext
Streaming jobs have a number of limitations that regular Spark jobs do not. Although using a SparkContext in tests is better than nothing, there is no substitute for actually running a Spark streaming job and verifying that it works.
Case in point: you might have 2 actions in your code, and it'll happily pass tests that use a SparkContext, but fail when run as a streaming job.
Jobs can run out of memory, or be very slow
Be sure to add a SparkListener to monitor job execution times. Also enable verbose GC logs, and heap dumps on OutOfMemoryError; the JVM supports both via command-line flags.
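As a sketch, a listener that logs stage durations, with the JVM flags we mean shown in a comment; names and paths are illustrative:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

class StageTimeListener extends SparkListener {
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = {
    val info = stage.stageInfo
    for {
      start <- info.submissionTime
      end   <- info.completionTime
    } println(s"Stage ${info.stageId} took ${end - start} ms")
  }
}

spark.sparkContext.addSparkListener(new StageTimeListener)

// GC logs and heap dumps go in via the executor JVM options, e.g.
// spark.executor.extraJavaOptions =
//   -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp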
Be especially careful about how much state you're keeping. If you're appending to a list, make sure you also have a timeout, or otherwise limit the size of the list.
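A sketch of bounding per-key state, with a hypothetical SessionState holding recent events; the cap and timeout values are illustrative:

import org.apache.spark.sql.streaming.GroupState

case class SessionState(recent: List[String])

val maxEvents = 1000  // illustrative cap on per-key state

def updateSession(key: String, events: Iterator[String],
                  state: GroupState[SessionState]): SessionState = {
  val old = state.getOption.getOrElse(SessionState(Nil))
  // Keep only the newest maxEvents entries so the list cannot grow unbounded.
  val merged = SessionState((events.toList ++ old.recent).take(maxEvents))
  state.update(merged)
  // Expire idle keys; this needs GroupStateTimeout.ProcessingTimeTimeout
  // to be passed to mapGroupsWithState.
  state.setTimeoutDuration("30 minutes")
  merged
}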
Run Spark at TRACE log-level, if something is amiss
This will give more information to narrow down problems. I found this especially useful when debugging KafkaConsumer problems.
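Two ways to do this, assuming the log4j 1.x style configuration that older Spark distributions ship with:

// Programmatically, from the driver:
spark.sparkContext.setLogLevel("TRACE")

// Or in log4j.properties, narrowed to just the Kafka consumer:
//   log4j.logger.org.apache.kafka.clients.consumer=TRACE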
State processing should be the last aggregation
This applies to update-style aggregations (e.g. user session updates, keyed by user): the stateful operation must be the last aggregation in the query. So if you want to take a group-by count at output time, you're out of luck.
State processing function takes, and returns, an iterator of events
If you have a consume-only query, you can return an empty iterator. If you're using duplicate() to process the iterator twice, heed the Scaladoc: you cannot reuse the original iterator after calling it.
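A sketch of that shape with flatMapGroupsWithState, reusing the hypothetical SessionState from the earlier sketch; the output mode and timeout here are just examples:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

def processSession(key: String, events: Iterator[String],
                   state: GroupState[SessionState]): Iterator[String] = {
  // Materialize once if you need the events more than once; after
  // iterator.duplicate(), the original iterator must not be touched again.
  val buffered = events.toList
  state.update(SessionState(buffered))
  buffered.iterator          // a consume-only query would return Iterator.empty
}

// `grouped` is a KeyValueGroupedDataset[String, String]; spark.implicits._ is assumed.
val output = grouped.flatMapGroupsWithState(
  OutputMode.Update, GroupStateTimeout.NoTimeout)(processSession)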
State cannot be shared across queries
The state that Spark provides by default is backed by HDFS, and the path is keyed by the query ID. This means you have a problem whenever one of your queries wants to access state from another query, even within the same application. In our case, one was a producer query, the other a consumer query. No, it didn't work!
The solution to this quandary is to use a single query, perhaps by doing a df.union() if you read from two different sources. Then, you should use a single state-processing function, which filters the rows and processes each topic's rows separately.
It’s messy and introduces tight coupling in what should be really independent modules, but that’s what works for now.
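A sketch of the workaround, with made-up topic names; the Kafka source already attaches a topic column to every row, which is what the single state function can branch on:

val orders = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "orders")
  .load()

val clicks = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "clicks")
  .load()

// Both sources share the Kafka schema (key, value, topic, ...), so union works.
val combined = orders.union(clicks)

// Inside the single state-processing function, branch per topic:
//   row.topic match {
//     case "orders" => ...   // what used to be the producer query's logic
//     case "clicks" => ...   // what used to be the consumer query's logic
//   }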
“Task not serializable”
At some point, you will see the notorious “Task not serializable” exception. When that happens, read the exception message very carefully: it contains additional information on what could not be serialized.
It's frustrating to make a massive change and only then see this exception. Break up your changes and run tests often, so that you catch it sooner.
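A sketch of the most common cause we ran into: the closure capturing its enclosing, non-serializable class. Pipeline and its fields are made up:

import org.apache.spark.sql.{Dataset, SparkSession}

class Pipeline(spark: SparkSession) {            // not Serializable
  import spark.implicits._
  val prefix = "evt-"

  def tag(events: Dataset[String]): Dataset[String] = {
    // BAD: referencing `prefix` directly would capture `this`, i.e. the
    // whole Pipeline, and trigger "Task not serializable".
    // events.map(e => prefix + e)

    // GOOD: copy the value into a local val so only the String is captured.
    val p = prefix
    events.map(e => p + e)
  }
}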
Use rate limiting
Always limit how much data you can read, just in case someone decides to blast away at your source. For Kafka, this is easy to set via maxOffsetsPerTrigger in the reader.
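For example (broker address, topic name, and the cap are illustrative):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", "10000")  // read at most 10k offsets per micro-batch
  .load()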
OK, that was a long list! But each item represents something learned the hard way. Let's hope this helps someone starting off with Spark Structured Streaming.