Too much mail is bad for you

We received a few reports from users of our Erlang-based RabbitMQ message broker who saw sharp drops in throughput when putting the broker under heavy load. We subsequently reproduced these results in our lab. This is not what we expected to see. While some performance degradation is inevitable when running a system at its limits, we had carefully designed RabbitMQ to make such degradation small and gradual. So clearly the system was behaving in ways we had not anticipated.

We eventually tracked down the problem. Take this example program, which sets up a chain of three processes: a producer sending N messages to a consumer which, for every message it receives, performs M request/response interactions with an echo process.

The interaction between the consumer and echo process follows the same pattern as what happens under the covers of Erlang’s gen_server:call: a message is sent to the recipient and the caller then waits for a reply using a selective receive, i.e. a receive matching on a specific pattern unique to the expected response, which distinguishes the response from any other messages the caller may have in their incoming message queue.
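
To make the pattern concrete, here is a minimal sketch of such a call. It is not the code of gen_server:call, which also monitors the callee and applies a timeout; the module name call_sketch, the message shapes and the function names are made up for illustration.

```erlang
-module(call_sketch).
-export([call/2, echo_loop/0, demo/0]).

%% Send Request to Pid and wait for the matching reply.
%% The fresh reference makes the reply pattern unique, so the receive
%% is selective: unrelated messages already in the mailbox are skipped
%% (and left in place) until {Ref, Reply} is found.
call(Pid, Request) ->
    Ref = make_ref(),
    Pid ! {request, self(), Ref, Request},
    receive
        {Ref, Reply} -> Reply
    end.

%% A trivial server that echoes every request back to its caller.
echo_loop() ->
    receive
        {request, From, Ref, Request} ->
            From ! {Ref, Request},
            echo_loop();
        stop ->
            ok
    end.

%% Put an unrelated message in our own mailbox, make a call, and check
%% that the unrelated message is still queued afterwards.
demo() ->
    Echo = spawn_link(fun echo_loop/0),
    self() ! unrelated,
    hello = call(Echo, hello),
    Echo ! stop,
    receive
        unrelated -> still_queued
    after 0 ->
        lost
    end.
```

Calling call_sketch:demo() should return still_queued: the call skips over the unrelated message to reach its reply, and that skipping is exactly the scan whose cost matters later in this post.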

The interaction between the producer and consumer comes in two flavours. When WithAck = false the producer sends messages to the consumer completely asynchronously. By contrast, when WithAck = true the producer waits for an acknowledgment of every message before proceeding. The consumer sends that acknowledgment as soon as it has received the message.

Which version is faster? One would think that the asynchronous version is faster than the synchronous version since the former does not have to carry out the additional work of sending/receiving acknowledgments. Wrong.

Eshell V5.5.5  (abort with ^G)
1> mqueue:timed_run(10000, 10, false).
4892400
2> mqueue:timed_run(10000, 10, true).
85830

This has the producer sending 10000 messages, and the consumer calling the echo process 10 times for each message; the results are elapsed times in microseconds. The synchronous version outperforms the asynchronous version by a factor of 57!

How does this huge discrepancy arise? The culprit is the consumer’s message queue, or, more precisely, its length. Firstly, the asynchronous version of the above test may result in 10000 messages needing to be enqueued, namely in the worst case, when the consumer does not start any work until the producer has finished sending all messages. However, surely queuing up 10000 messages shouldn’t take nearly five seconds? Indeed it doesn’t, as a few experiments quickly confirm.

The root cause for the excessive time taken by the asynchronous version is the selective receive performed when waiting for the echo response. A selective receive scans the message queue until it finds a match. The echo response will typically be at or near the end of the queue, since it will only have been sent a short while after the echo request. So if the message queue contains a lot of messages from the producer, every time the consumer is waiting for the echo response it has to scan over all these messages. If there are N messages in the consumer’s message queue and it performs M calls to the echo process for each of them, this results in M * N * (N+1)/2 match operations, i.e. a time complexity that is quadratic in the number of messages.
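
The arithmetic can be checked with a small sketch. It only counts match attempts under the worst-case model described above (all N producer messages are queued before the consumer starts); it does not measure a real mailbox, and the module and function names are made up for illustration.

```erlang
-module(scan_cost).
-export([simulated/2, formula/2]).

%% Count match attempts under the worst-case model from the text.
%% While the consumer handles a message with K-1 producer messages
%% still queued behind it, each of its M selective receives inspects
%% those K-1 messages plus the echo reply itself, i.e. K entries.
simulated(N, M) ->
    lists:sum([M * K || K <- lists:seq(1, N)]).

%% Closed form M * N * (N + 1) / 2, using integer division.
formula(N, M) ->
    M * N * (N + 1) div 2.
```

For the run above (N = 10000, M = 10) both functions give 500050000 match attempts, against only 100000 echo calls actually made.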

Could Erlang do something smarter here?

One possibility is to use a richer data structure for the message queue, optimised for pattern matching. That is difficult in the general case since patterns can be of arbitrary complexity and cannot always be determined statically. Still, one could envisage a message queue data structure that dynamically optimises itself for matching against the kinds of patterns thrown at it.

Another option is to allow processes to have several, independently addressable message queues, as in, e.g., the join calculus and various other process algebras. Replies to calls could then be sent to a secondary, usually empty, queue. I wonder what an encoding of the join calculus into Erlang would look like …

Meanwhile the lesson is: if you make synchronous calls inside a process you’d better make sure its message queue is short.
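
One way to keep an eye on this is to ask the runtime for a process’s mailbox length with erlang:process_info/2. The following is a minimal sketch; the module name mq_len and its helper functions are made up for illustration.

```erlang
-module(mq_len).
-export([queue_len/1, demo/0]).

%% Number of messages waiting in Pid's mailbox, or undefined if the
%% process is no longer alive. Pid must be a process on the local node.
queue_len(Pid) ->
    case erlang:process_info(Pid, message_queue_len) of
        {message_queue_len, Len} -> Len;
        undefined -> undefined
    end.

%% Send two messages to ourselves and report by how much the mailbox
%% grew, then drain them again so the caller's mailbox is unchanged.
demo() ->
    Before = queue_len(self()),
    self() ! a,
    self() ! b,
    After = queue_len(self()),
    receive a -> ok end,
    receive b -> ok end,
    After - Before.
```

A process that is about to make many synchronous calls could log or act on queue_len(self()) first; what counts as too long is for the application to decide.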

PS: The latest release of RabbitMQ, fixing the problem originally reported, is available for download here.

by matthias on 01/10/07
  1. thomas figg
    on 01/10/07 at 7:55 pm

    So, how did you go about solving this problem ?

    Did you use an intermediate process, or did you empty the message box into a separate list ? or …. ?

    Another bug-like behaviour with erlang is with regards to its string handling (iirc as doubly linked lists with one node per character) which tripped up tim bray:

    http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

  2. So, how did you go about solving this problem ?

    I made the interaction between the producer and consumer synchronous. That is not an appropriate solution in general, but in this particular case it was the easiest and correct thing to do.

  3. Hi,

    If the producer must send messages asynchronously, then a solution is to add a buffering process between it and the consumer. Send async messages and the buffer will make sync calls to the consumer, sending a small number of messages at a time and then waiting for ack.

    Of course, this works only if the messages aren’t to be selectively received.

    regards,
    Vlad

  4. @thomas: I’m pretty sure Erlang’s lists are singly-linked, like ML, lisp, scheme, Haskell etc., which would make strings singly-linked as well. This is, surprisingly, usually a pretty good way of operating. I’ve written a little more about some of the issues in an earlier article, if you’re interested.

  5. Daniel Kwiecinski
    on 03/10/07 at 2:00 pm

    My idea to improve the erlang’s queue implementation is to make it a tree rather than list. As everything in erlang is comparable the implementation of such balanced tree should be straightforward.

    Cheers,
    Daniel

  6. -module(mqueue).

    -export([timed_run/3]).

    producer(0, ConsumerPid, _WithAck) ->
        ConsumerPid ! {done, self()},
        receive
            done -> ok
        end;
    producer(N, ConsumerPid, WithAck = false) ->
        ConsumerPid ! {msg, self()},
        producer(N-1, ConsumerPid, WithAck);
    producer(N, ConsumerPid, WithAck = true) ->
        ConsumerPid ! {acked_msg, self()},
        receive
            ack -> ok
        end,
        producer(N-1, ConsumerPid, WithAck).

    consumer(M, EchoPid) ->
        receive
            {msg, _From} ->
                call_echo(M, EchoPid),
                consumer(M, EchoPid);
            {acked_msg, From} ->
                From ! ack,
                call_echo(M, EchoPid),
                consumer(M, EchoPid);
            {done, From} ->
                EchoPid ! done,
                From ! done
        end.

    queue_keeper_empty(ConsumerPid) ->
        receive
            {msg, _From} ->
                ConsumerPid ! {acked_msg, self()},
                queue_keeper_waiting(queue:new(), ConsumerPid);
            {done, From} ->
                ConsumerPid ! {done, self()},
                receive
                    done ->
                        From ! done
                end
        end.

    queue_keeper_waiting(Q, ConsumerPid) ->
        receive
            {msg, _From} ->
                queue_keeper_waiting(queue:in({acked_msg, self()}, Q), ConsumerPid);
            ack -> case queue:out(Q) of
                    {{value, Msg}, Q1} ->
                        ConsumerPid ! Msg,
                        queue_keeper_waiting(Q1, ConsumerPid);
                    {empty, Q} -> queue_keeper_empty(ConsumerPid)
                end
        end.

    call_echo(0, _EchoPid) ->
        ok;
    call_echo(M, EchoPid) ->
        EchoPid ! {hello, self()},
        receive
            hello -> call_echo(M-1, EchoPid)
        end.

    echo() ->
        receive
            {Msg, From} ->
                From ! Msg,
                echo();
            done -> ok
        end.

    run(N, M, WithAck) ->
        EchoPid     = spawn_link(fun echo/0),
        ConsumerPid = spawn_link(
                        fun () ->
                                consumer(M, EchoPid) end),
        case WithAck of
            true ->
                producer(N, ConsumerPid, WithAck);
            false ->
                producer(N, spawn_link(
                            fun() ->
                                queue_keeper_empty(ConsumerPid) end), WithAck)
        end.

    time(F) ->
        Start = erlang:now(),
        F(),
        timer:now_diff(erlang:now(), Start).

    timed_run(N, M, WithAck) ->
        time(fun() -> run(N, M, WithAck) end).

    With its own queue management, it is only about 1.25 times slower and does not block the producer.

  7. Enabling the <pre> tag in comments would be a good idea, I think.

  8. Non blocking and fast solution with readable source code :-)

 
 

