03 June 2014

Twitter’s Future library is a beautiful abstraction for dealing with concurrency. However, there are code patterns that seem natural or innocuous but can cause real trouble in production systems. This short article outlines a few of the easiest traps to fall into.

An example

Below is a method from a fictional web application that registers a user by calling the Foursquare API to get the user’s profile info, their friend graph and their recent check-ins.

def registerUser(token: String): Future[User] = {
  val api = FoursquareApi(token)

  def apiFriendsF(apiUser: ApiUser): Future[Seq[ApiUser]] = {
    Future.collect(apiUser.friendIDs.map(api.getUserF))
  }

  def apiCheckinsF(apiUser: ApiUser, categories: Seq[ApiCategory]): Future[Seq[ApiCheckin]] = {
    ...
  }

  def createDBUser(
      user: ApiUser,
      friends: Seq[ApiUser],
      checkins: Seq[ApiCheckin]): User = {
    db.insert(...)
  }

  for {
    apiUser <- api.getSelfF()
    apiCategories <- api.getCategoriesF()
    apiFriends <- apiFriendsF(apiUser)
    apiCheckins <- apiCheckinsF(apiUser, apiCategories)
  } yield createDBUser(apiUser, apiFriends, apiCheckins)
}

There are some problems with this code.

Anti-pattern #1: Blocking in a yield or a map

The last part of the for-comprehension desugars to

apiCheckinsF(apiUser, apiCategories).map(apiCheckins => 
  createDBUser(apiUser, apiFriends, apiCheckins))

The problem here is that createDBUser makes a blocking call to the database. You should never do blocking work in map on a Future. Every Future runs in a thread pool that is (hopefully) tuned for a particular purpose. Code inside the map (generally) runs on the thread that completes the Future. So you’re putting work in a thread pool that wasn’t designed to handle that work.
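You can see this for yourself by printing the thread name inside a map. A quick sketch (the pool here is one I’m making up just for illustration):

import java.util.concurrent.Executors
import com.twitter.util.FuturePool

val pool = FuturePool(Executors.newFixedThreadPool(1))

pool { Thread.sleep(100); 1 }.map { i =>
  // Usually prints the pool's thread name (e.g. "pool-1-thread-1"), not the
  // caller's, because the callback runs on the thread that completed the Future.
  println(Thread.currentThread.getName)
  i
}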

Furthermore, when you’re dealing with Futures composed from other Futures, it’s often hard to tell by inspection which Future will be the last to complete (and whose thread pool will run the map code). It’s frequently not the “outermost” Future. For example:

val outermostFuture: Future[Int] = {
  val runsInThreadPoolA: Future[Int] = ...
  val runsInThreadPoolB: Future[Int] = ...
  for {
    a <- runsInThreadPoolA
    b <- runsInThreadPoolB
  } yield a+b
}

outermostFuture.map(i => /* where do I run?? */)

It’s also possible that the Future completes before you call map — in which case the work inside the map happens on the thread that calls map. This is bad if your callers expect you to return immediately with a Future.

def thisActuallyBlocks(a: Int): Future[Int] = {
  val anIntF: Future[Int] = Future.value(a)
  // ... some more stuff ...
  anIntF.map(i => somethingThatBlocks(i))
}

It’s also possible to cause a deadlock (and yes, we’ve seen this in production) if the code inside the map calls Await on another Future that needs a thread from that same thread pool in order to complete — but again, it’s hard to know which thread pool that is.
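Here’s a contrived sketch of how that deadlock can happen, assuming a single-threaded pool (the names are made up for illustration):

import java.util.concurrent.Executors
import com.twitter.util.{Await, Future, FuturePool}

val pool = FuturePool(Executors.newFixedThreadPool(1))

val stuck: Future[Int] = pool(1).map { i =>
  // If this callback runs on the pool's only thread, the task below gets
  // queued behind it and can never start, so Await blocks forever.
  val inner: Future[Int] = pool(i + 1)
  Await.result(inner)
}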

So instead, set up your own thread pool for blocking work:

object future {
  private val pool = FuturePool(Executors.newFixedThreadPool(10))
  def apply[A](a: => A): Future[A] = pool(a)
}

And use it like this:

def createDBUserF(
    user: ApiUser,
    friends: Seq[ApiUser],
    checkins: Seq[ApiCheckin]): Future[User] = future {
  db.insert(...)
}

for {
  ...
  user <- createDBUserF(apiUser, apiFriends, apiCheckins)
} yield user

This now desugars to

createDBUserF(apiUser, apiFriends, apiCheckins).map(user => user)

which is safe.

So ALWAYS yield a plain value or a simple computation. If you have blocking work, wrap it in a ThreadPool-backed Future and flatMap it.

It’s worth noting that in the Scala-native Future library (scala.concurrent.Future), you must supply an implicit or explicit ExecutionContext whenever you create or map over a Future. That gives you control over where each piece of code executes, so the above warnings about map do not apply.
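For example, something along these lines (a sketch with made-up names) pins blocking work, and the map that follows it, to a context you choose:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// A dedicated context for blocking work.
val blockingEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))

def somethingThatBlocks(i: Int): Int = ??? // stand-in for a blocking call

def lookupF(i: Int): Future[Int] =
  Future(somethingThatBlocks(i))(blockingEc) // runs on blockingEc, not on whoever completes a Future

// map takes an ExecutionContext too, so the callback's home is explicit as well.
val resultF: Future[Int] = lookupF(1).map(_ + 1)(blockingEc)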

Anti-pattern #2: Too much parallelism

The method apiFriendsF creates a future for each item in a list of user IDs and collects the results into a single Future:

def apiFriendsF(apiUser: ApiUser): Future[Seq[ApiUser]] = {
  Future.collect(apiUser.friendIDs.map(api.getUserF))
}

But this is too much parallelism! You’ll flood the thread pool with a ton of simultaneous work. Some network or database drivers don’t even allow more than a certain number of concurrent connections, and you’ll get a bunch of exceptions, and you will not have a good day. A better way to do it is to limit how much you are doing in parallel.

def apiFriendsF(apiUser: ApiUser): Future[Seq[ApiUser]] = {
  future.groupedCollect(apiUser.friendIDs, 5)(api.getUserF)
}

The groupedCollect helper method can be implemented as follows:

object future {
  def groupedCollect[A, B](xs: Seq[A], par: Int)(f: A => Future[B]): Future[Seq[B]] = {
    val emptyF: Future[Seq[B]] = Future.value(Seq.empty[B])
    // Process the items in batches of `par`, waiting for each batch to
    // finish before starting the next one.
    xs.grouped(par).foldLeft(emptyF) { case (accF, group) =>
      for {
        acc <- accF
        batch <- Future.collect(group.map(f))
      } yield acc ++ batch
    }
  }
}

The par parameter lets you specify how much work you want done in parallel. For example, if you specify 5, it will take 5 items from the list, do them all in parallel, and wait for them to complete before moving on to the next 5 items.

This can be mitigated a different way, by configuring a thread pool with a maximum number of threads, and making sure that all database or network calls go through this pool. This has the advantage of limiting parallelism application-wide, rather than just at a given call site. It still might be a good idea to limit parallelism at an individual call site to prevent it from crowding out other work.
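Sketched out, that might look something like this, with one shared bounded pool that every blocking database call goes through (dbAccess, dbPool and blockingLookup are made-up names for illustration):

import java.util.concurrent.Executors
import com.twitter.util.{Future, FuturePool}

object dbAccess {
  // One shared, bounded pool for all blocking database work.
  private val dbPool = FuturePool(Executors.newFixedThreadPool(8))

  private def blockingLookup(id: Long): String = ??? // stand-in for a blocking driver call

  def lookupF(id: Long): Future[String] = dbPool {
    blockingLookup(id) // at most 8 of these run at once, across all call sites
  }
}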

Anti-pattern #3: Not enough parallelism

This code invokes api.getSelfF() and api.getCategoriesF() sequentially when they could be run in parallel:

for {
  apiUser <- api.getSelfF()
  apiCategories <- api.getCategoriesF()
  ...
} yield ...

It desugars to

api.getSelfF().flatMap(apiUser =>
  api.getCategoriesF().flatMap(apiCategories =>
    ...))

So one waits for the other even though it doesn’t need to. The fix is to invoke the methods outside of the for-comprehension.

val apiUserF: Future[ApiUser] = api.getSelfF()
val apiCategoriesF: Future[Seq[ApiCategory]] = api.getCategoriesF()

...

for {
  apiUser <- apiUserF
  apiCategories <- apiCategoriesF
  ...
} yield ...

Likewise, we have:

for {
  ...
  apiFriends <- apiFriendsF(apiUser)
  apiCheckins <- apiCheckinsF(apiUser, apiCategories)
  ...
} yield ...

These two can also be done in parallel. Write it this way instead:

for {
  ...
  (apiFriends, apiCheckins) <- Future.join(
    apiFriendsF(apiUser),
    apiCheckinsF(apiUser, apiCategories))
  ...
} yield ...

The join method runs multiple Futures in parallel and collects their results in a tuple. It also explicitly documents that the two calls will happen in parallel.
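For instance, with two futures (here just pre-computed values, made up for illustration):

val nameF: Future[String] = Future.value("alice")
val checkinCountF: Future[Int] = Future.value(42)

// join waits for both futures and pairs up their results.
val joinedF: Future[(String, Int)] = Future.join(nameF, checkinCountF)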

Conclusion

Here’s what we ended up with:

def registerUser(token: String): Future[User] = {
  val api = FoursquareApi(token)

  val apiUserF: Future[ApiUser] = api.getSelfF()

  val apiCategoriesF: Future[Seq[ApiCategory]] = api.getCategoriesF()

  def apiFriendsF(apiUser: ApiUser): Future[Seq[ApiUser]] = {
    future.groupedCollect(apiUser.friendIDs, 5)(api.getUserF)
  }

  def apiCheckinsF(apiUser: ApiUser, categories: Seq[ApiCategory]): Future[Seq[ApiCheckin]] = {
    ...
  }

  def createDBUserF(
      user: ApiUser,
      friends: Seq[ApiUser],
      checkins: Seq[ApiCheckin]): Future[User] = future {
    db.insert(...)
  }

  for {
    apiUser <- apiUserF
    apiCategories <- apiCategoriesF
    (apiFriends, apiCheckins) <- Future.join(
      apiFriendsF(apiUser),
      apiCheckinsF(apiUser, apiCategories))
    user <- createDBUserF(apiUser, apiFriends, apiCheckins)
  } yield user
}

Things to note here:

  1. Nested methods take plain values and return Futures, for great flatMapping.
  2. All the work is set up ahead of time via vals and nested methods.
  3. Everything is “glued” together with a for-comprehension at the end.
  4. Parallelism and dependencies are made explicit in the for-comprehension.
  5. Blocking work is explicitly wrapped in a thread pool.
