Using HDFS to store Spark Streaming application checkpoints

Mikael Paladiants
Campaign Monitor Engineering
Apr 30, 2019

When you run a Spark Streaming application on an AWS EMR cluster, the easiest way to store your checkpoints is to use EMRFS. It uses S3 as the data store and, optionally, DynamoDB to provide consistent reads. This approach lets you freely destroy and re-create EMR clusters without losing your checkpoints. Enabling consistent reads is simply a matter of ticking the corresponding checkbox:

Fig. 1. Enabling EMRFS consistent view

The problem

But this convenience comes at a price, literally. Here in the Insights team at Campaign Monitor, we found that the cost of using EMRFS to store the checkpoints of our Spark jobs made up about 60% of our overall EMR costs. The EMRFS cost was split as follows: S3 80%, DynamoDB 20%.

Fig. 2. An EMR cluster with EMRFS (S3 + DynamoDB) as a checkpoint store

One reason for the high cost is the complexity of our streaming jobs, which depends on, among other things:

  • the number of Kafka topics/partitions read from
  • watermark length
  • trigger settings
  • aggregation logic

More complex jobs have larger checkpoints. Some of our checkpoints contain more than 25,000 files with a total size of around 1 GB, and they are updated every 90 seconds. This generates a lot of S3 and DynamoDB requests.

It wasn’t long before we decided to cut that cost down.

Our solution

Here is a diagram of the solution we went with:

Fig. 3. HDFS replaces EMRFS

Let’s walk through it quickly. HDFS is part of your EMR cluster and is available out of the box. That is where we store checkpoints now. Not only does this incur no extra cost, it also gives you faster reads and writes, since you no longer go through two separate AWS services (S3 and DynamoDB).
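As a minimal sketch of pointing a job at HDFS, the function below submits an application with its checkpoint base directory on the cluster's file system. It assumes the job uses Structured Streaming, which the watermark and trigger settings mentioned earlier suggest but the setup described here does not confirm. The jar path and checkpoint path are placeholders, and the `run` helper with its `DRY_RUN` switch is only there so the command can be inspected before it is executed. `spark.sql.streaming.checkpointLocation` sets a default base directory, and each query checkpoints into a subdirectory named after its query name, so this also assumes the query has a name; otherwise set the `checkpointLocation` option on the stream writer instead.

```shell
#!/bin/sh
# Sketch: submit a streaming job whose checkpoints live on the cluster's HDFS.
# CHECKPOINT_URI and APP_JAR are placeholders, not values from the setup described here.
CHECKPOINT_URI="${CHECKPOINT_URI:-hdfs:///checkpoints}"
APP_JAR="${APP_JAR:-/home/hadoop/my-streaming-job.jar}"

# Run a command, or only print it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Submit the job with the default checkpoint base directory set to HDFS.
submit_streaming_job() {
  run spark-submit \
    --conf "spark.sql.streaming.checkpointLocation=$CHECKPOINT_URI" \
    "$APP_JAR"
}

# Submit only when the script is called with an explicit "run" argument.
if [ "${1:-}" = "run" ]; then
  submit_streaming_job
fi
```

Calling the script with `DRY_RUN=1` and the `run` argument prints the `spark-submit` command instead of executing it.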

The downside, of course, is that once you destroy your cluster, HDFS and checkpoint data are also gone. So we need a way around that. This is where the Cron+S3+TeamCity trio comes into play. There is now a Cron job running on the master node of every EMR cluster we create. Its responsibility is to back up checkpoints to S3 (currently, once every 3 hours). First, we copy checkpoints from HDFS to the local disk by using this command:

hdfs dfs -copyToLocal hdfs:///[checkpoint-dir] /tmp/[checkpoint-dir]

Then we upload the local copy to a time-stamped folder in S3 with the following command:

aws s3 cp --recursive /tmp/[checkpoint-dir] s3://[destination-path]

The process takes around 4 minutes for our busiest Spark job. Note that we don’t want to copy from HDFS directly to S3 as this would use the cluster’s computing resources.
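The two commands could be combined into a single script for cron along these lines. This is a sketch under stated assumptions, not the exact script from this setup: the checkpoint path, staging directory, bucket name, and the sortable UTC timestamp format are all placeholders, and the `run` helper with its `DRY_RUN` switch is only there to make the commands inspectable.

```shell
#!/bin/sh
# Sketch of a checkpoint backup script for cron on the EMR master node.
# All three values below are placeholders, not the ones used in the setup described here.
CHECKPOINT_DIR="${CHECKPOINT_DIR:-checkpoints/my-job}"
LOCAL_STAGING="${LOCAL_STAGING:-/tmp}"
S3_BACKUP_ROOT="${S3_BACKUP_ROOT:-s3://my-bucket/checkpoint-backups}"

# Run a command, or only print it when DRY_RUN=1.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# backup_checkpoint TIMESTAMP
# Copies the checkpoint from HDFS to local disk, then uploads it to
# a folder named after TIMESTAMP under S3_BACKUP_ROOT.
backup_checkpoint() {
  ts="$1"
  # Refuse to run with an empty path so the cleanup below cannot hit LOCAL_STAGING itself.
  [ -n "$CHECKPOINT_DIR" ] || return 1
  local_copy="$LOCAL_STAGING/$CHECKPOINT_DIR"
  # Clear the previous staging copy first; each step runs only if the one before succeeded.
  run rm -rf "$local_copy" &&
  run mkdir -p "$(dirname "$local_copy")" &&
  run hdfs dfs -copyToLocal "hdfs:///$CHECKPOINT_DIR" "$local_copy" &&
  run aws s3 cp --recursive "$local_copy" "$S3_BACKUP_ROOT/$ts/"
}

# Back up only when the script is called with an explicit "run" argument.
if [ "${1:-}" = "run" ]; then
  backup_checkpoint "$(date -u +%Y%m%dT%H%M%SZ)"
fi
```

A crontab entry such as `0 */3 * * * /home/hadoop/backup_checkpoint.sh run` (the script path is hypothetical) would then run the backup once every 3 hours. Timestamps in this format sort lexicographically, which makes it easy to find the latest backup later.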

TeamCity is useful when we want to re-provision a cluster. As before, running a build creates a new cluster or replaces the existing one. But now it also looks up the latest checkpoint backup by timestamp and copies it to the newly provisioned cluster using s3-dist-cp.
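The restore step might look roughly like the sketch below, run on the new cluster's master node. It assumes that backups sit in folders named with sortable timestamps (for example `20190430T030000Z`) directly under one S3 prefix. The bucket name, checkpoint path, and function names are placeholders, and how the build actually triggers the copy is not shown.

```shell
#!/bin/sh
# Sketch of restoring the newest checkpoint backup onto a fresh cluster.
# Both values below are placeholders, not the ones used in the setup described here.
CHECKPOINT_DIR="${CHECKPOINT_DIR:-checkpoints/my-job}"
S3_BACKUP_ROOT="${S3_BACKUP_ROOT:-s3://my-bucket/checkpoint-backups}"

# latest_backup: read `aws s3 ls` output on stdin and print the newest
# timestamped folder name, without its trailing slash.
latest_backup() {
  awk '$1 == "PRE" { sub(/\/$/, "", $2); print $2 }' | sort | tail -n 1
}

# Copy the newest backup from S3 into HDFS with s3-dist-cp.
restore_checkpoint() {
  ts="$(aws s3 ls "$S3_BACKUP_ROOT/" | latest_backup)"
  if [ -z "$ts" ]; then
    echo "no backups found under $S3_BACKUP_ROOT" >&2
    return 1
  fi
  s3-dist-cp --src "$S3_BACKUP_ROOT/$ts" --dest "hdfs:///$CHECKPOINT_DIR"
}

# Restore only when the script is called with an explicit "run" argument.
if [ "${1:-}" = "run" ]; then
  restore_checkpoint
fi
```

Keeping the listing parser separate from the copy makes it possible to check the "latest by timestamp" logic against sample `aws s3 ls` output without touching S3.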

This gives us a fully automated way of re-provisioning EMR clusters and of storing checkpoints and their backups, all at a fraction of the original cost and with better performance. A good reason to say “win-win”.

Conclusion

Using EMRFS as a checkpoint store makes it easier to get started with AWS EMR, but the cost of using it can get high for data-intensive Spark Streaming applications. Opting for HDFS with a little bit of extra work will rid you of most of that cost.
