Plugin exchange types for RabbitMQ

By: on January 22, 2010

An obvious extension point for an AMQP broker is the addition of new types of exchange. An exchange type essentially represents an algorithm for dispatching messages to queues, usually based on the message’s routing key, given how the queues are bound to the exchange — it’s a message routing algorithm.

At a minimum, supporting new exchange types requires only some scaffolding to plug in to (an exchange type registry) and a hook for routing messages. However, this wouldn’t support some more interesting use cases, and in particular it didn’t support our [motivating use case](http://oceanobservatories.org/spaces/download/attachments/19432960/ooi-amqp-api-20091228.pdf?version=1). Exchange types that want to keep their own state need to be initialised, and be notified about other lifecycle events.

The branch bug22169 default branch of RabbitMQ supports plugin exchange types, by providing a behaviour for exchange type modules to implement, and an exchange type registry to map a module to a type (i.e., what the client supplies in the type field of exchange.declare).

The behaviour requires exported hooks for validating exchange declarations, creating exchanges, recovering durable exchanges, publishing to an exchange (this is where the routing comes in), maintaining bindings, and deleting an exchange. RabbitMQ continues to maintain the database of exchanges and bindings, and calls the hooks after it’s done its own bookkeeping.

For simplicity, the hooks are not called atomically with the bookkeeping; so, it is possible for instance to publish to a new exchange for which the hook has not completed. However, provided there are no asynchronous operations in the hook implementation, the hook will have completed by the time the OK message is sent to the client. This is no more racey than AMQP itself, with consistency at the channel level where operations follow a single thread of control.

Here’s an example that simply `io:format`s things as they happen:

-module(rabbit_exchange_type_debug).
-include(“rabbit.hrl”).

-behaviour(rabbit_exchange_type).

-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2]).
-export([register_debug_types/0]).
-include(“rabbit_exchange_type_spec.hrl”).

-rabbit_boot_step({debug_exchange_types,
[{description, “debugging exchange types”},
{mfa, {?MODULE, register_debug_types, []}},
{requires, rabbit_exchange_type_registry},
{enables, exchange_recovery}]}).

description() ->
[{name, <<"debug">>},
{description, <<"Debugging exchange">>}].

backing_module(#exchange{ type = Type }) ->
%% Presume that Type is EITHER one of the standard types —
%% i.e,. that we have been registered this module as direct,
%% topic, fanout or match — or, for testing purposes, it’s
%% registered (as in the boot steps above) as debug_direct, etc.
Type1 = case atom_to_list(Type) of
“x-debug-” ++ T -> T;
T -> T
end,
list_to_existing_atom(“rabbit_exchange_type_” ++ Type1).

publish(Exchange, Delivery) ->
io:format(“Publish ~p to ~p~n”, [Delivery, Exchange]),
Module = backing_module(Exchange),
Module:publish(Exchange, Delivery).

validate(X) ->
io:format(“Validate ~p~n”, [X]),
(backing_module(X)):validate(X).

create(X) ->
io:format(“Create ~p~n”, [X]),
(backing_module(X)):create(X).

recover(X, Bs) ->
io:format(“Recover ~p with bindings ~p~n”, [X, Bs]),
(backing_module(X)):recover(X, Bs).

delete(X, Bs) ->
io:format(“Delete ~p with bindings ~p~n”, [X, Bs]),
(backing_module(X)):delete(X, Bs).

add_binding(X, B) ->
io:format(“Add binding ~p to ~p~n”, [B, X]),
(backing_module(X)):add_binding(X, B).

remove_bindings(X, Bs) ->
io:format(“Delete bindings ~p from ~p~n”, [Bs, X]),
(backing_module(X)):remove_bindings(X, B).

register_debug_types() ->
lists:foreach(
fun (T) ->
rabbit_exchange_type_registry:register(T, ?MODULE)
end,
[<<"x-debug-direct">>,
<<"x-debug-topic">>,
<<"x-debug-fanout">>,
<<"x-debug-headers">>]).

Bit by important bit:

-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2, add_binding/2, remove_bindings/2]).

`rabbit_exchange_type` specifies these exported callbacks.

-include(“rabbit_exchange_type_spec.hrl”).

This include has the specs for each of the exported functions, if you’re using specs.

-rabbit_boot_step({debug_exchange_types,
[{description, “debugging exchange types”},
{mfa, {?MODULE, register_debug_types, []}},
{requires, rabbit_exchange_type_registry},
{enables, exchange_recovery}]}).

This uses the new boot sequence mechanism to register the exchange type during boot. The “enables” and “requires” say that the function given as mfa above must be run after the exchange type registry is available, but before any exchanges are recovered.

publish(Exchange, Delivery) ->
io:format(“Publish ~p to ~p~n”, [Delivery, Exchange]),
Module = backing_module(Exchange),
Module:publish(Exchange, Delivery).

This exchange type simply delegates to a “backing” exchange type.

register_debug_types() ->
lists:foreach(
fun (T) ->
rabbit_exchange_type:register(T, ?MODULE)
end,
[<<"x-debug-direct">>,
<<"x-debug-topic">>,
<<"x-debug-fanout">>,
<<"x-debug-headers">>]).

`rabbit_exchange_type_registry` maintains a registry of type to module; because of this indirection, we can register this module as many different types, then check the declared type of the exchange in our hook to see which type we’re expected to be. Note that the AMQP specification requires the “x-” prefix for non-standard exchange types.

This should reach default branch soon after RabbitMQ 1.7.1 is
released.
This is in RabbitMQ’s default branch, and should be in the next release (1.7.2).

You can drop modules straight into `src`, but they are better packaged
as plugins — follow the drill at [the plugin development
guide](http://www.rabbitmq.com/plugin-development.html](http://www.rabbitmq.com/plugin-development.html)
(and note that your plugin may only need to be a library application).

[EDITS: Updated sample code to keep up with name changes, and correct use of list_to_atom to list_to_existing_atom]
[Further EDITS: slight API change, now in default branch]

FacebookTwitterGoogle+

2 Comments

Post a comment

Your email address will not be published.

*

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>