technology from back to front

RabbitMQ: RabbitMQ-shovel: Message Relocation Equipment

In several applications, it’s very useful to be able to take messages out of one RabbitMQ broker, and insert them into another. Many people on our mailing list have been asking for such a shovel, and we’ve recently been able to devote some time to writing one. This takes the form of a plugin for Rabbit, and whilst it hasn’t been through QA just yet, we’re announcing it so people who would like to play and even suggest further features for inclusion can do so sooner rather than later.

The shovel is written on top of the Erlang client. It supports both direct and network connections to nodes, SSL, the ability to declare resources on the nodes it connects to, and basic round-robin balancing of both source and destination nodes, and it allows you to configure many parameters controlling how messages are consumed from the source and how they’re published to the destination. Multiple shovels can be specified and their statuses queried, and shovels can repeatedly reconnect to nodes in the event of failure.
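To give a feel for what round-robin node selection with reconnection means, here is a minimal Python sketch. It is not the shovel's code (the plugin is written in Erlang); the `connect` callable, the node names and the attempt limit are all hypothetical stand-ins for illustration.

```python
import itertools


def connect_round_robin(nodes, connect, max_attempts):
    """Try nodes in round-robin order until one accepts a connection.

    `connect` is a hypothetical callable that returns a connection or
    raises ConnectionError; it stands in for a real broker client.
    Returns the connection and the list of nodes that were tried.
    """
    attempts = []
    for node in itertools.islice(itertools.cycle(nodes), max_attempts):
        attempts.append(node)
        try:
            return connect(node), attempts
        except ConnectionError:
            continue  # move on to the next node in the rotation
    raise ConnectionError(f"no node reachable after {max_attempts} attempts")


# Usage: the first two (made-up) nodes are down, the third accepts.
def fake_connect(node):
    if node != "rabbit@c":
        raise ConnectionError(node)
    return f"connection to {node}"


conn, tried = connect_round_robin(
    ["rabbit@a", "rabbit@b", "rabbit@c"], fake_connect, max_attempts=6
)
```

A real shovel would also wait between attempts and re-declare resources after reconnecting; both are left out here.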

The plugin is available from http://hg.rabbitmq.com/rabbitmq-shovel/, and is released under the MPL v1.1. There is a README included which contains full documentation. This is replicated below. (more…)

by matthew on 01/02/10

RabbitMQ: Plugin exchange types for RabbitMQ

An obvious extension point for an AMQP broker is the addition of new types of exchange. An exchange type essentially represents an algorithm for dispatching messages to queues, usually based on the message’s routing key, given how the queues are bound to the exchange — it’s a message routing algorithm.
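As a rough illustration of "an exchange type is a routing algorithm", the two simplest standard types can be written as plain functions over a list of bindings. This is a Python sketch for explanation only, not RabbitMQ code; the `(binding_key, queue)` tuple representation and the queue names are assumptions made for the example.

```python
def route_direct(routing_key, bindings):
    """Direct: deliver to queues whose binding key equals the routing key."""
    return sorted({queue for key, queue in bindings if key == routing_key})


def route_fanout(routing_key, bindings):
    """Fanout: deliver to every bound queue, ignoring the routing key."""
    return sorted({queue for _key, queue in bindings})


# Hypothetical bindings: (binding key, queue name).
bindings = [("orders", "q1"), ("orders", "q2"), ("logs", "q3")]

direct_result = route_direct("orders", bindings)
fanout_result = route_fanout("orders", bindings)
```

Both functions have the same shape: message plus bindings in, list of queues out. A new exchange type is just another function of that shape.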

At a minimum, supporting new exchange types requires only some scaffolding to plug in to (an exchange type registry) and a hook for routing messages. However, this wouldn’t support some of the more interesting use cases, and in particular it wouldn’t support our motivating use case. Exchange types that want to keep their own state need to be initialised, and to be notified about other lifecycle events.

The branch bug22169 of RabbitMQ supports plugin exchange types, by providing a behaviour for exchange type modules to implement, and an exchange type registry to map a module to a type (i.e., what the client supplies in the type field of exchange.declare).

The behaviour requires exported hooks for validating exchange declarations, creating exchanges, recovering durable exchanges, publishing to an exchange (this is where the routing comes in), maintaining bindings, and deleting an exchange. RabbitMQ continues to maintain the database of exchanges and bindings, and calls the hooks after it’s done its own bookkeeping.
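The division of labour can be sketched in a few lines of Python. This is a toy model, not the broker's implementation: the `Broker` and `RecordingExchangeType` classes, the dictionary layout and the choice to run validation before the bookkeeping are all assumptions made for illustration. The point it shows is that the broker owns the exchange and binding records, and each hook runs after the corresponding record has been updated.

```python
class RecordingExchangeType:
    """Hypothetical exchange type that records which hooks were called."""

    def __init__(self):
        self.calls = []

    def validate(self, exchange):
        self.calls.append(("validate", exchange["name"]))

    def create(self, exchange):
        self.calls.append(("create", exchange["name"]))

    def add_binding(self, exchange, binding):
        self.calls.append(("add_binding", exchange["name"], binding))

    def publish(self, exchange, delivery):
        self.calls.append(("publish", exchange["name"]))
        # Toy routing: deliver to every bound queue.
        return [queue for _key, queue in exchange["bindings"]]


class Broker:
    """Toy broker: keeps the exchange/binding database, then calls hooks."""

    def __init__(self):
        self.registry = {}   # type name -> implementation
        self.exchanges = {}  # exchange name -> record

    def register(self, type_name, impl):
        self.registry[type_name] = impl

    def declare(self, name, type_name):
        impl = self.registry[type_name]
        exchange = {"name": name, "type": type_name, "bindings": []}
        impl.validate(exchange)
        self.exchanges[name] = exchange  # bookkeeping first
        impl.create(exchange)            # hook afterwards
        return exchange

    def bind(self, name, key, queue):
        exchange = self.exchanges[name]
        exchange["bindings"].append((key, queue))  # bookkeeping first
        self.registry[exchange["type"]].add_binding(exchange, (key, queue))

    def publish(self, name, delivery):
        exchange = self.exchanges[name]
        return self.registry[exchange["type"]].publish(exchange, delivery)


impl = RecordingExchangeType()
broker = Broker()
broker.register("x-debug", impl)
broker.declare("ex", "x-debug")
broker.bind("ex", "k", "q1")
routed = broker.publish("ex", {"routing_key": "k"})
```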

For simplicity, the hooks are not called atomically with the bookkeeping; so, it is possible for instance to publish to a new exchange for which the hook has not completed. However, provided there are no asynchronous operations in the hook implementation, the hook will have completed by the time the OK message is sent to the client. This is no more racy than AMQP itself, with consistency at the channel level where operations follow a single thread of control.

Here’s an example that simply io:formats things as they happen:

-module(rabbit_exchange_type_debug).
-include("rabbit.hrl").

-behaviour(rabbit_exchange_type).

-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]).
-export([register_debug_types/0]).
-include("rabbit_exchange_type_spec.hrl").

-rabbit_boot_step({debug_exchange_types, 
                   [{description, "debugging exchange types"},
                    {mfa, {?MODULE, register_debug_types, []}},
                    {requires, rabbit_exchange_type_registry},
                    {enables, exchange_recovery}]}).

description() ->
    [{name, <<"debug">>},
     {description, <<"Debugging exchange">>}].

backing_module(#exchange{ type = Type }) ->
    %% Presume that Type is EITHER one of the standard types,
    %% i.e., that this module has been registered as direct,
    %% topic, fanout or headers; or, for testing purposes, it is
    %% registered (as in register_debug_types/0) as x-debug-direct, etc.
    Type1 = case atom_to_list(Type) of
                "x-debug-" ++ T -> T;
                              T -> T
            end,
    list_to_existing_atom("rabbit_exchange_type_" ++ Type1).

publish(Exchange, Delivery) ->
    io:format("Publish ~p to ~p~n", [Delivery, Exchange]),
    Module = backing_module(Exchange),
    Module:publish(Exchange, Delivery).

validate(X) ->
    io:format("Validate ~p~n", [X]),
    (backing_module(X)):validate(X).

create(X) ->
    io:format("Create ~p~n", [X]),
    (backing_module(X)):create(X).

recover(X, Bs) ->
    io:format("Recover ~p with bindings ~p~n", [X, Bs]),
    (backing_module(X)):recover(X, Bs).

delete(X, Bs) ->
    io:format("Delete ~p with bindings ~p~n", [X, Bs]),
    (backing_module(X)):delete(X, Bs).

add_binding(X, B) ->
    io:format("Add binding ~p to ~p~n", [B, X]),
    (backing_module(X)):add_binding(X, B).

delete_binding(X, B) ->
    io:format("Delete binding ~p from ~p~n", [B, X]),
    (backing_module(X)):delete_binding(X, B).

register_debug_types() ->
    lists:foreach(
      fun (T) ->
              rabbit_exchange_type_registry:register(T, ?MODULE)
      end,
      [<<"x-debug-direct">>,
       <<"x-debug-topic">>,
       <<"x-debug-fanout">>,
       <<"x-debug-headers">>]).

Bit by important bit:

-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2, add_binding/2, delete_binding/2]).

rabbit_exchange_type specifies these exported callbacks.

-include("rabbit_exchange_type_spec.hrl").

This include has the specs for each of the exported functions, if you’re using specs.

-rabbit_boot_step({debug_exchange_types, 
                   [{description, "debugging exchange types"},
                    {mfa, {?MODULE, register_debug_types, []}},
                    {requires, rabbit_exchange_type_registry},
                    {enables, exchange_recovery}]}).

This uses the new boot sequence mechanism to register the exchange type during boot. The “enables” and “requires” say that the function given as mfa above must be run after the exchange type registry is available, but before any exchanges are recovered.

publish(Exchange, Delivery) ->
    io:format("Publish ~p to ~p~n", [Delivery, Exchange]),
    Module = backing_module(Exchange),
    Module:publish(Exchange, Delivery).

This exchange type simply delegates to a “backing” exchange type.

register_debug_types() ->
    lists:foreach(
      fun (T) ->
              rabbit_exchange_type_registry:register(T, ?MODULE)
      end,
      [<<"x-debug-direct">>,
       <<"x-debug-topic">>,
       <<"x-debug-fanout">>,
       <<"x-debug-headers">>]).

rabbit_exchange_type_registry maintains a registry of type to module; because of this indirection, we can register this module as many different types, then check the declared type of the exchange in our hook to see which type we’re expected to be. Note that the AMQP specification requires the “x-” prefix for non-standard exchange types.

This should reach default branch soon after RabbitMQ 1.7.1 is released. Until then, if you want to play, you’ll have to

rabbitmq-server$ hg update -C bug22169

You can drop modules straight into src, but they are better packaged as plugins — follow the drill at the plugin development guide (and note that your plugin may only need to be a library application).

[EDITS: Updated sample code to keep up with name changes, and correct use of list_to_atom to list_to_existing_atom]

by mikeb on 22/01/10

RabbitMQ: Merry Christmas: Toke — Tokyo Cabinet driver for Erlang

Tokyo Cabinet is a rather excellent key-value store, with the ability to write to disk in a sane way (i.e. not just repeatedly dumping the same data over and over again), operate in bounded memory, and go really fast. I like it a lot, and there’s a likelihood that there’ll be a RabbitMQ plugin fairly soon that’ll use Tokyo Cabinet to improve the new persister yet further. Toke is an Erlang linked-in driver that allows you to use Tokyo Cabinet from Erlang. (more…)

by matthew on 21/12/09

RabbitMQ: RabbitMQ at the Skills Matter Functional Programming Exchange

Today I was lucky enough to give a talk at the Skills Matter Functional Programming Exchange. I talked about resource management in RabbitMQ and how we’re improving this in upcoming versions of RabbitMQ. All the sessions were videotaped and it would seem that a podcast will be going up shortly. In the meantime you can have a look at the slides if you want to.

The attendance was really good and the talks well received. There was a good range of talks, from some very practical and pragmatic such as my own, to slightly more theoretical talks. It was great to see Haskell, Erlang and F# being discussed outside of a purely academic setting and great to see so many companies and organisations getting really interested in functional programming and coming along to see how other people were making the most of it.

The Park Bench session was also good fun, with a good range of questions and experience being demonstrated by all. A good, fun atmosphere, and I’m sure all enjoyed the day.

by matthew on 07/12/09

RabbitMQ: Garbage Collection in Erlang

The new persister that is being developed for RabbitMQ is nearing completion and is currently working its way through code review and QA. It’s being pretty thoroughly tested and generally stressed to see what could go wrong. One of the issues that we’ve come across in the past has to do with Erlang’s garbage collector: indeed there’s code in at least one area of RabbitMQ written in a specific (and non-obvious) way in order to work around issues with Erlang’s garbage collection of binary data.

We had noticed that the release notes for Erlang R13B03 mention improvements to the garbage collector, and today, testing with both R13B02 and R13B03, we saw substantial improvements with R13B03. The new persister is able to push partial queues out to disk. Thus a queue can have a mix of messages: some just in RAM, some just on disk, and some somewhere in between. This is separate from whether or not a message is marked persistent. The proportion pushed out to disk varies smoothly with the amount of RAM left available to Erlang: the idea is to avoid flooding the disk with enormous numbers of write requests, which would potentially stall the queue and cause blockages elsewhere in RabbitMQ.

The test I’d written used the Erlang experimental client. It had one channel, it created a queue, consumed from the queue, set QoS prefetch count to 10, and then went into a loop. In this loop, it would publish two 1KB messages, then receive 1 message, and acknowledge it. This way the queue would always grow, and memory would be fairly fragmented (the gap from the head of the queue to the tail of the queue would increase steadily as the head is moving forwards at twice the rate of the tail). With no memory limit, I saw the following (I manually killed this after the queue grew to just over 350,000 messages long (which means 700,000 publishes, and 350,000 acknowledgements)):
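The bookkeeping behind those numbers is easy to check with a toy model of the loop. This Python sketch is not the actual test (which used the Erlang client against a real broker); it ignores message size, prefetch and the network, and only counts publishes, acknowledgements and queue length.

```python
from collections import deque


def run_test_loop(iterations):
    """Model the loop: publish two messages, then receive and ack one."""
    queue = deque()
    publishes = 0
    acks = 0
    for _ in range(iterations):
        queue.append("msg")
        queue.append("msg")
        publishes += 2
        queue.popleft()  # receive the message at the head of the queue
        acks += 1
    return len(queue), publishes, acks


length, publishes, acks = run_test_loop(350_000)
```

Each iteration adds one message net, so after 350,000 iterations the queue holds 350,000 messages, having seen 700,000 publishes and 350,000 acknowledgements.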

Memory usage with no memory limit set

Note that for R13B03, the garbage collector is much more active, and in general memory usage is certainly more fine-grained. In this test, all the messages were always in RAM; no messages were pushed out to disk. Flat-size refers to the value returned by pushing the queue state through erts_debug:flat_size/1, which returns the amount of memory used by the data structure.

Next, I imposed a limit of about 200MB and did the same test. With R13B02, it got stuck after just over 260,000 messages: it was no longer able to reclaim any further space, and so flow-control kicked in and stopped the publisher, game over. With R13B03 it soldiered merrily on - I ended up manually killing it somewhere past the 1 million message mark as I was getting bored. It’s also very clear to see how with R13B03, it successfully kicks down to pushing all the messages out to disk (which is why the size of the state suddenly gets very small - the memory growth from there on is due to an ets table). That’s certainly still possible with R13B02, and I have seen that happen, but there’s much greater risk, as seen here, of it getting stuck before that happens.

Memory usage with 200MB limit

In short, the garbage collector in R13B03 seems a solid improvement. Even if you’re not using the experimental new persister, I suspect you’ll gain from upgrading to R13B03. And yes, that really is 1 million 1KB messages successfully sent into a queue using under 200MB of RAM.

by matthew on 01/12/09

RabbitMQ: HTTP Routing with RabbitMQ and Trapeze

After building Hookout a little while back, I’ve been considering other things you could do to funnel clients through a server without them necessarily being reachable, or having an entire address space of their own. Hookout was working within the constraints of the reverse http protocol, where clients could speak only http. I wanted to explore a space where this wasn’t necessarily a requirement. At the same time, Rabbit gained a plugin mechanism.

The combination of these is Trapeze. Trapeze provides HTTP request routing via AMQP. Requests are received by a RabbitMQ plugin, which then forwards these requests via AMQP to listening applications. Applications then respond via a private reply queue, and this is then forwarded back onto the web caller. Whilst this idea isn’t entirely novel (after writing Trapeze, I found http://somic.org/2008/12/18/introducing-rabbitbal/), it is an interesting area to explore, and provides a very different use case for Rabbit versus the “I want 100% guarantees of message delivery at the expense of speed” case.

Trapeze is available at http://github.com/paulj/trapeze, along with a Ruby application runner at http://github.com/paulj/trapeze-rb. Installation instructions are provided at http://github.com/paulj/trapeze/blob/master/README.rdoc.

Once installed, one can start playing with some of the interesting features of the routing algorithm. Some of the more interesting points are:

  • When applications are started, a routing key is provided that the client uses to subscribe to the “trapeze” exchange. When incoming requests arrive, a routing key is generated in the form: <verb>.<host parts>.<port>./.<path parts>. An AMQP topic exchange is used, allowing RabbitMQ to handle all of the logic involved in selecting the appropriate listener based on their declared subscription. The use of topic key syntax provides the ability for arbitrarily complex routing keys to be used, starting with simple subscriptions:

    • “*.localhost.55672./.#” to receive everything sent to http://localhost:55672
    • “*.#.example.com.*./.#” to receive everything sent to example.com and its subdomains, on any port

    Through to more complex subscriptions, such as:

    • “post.#./.listener” to receive only post requests to a /listener path, but on any server/port combination
  • Trapeze utilises AMQP Mandatory routing to ensure that if no application has registered a listener that matches a given incoming request, a basic.return will be received, and result in a 404 being returned to the client. This single flag prevents the need for Trapeze itself to have any understanding of the connected clients - it can rely entirely on the broker to inform it whether or not a connector is attached that can handle a given request.
  • To create the binding from the trapeze exchange, a named auto-delete queue is created. If multiple applications are started using the same queue name, queue load-balancing takes effect, allowing for requests to be balanced between a number of consumers. The use of QOS even opens the possibility of balancing to consumers able to process requests at different rates.
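The routing scheme in the list above can be sketched in Python. This is an illustration, not Trapeze's code: the exact key format for an empty path, the default port of 80 and the function names are assumptions, and path segments that themselves contain dots are not handled. The matcher follows AMQP topic semantics, where `*` matches exactly one word and `#` matches zero or more words.

```python
from urllib.parse import urlsplit


def routing_key(verb, url):
    """Build a key of the form <verb>.<host parts>.<port>./.<path parts>."""
    parts = urlsplit(url)
    host_parts = parts.hostname.split(".")
    port = parts.port or 80  # assumption: plain HTTP when no port is given
    path_parts = [p for p in parts.path.split("/") if p]
    return ".".join([verb.lower(), *host_parts, str(port), "/", *path_parts])


def topic_matches(pattern, key):
    """AMQP-style topic match: '*' is one word, '#' is zero or more words."""
    p = pattern.split(".")
    k = key.split(".")

    def match(i, j):
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # Let '#' absorb zero or more of the remaining words.
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


def dispatch(subscriptions, verb, url):
    """Return (200, matching apps), or (404, []) if nothing is subscribed.

    The 404 branch plays the role of the basic.return that the broker
    sends for an unroutable mandatory message.
    """
    key = routing_key(verb, url)
    apps = [app for pattern, app in subscriptions if topic_matches(pattern, key)]
    return (200, apps) if apps else (404, [])


# Hypothetical subscriptions using the patterns from the list above.
subscriptions = [
    ("*.localhost.55672./.#", "local-app"),
    ("post.#./.listener", "listener-app"),
]
status, apps = dispatch(subscriptions, "POST", "http://localhost:55672/listener")
```

In the real system the matching is done by the broker's topic exchange rather than by Trapeze, which is exactly why the plugin needs no knowledge of the connected applications.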

Trapeze has already spurred some interesting reflections about changes that could be made to Rabbit to do interesting things in these kinds of cases. I’m quite interested to hear if anyone else has any suggestions that could make Trapeze more useful or interesting. Also, if you’d like to write an adapter for a different language (for example, Python WSGI or even a Tomcat connector), then do get in touch. And please do check out the source on Github.

by paulj on 26/10/09

RabbitMQ: Grouping and collapsing in WireIt

I have recently been modifying the WireIt code to allow collapsing of multiple containers down into 1 composite container. A quick summary of WireIt (from their site)

WireIt is an open-source javascript library to create web wirable interfaces for dataflow applications, visual programming languages, graphical modeling, or graph editors.

I got started on this when I was following on from Jonathan Lister’s work on using WireIt to create a non-technical user interface for Rabbit Streams. It was soon apparent that if you wanted to use this in practice you would end up with huge graphs with many containers and many wires going all over the place. Once above about 10 containers or so with a couple of wires per container it gets hard to see what is happening. To solve this problem some kind of abstraction is needed, in this case collapsing multiple containers down into 1 composite container.

After only a few containers things get hairy

Shrink down the 3 groups of containers for a cleaner graph

A collapsed group of containers essentially boils down to a single container representing a subgraph. So I started by looking at the JSON object produced when you save a complete WireIt graph, and at how that JSON is then expanded into the complete graph on loading. Once I had identified those functions, it was relatively easy to implement them for only a subset of the whole graph.

The next challenge was maintaining the wires across a collapse/expand. The first step was to show the terminals from grouped containers on the composite container. Naming conflicts had to be handled (in the case of two duplicate internal terminals from different containers) which led to the creation of a map from external Terminals to internal Containers and their terminals. With this map I could iterate over the wires attached to terminals in the group and create copies of those wires going to the external terminals. Then on expansion any wires connected to the composite container were again mapped back to their counterparts. The same was done for fields.
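The terminal map is the heart of this, so here is a small Python sketch of the idea. It is not the WireIt fork's code (which is JavaScript); the data shapes, the `container.terminal` naming scheme for conflicts and the composite name "group" are all hypothetical. It assumes terminal names are unique within a container and only handles wires that cross the group boundary.

```python
def build_terminal_map(group):
    """Map external terminal names to (container, terminal) pairs.

    `group` is {container_name: [terminal_name, ...]}. A terminal name
    used by more than one container is prefixed with its container name.
    """
    counts = {}
    for terminals in group.values():
        for terminal in terminals:
            counts[terminal] = counts.get(terminal, 0) + 1
    mapping = {}
    for container, terminals in group.items():
        for terminal in terminals:
            external = terminal if counts[terminal] == 1 else f"{container}.{terminal}"
            mapping[external] = (container, terminal)
    return mapping


def collapse_wires(wires, mapping, composite="group"):
    """Re-point wire ends inside the group at the composite's terminals."""
    external_for = {internal: ext for ext, internal in mapping.items()}

    def rewrite(end):
        return (composite, external_for[end]) if end in external_for else end

    return [(rewrite(src), rewrite(dst)) for src, dst in wires]


def expand_wires(wires, mapping, composite="group"):
    """Map wire ends on the composite back to their internal counterparts."""

    def rewrite(end):
        container, terminal = end
        return mapping[terminal] if container == composite else end

    return [(rewrite(src), rewrite(dst)) for src, dst in wires]


# Containers A and B are grouped; both have a terminal called "out".
group = {"A": ["out", "in"], "B": ["out"]}
mapping = build_terminal_map(group)
wires = [(("A", "out"), ("C", "in")), (("D", "out"), ("A", "in"))]
collapsed = collapse_wires(wires, mapping)
restored = expand_wires(collapsed, mapping)
```

Because the map is kept alongside the composite container, collapsing and then expanding returns the original wires.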

In combination with the ability to manually select which fields and terminals are visible and manually set their names you can easily represent encapsulation.

I have made a fork of the WireIt GitHub repository which can be found here. I also have a small readme and example of the editor here.

by james on 10/09/09

RabbitMQ: Rabbit on the treadmill: Run Rabbit, Run!

For the last couple of months I’ve been working on rewriting RabbitMQ’s persister so that it will scale to volumes of data that won’t fit in RAM, and will perform consistently across a wide variety of use cases. This work is coming to a conclusion now, and although the code is not yet released, nor has it even been through QA, benchmarking it thoroughly is useful to allow us to understand what’s good and what’s bad about the new design. In this post I’m not going to do any before and after comparisons; they’ll be coming in due course. Instead, I’m going to use RabbitMQ to benchmark harddiscs: an SSD, and a normal rotating harddisc. As someone said at the presentation we gave at the recent Erlang Factory, “using SSDs are just like RAM”. Cue expectations of a turbo-charged, overclocked, overvolted Rabbit, with liquid nitrogen cooling. (more…)

by matthew on 14/07/09

RabbitMQ: Achieving Scale with Messaging and the Cloud

On the 9th, last Thursday, I spoke at the Online Gaming High Scalability SIG at Skills Matter. The talk covered

  • an introduction to Messaging (what it’s for, why you might like to use it),
  • a couple of pointers in the directions of examples of Messaging being used at scale in the cloud, and
  • the main part of the talk, an overview of techniques for scaling up Messaging-based distributed systems.

The slides are available for download here (PDF - with notes), and are also available on SlideShare:

by tonyg on 13/07/09

RabbitMQ: PubSub-over-Webhooks with RabbitHub

RabbitHub is our implementation of PubSubHubBub, a straightforward pubsub layer on top of plain old HTTP POST — pubsub over Webhooks. It’s not well documented yet (understatement), but that will change.

It gives every AMQP exchange and queue hosted by a RabbitMQ broker a couple of URLs: one to use for delivering messages to the exchange or queue, and one to use to subscribe to messages forwarded on by the exchange or queue. You subscribe with a callback URL, so when messages arrive, RabbitHub POSTs them on to your callback. For example,

(The symmetrical …/subscribe/x/… and …/endpoint/q/… also exist.)
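The URL scheme is regular enough to capture in a few lines of Python. This is only a sketch of the path shapes mentioned in this post (`endpoint` or `subscribe`, then `x` for an exchange or `q` for a queue, then the name); the helper name and the percent-encoding of the resource name are assumptions, and the base URL is the one used for the public instance later in the post.

```python
from urllib.parse import quote


def rabbithub_url(base, operation, kind, name):
    """Build a RabbitHub-style URL for an exchange ('x') or queue ('q')."""
    if operation not in ("endpoint", "subscribe"):
        raise ValueError(f"unknown operation: {operation}")
    if kind not in ("x", "q"):
        raise ValueError(f"unknown resource kind: {kind}")
    return f"{base.rstrip('/')}/{operation}/{kind}/{quote(name, safe='')}"


base = "http://dev.rabbitmq.com/rabbithub"
deliver_to_exchange = rabbithub_url(base, "endpoint", "x", "pshb")
subscribe_to_queue = rabbithub_url(base, "subscribe", "q", "pshb")
```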

The PubSubHubBub protocol specifies some RESTful(ish) operations for establishing subscriptions between message sources (a.k.a “topics”) and message sinks. RabbitHub implements these operations as well as a few more for RESTfully creating and deleting exchanges and queues.

Combining RabbitHub with the AMQP protocol implemented by RabbitMQ itself and with the other adapters and gateways that form part of the RabbitMQ universe lets you send messages across different kinds of message networks — for example, our public RabbitMQ instance, dev.rabbitmq.com, has RabbitHub running as well as the standard AMQP adapter, the rabbitmq-xmpp plugin, and a bunch of our other experimental stuff, so you can do things like this:

RabbitHub example configuration

  • become XMPP friends with pshb@dev.rabbitmq.com (the XMPP adapter gives each exchange a JID of its own)

  • use PubSubHubBub to subscribe the sink http://dev.rabbitmq.com/rabbithub/endpoint/x/pshb to some PubSubHubBub source — perhaps one on the public Google PSHB instance. (Note how the given URL ends in “x/pshb”, meaning the “pshb” exchange — which lines up with the JID we just became XMPP friends with.)

  • wait for changes to be signalled by Google’s PSHB hub to RabbitHub

  • when they are, you get an XMPP IM from pshb@dev.rabbitmq.com with the Atom XML that the hub sent out as the body

RabbitHub is content-agnostic — you don’t have to send Atom around — so the fact that Atom appears is an artifact of what Google’s public PSHB instance is mailing out, rather than anything intrinsic in pubsub-over-webhooks.

We’ve also been experimenting with using http://www.reversehttp.net/ to run a PubSubHubBub endpoint in a webpage — see for instance http://www.reversehttp.net/demos/endpoint.html and its associated Javascript for a simple prototype of the idea. I’m playing with building a simple PSHB hub in Javascript using the same tools.

by tonyg on 30/06/09
2000-9 LShift Ltd, 1st Floor Office, Hoxton Point, 6 Rufus Street, London, N1 6PE, UK +44 (0)20 7729 7060