On the limits of concurrency: Worker Pools in Erlang
A worker pool is a very common pattern, and implementations exist in the standard libraries of many languages. The idea is simple: you submit some sort of closure to a service, which commits to running that closure at some point in the future in one of its threads. Normally the work is shared out among many different threads and, in the absence of anything fancier, one assumes a first-come-first-served queue of closures.
Erlang, with its lightweight process model, is not a language you would expect to need such an approach: processes are dirt cheap, and the scheduler maps processes onto threads when they are ready to run. In many ways, the Erlang VM is itself a glorified worker pool, only one that does pre-emption and other fancy things, much like an OS kernel. Nevertheless, we recently found a need for a worker pool in RabbitMQ.
At various points in RabbitMQ, we use mnesia transactions to ensure that state held by mnesia (for example, the existence or otherwise of a queue) is updated safely. A client recently told us that he was creating 10,000 connections, each of which declared a queue with exclusive set (which means the queue must be deleted automatically when its connection disappears), and then dropping all the connections at the same time. RabbitMQ would then become very unresponsive for a very, very long time. A little head scratching and some quick testing led us to conclude that this was creating 10,000 transactions, all on the same table, all continually colliding with each other and therefore having to be restarted. The realisation here is that there is frequently no good reason to allow more mnesia transactions to run at the same time than you have cores available: the probability of a collision rises steeply with the number of transactions in flight at once, since each additional transaction can conflict with every one already running.
Our solution is to submit all of these transactions to a worker pool with as many workers as there are CPU cores in the system. This greatly reduces the probability of a collision, and things progress much, much faster. In many ways, this is a good example of a case where being able to hint to the underlying VM about how it should schedule different jobs would result in massively improved performance.
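The following is a minimal sketch of such a pool, not RabbitMQ’s actual worker_pool code; the module name pool_sketch and its functions are hypothetical. It assumes that erlang:system_info(schedulers_online) is a reasonable stand-in for the number of cores. A dispatcher process holds a FIFO queue of pending jobs plus a list of idle workers, and submit/2 blocks until the result comes back.

```erlang
%% pool_sketch.erl: a minimal FIFO worker pool (hypothetical module,
%% not RabbitMQ's worker_pool).
-module(pool_sketch).
-export([start/0, start/1, submit/2]).

%% Default pool size: one worker per online scheduler, which by default
%% is one per logical processor.
start() ->
    start(erlang:system_info(schedulers_online)).

%% Returns the pid of the dispatcher, which acts as the pool handle.
start(N) when is_integer(N), N > 0 ->
    spawn(fun() -> dispatcher_init(N) end).

%% Synchronous submission: send the closure, then block for its result.
submit(Pool, Fun) ->
    Ref = make_ref(),
    Pool ! {job, self(), Ref, Fun},
    receive
        {Ref, Result} -> Result
    end.

dispatcher_init(N) ->
    Dispatcher = self(),
    Workers = [spawn_link(fun() -> worker(Dispatcher) end)
               || _ <- lists:seq(1, N)],
    dispatcher(queue:new(), Workers).

%% Jobs: queue of pending jobs, oldest first. Idle: workers with no job.
dispatcher(Jobs, Idle) ->
    receive
        {job, _From, _Ref, _Fun} = Job ->
            case Idle of
                [W | Rest] -> W ! Job, dispatcher(Jobs, Rest);
                []         -> dispatcher(queue:in(Job, Jobs), [])
            end;
        {idle, W} ->
            case queue:out(Jobs) of
                {{value, Job}, Jobs1} -> W ! Job, dispatcher(Jobs1, Idle);
                {empty, _}            -> dispatcher(Jobs, [W | Idle])
            end
    end.

%% A worker runs one job at a time, replies directly to the submitter,
%% then tells the dispatcher it is free again. A crash in Fun kills the
%% worker and, through the link, the dispatcher too.
worker(Dispatcher) ->
    receive
        {job, From, Ref, Fun} ->
            From ! {Ref, Fun()},
            Dispatcher ! {idle, self()},
            worker(Dispatcher)
    end.
```

In the shell, Pool = pool_sketch:start() followed by pool_sketch:submit(Pool, fun() -> 6 * 7 end) returns 42. The sketch deliberately omits supervision: if a job crashes, its worker and the linked dispatcher die and the caller waits forever, which is one reason closures should catch their own errors.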
One interesting gotcha with this solution is nested transactions. Mnesia handles these very well indeed, but if we wrap every transaction in a submission to the worker pool, mnesia can no longer tell that the transactions are nested (which means we lose the rollback of the inner ones if the outer one subsequently fails). Furthermore, we risk deadlock if every worker in the pool is waiting on a nested transaction that is itself stuck in the queue. To solve this problem, on submission of a closure we detect whether we are already in a worker pool process and, if we are, we simply run the closure in the current process without submitting it at all. This means that nested transactions run in the same process as their parent, and we avoid the deadlock.
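A minimal sketch of that check follows. It assumes, as our own choice and not necessarily how worker_pool does it, that each worker marks itself with a flag in its process dictionary; the module name nested_sketch and the key '$pool_worker' are hypothetical. The pool has a single worker whose mailbox serves as the FIFO queue, since one worker is the worst case for the deadlock: a nested submission from inside it would wait on itself forever.

```erlang
%% nested_sketch.erl: nested-submission detection (hypothetical module).
-module(nested_sketch).
-export([start/0, submit/2]).

%% Process-dictionary key that marks a process as a pool worker.
-define(WORKER_KEY, '$pool_worker').

%% A one-worker pool: the worker's mailbox is the FIFO job queue.
start() ->
    spawn(fun() -> put(?WORKER_KEY, true), loop() end).

loop() ->
    receive
        {job, From, Ref, Fun} ->
            From ! {Ref, Fun()},
            loop()
    end.

submit(Pool, Fun) ->
    case get(?WORKER_KEY) of
        true ->
            %% Already on a worker: run inline, so a nested transaction
            %% stays in its parent's process and we never wait on a
            %% worker that might be ourselves.
            Fun();
        undefined ->
            Ref = make_ref(),
            Pool ! {job, self(), Ref, Fun},
            receive
                {Ref, Result} -> Result
            end
    end.
```

For example, submitting fun() -> {self(), nested_sketch:submit(Pool, fun() -> self() end)} end returns the worker’s pid twice: the inner closure ran inline in the same process instead of being queued behind its own parent.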
The code has already been QA’d and merged into our default branch, and lives in three modules: worker_pool_sup (the only module you need to start; it then starts everything else), worker_pool_worker (the actual workers who do the, erm, work), and worker_pool (the module to which you submit your jobs). Note that submission is synchronous, so you get back the result of your closure. Your closures are also responsible for catching any errors that might occur and returning them to you as values. Finally, note that you’ll also need our gen_server2 module, as both worker_pool and worker_pool_worker use it.
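Because submission is synchronous and error handling is the closure’s own responsibility, a caller might wrap each job along the lines of the sketch below. The module safe_job and its wrap/1 function are hypothetical, not part of RabbitMQ; the wrapper simply turns any exception raised by the closure into an {error, {Class, Reason}} value, so the worker survives and the submitter sees what went wrong.

```erlang
%% safe_job.erl: make a closure report errors as values (hypothetical).
-module(safe_job).
-export([wrap/1]).

%% Returns a new closure that yields {ok, Result} on success, or
%% {error, {Class, Reason}} if Fun raises an error, exit or throw.
wrap(Fun) ->
    fun() ->
        try
            {ok, Fun()}
        catch
            Class:Reason -> {error, {Class, Reason}}
        end
    end.
```

The wrapped closure can then be handed to whatever submit function the pool provides. A closure that only calls mnesia:transaction/1 has less to catch, since that function already reports a failure inside the transaction as {aborted, Reason}.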