Erlang/OTP Forums

RabbitMQ mailing list: New exchange type

Guest
Posted: Tue Apr 22, 2008 3:07 am
Hi all,

Recently I put together a new exchange type for RabbitMQ, because none of
the existing ones completely match our requirements. The problem it solves
is this: we want a publish-subscribe model where exactly one subscriber
receives any given message. If we set up a topic exchange and have all
subscribers listen on the same queue, we get this out of the box. But this
requires that all messages go to that one queue, which resides on one
machine, causing a pretty significant bottleneck. A reasonable way around
this was to implement a new exchange type ("anycast") that picks a random
queue when a message is routed. The diff and new file for
rabbit_exchange.erl are attached.

I'm very new to erlang, so feedback/comments/optimizations are extremely
welcome.

Thanks,
Kyle

http://www.nabble.com/file/p16820505/rabbit_exchange.erl.diff rabbit_exchange.erl.diff
http://www.nabble.com/file/p16820505/rabbit_exchange.erl rabbit_exchange.erl
--
View this message in context: http://www.nabble.com/New-exchange-type-tp16820505p16820505.html
Sent from the RabbitMQ mailing list archive at Nabble.com.


_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss@lists.rabbitmq.com
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Post received from mailing list
Guest
Posted: Tue Apr 22, 2008 5:16 am
Kyle,

Kyle Salasko wrote:
> Recently I put together a new exchange type for RabbitMQ, because none of
> the existing ones completely match our requirements. The problem it solves
> is this: we want a publish-subscribe model where exactly one subscriber
> receives any given message. If we set up a topic exchange and have all
> subscribers listen on the same queue, we get this out of the box. But this
> requires that all messages go to that one queue, which resides on one
> machine, causing a pretty significant bottleneck. A reasonable way around
> this was to implement a new exchange type ("anycast") that picks a random
> queue when a message is routed. The diff and new file for
> rabbit_exchange.erl are attached.
>
> I'm very new to erlang, so feedback/comments/optimizations are extremely
> welcome.

This looks pretty good and implements a useful feature.

It occurred to me that rather than introducing a new exchange type one
could place an indicator in the exchange arguments that turns any
exchange from a multicast to an anycast exchange. That has two advantages:

- we stay entirely within the defined protocol standard

- the user can select the most appropriate matching logic with the
exchange type, rather than being constrained to topic-based matching


Re optimisations: I haven't done any profiling on this, but I believe
erlang:now + random:seed are quite expensive, so it is costly to call
them every time a message is routed. I can think of three ways of fixing
that:

- don't call now+seed at all. That will result in the generated sequence
being the same for all publishing channels, which may or may not be a
problem

- call now+seed on channel creation. During normal operation channels
are the only processes invoking the routing code, so this catches all
the interesting cases.

- use a flag in the process dictionary to indicate whether the random
number generator has been seeded. Check/set that flag in your current
code that invokes random:uniform. This has the same effect as the
previous approach but is a less intrusive change since you don't need to
touch any other modules.


Matthias
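
The third option above (a "seeded" flag in the process dictionary) might look roughly like the sketch below. It is not taken from the attached patch: the module name, the anycast_rng_seeded key and seeded_uniform/1 are made up for illustration, and it uses the rand module from current OTP releases instead of random and erlang:now. Note that rand:uniform/1 already seeds itself on first use in a process, so the explicit flag is kept here only to show the check/set pattern.

```erlang
-module(seed_once_sketch).
-export([seeded_uniform/1]).

%% Hypothetical process-dictionary key; any otherwise unused term works.
-define(SEEDED_FLAG, anycast_rng_seeded).

%% Returns a random integer in 1..N. The calling process's generator is
%% seeded only on the first call made by that process.
seeded_uniform(N) when is_integer(N), N >= 1 ->
    case get(?SEEDED_FLAG) of
        true ->
            ok;
        undefined ->
            _ = rand:seed(exsplus),
            put(?SEEDED_FLAG, true)
    end,
    rand:uniform(N).
```

Because the flag lives in the process dictionary, each channel process pays the seeding cost once, which is the same effect as seeding on channel creation without touching the channel module.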

Guest
Posted: Tue Apr 22, 2008 7:05 am
>> Recently I put together a new exchange type for RabbitMQ, because none of
>> the existing ones completely match our requirements. The problem it solves
>> is this: we want a publish-subscribe model where exactly one subscriber
>> receives any given message. If we set up a topic exchange and have all
>> subscribers listen on the same queue, we get this out of the box. But this
>> requires that all messages go to that one queue, which resides on one
>> machine, causing a pretty significant bottleneck. A reasonable way around
>> this was to implement a new exchange type ("anycast") that picks a random
>> queue when a message is routed. The diff and new file for
>> rabbit_exchange.erl are attached.
>>
I've seen this requirement before; however, AFAICS it breaks the AMQP model.
The exchanges are meant to be used for message distribution, i.e. the
message should be delivered to *all* queues with appropriate binding to
the exchange. Queues are intended to do the load-balancing. Not adhering
to this model causes problems with some more sophisticated use cases
(wire-tapping, federation). It also introduces a lot of unfairness to
the load-balancing algorithm. Why do you believe a single queue would
create a significantly graver bottleneck when compared to a single exchange?

Martin

Guest
Posted: Wed Apr 23, 2008 3:20 am
Matthias,

Quote:
It occurred to me that rather than introducing a new exchange type one could place an indicator in the exchange arguments that turns any exchange from a multicast to an anycast exchange. That has two advantages:

- we stay entirely within the defined protocol standard

- the user can select the most appropriate matching logic with the exchange type, rather than being constrained to topic-based matching

I'll take a look at this when I get a chance, sounds like a decent idea to me.


Quote:
- use a flag in the process dictionary to indicate whether the random number generator has been seeded. Check/set that flag in your current code that invokes random:uniform. This has the same effect as the previous approach but is a less intrusive change since you don't need to touch any other modules.


Got this working, and less intrusive wins out in my mind. I'll send out another version of the diff when I get to the above change, or if I leave that on the back burner for too long.

Martin,

Quote:
I've seen this requirement before; however, AFAICS it breaks the AMQP model. The exchanges are meant to be used for message distribution, i.e. the message should be delivered to *all* queues with appropriate binding to the exchange. Queues are intended to do the load-balancing. Not adhering to this model causes problems with some more sophisticated use cases (wire-tapping, federation). It also introduces a lot of unfairness to the load-balancing algorithm. Why do you believe a single queue would create a significantly graver bottleneck when compared to a single exchange?

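From what I could tell from the RabbitMQ docs, even if I have a single exchange, messages can be routed through any node in the cluster. Therefore there should be no significant bottleneck with a single exchange, because there is no single node that all messages must go through. With a single queue, however, every message has to be sent to the machine on which that queue lives, after which different subscribers can pull messages off. This looks like a bottleneck to me, where all messages go through one machine, whereas a single exchange does not have that problem (unless I missed something).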
Guest
Posted: Wed Apr 23, 2008 5:19 am
Kyle,

Kyle wrote:
> Matthias,
> - use a flag in the process dictionary to indicate whether the
> random number generator has been seeded. Check/set that flag in your
> current code that invokes random:uniform. This has the same effect
> as the previous approach but is a less intrusive change since you
> don't need to touch any other modules.
>
>
> Got this working, and less intrusive wins out in my mind. I'll send out
> another version of the diff when I get to the above change, or if I
> leave that on the back burner for too long.

Btw, if you look at the code (and docs) of the 'random' module you'll
see it is using the process dictionary already to remember the last
seed. So, to save duplication of work, I'd use something like the following:

random_uniform(N) ->
    Seed = case get(random_seed) of
               undefined -> erlang:now();
               Tuple -> Tuple
           end,
    {V, NewSeed} = random:uniform_s(N, Seed),
    put(random_seed, NewSeed),
    V.

> From what I could tell from the RabbitMQ docs, even if I have a single
> exchange, messages can be routed through any node in the cluster.
> Therefore there should be no significant bottleneck with a single
> exchange, because there is no single node that all messages must go
> through. With a single queue, however, every message has to be sent to
> the machine on which that queue lives, after which different subscribers
> can pull messages off. This looks like a bottleneck to me, where all
> messages go through one machine, whereas a single exchange does not have
> that problem (unless I missed something).

That analysis is correct. I think what Martin is getting at is that this
is really an implementation issue - in principle AMQP queues do what you
want, it just so happens that in RabbitMQ they are implemented as a
single (Erlang) process rather than being distributed. The latter is
something we are looking into, but it will take a while to implement.
Meanwhile your solution is an excellent way of addressing the problem.


Matthias.

Guest
Posted: Wed Apr 23, 2008 6:43 am
>
>> From what I could tell from the RabbitMQ docs, even if I have a
>> single exchange, messages can be routed through any node in the
>> cluster. Therefore there should be no significant bottleneck with a
>> single exchange, because there is no single node that all messages
>> must go through. With a single queue, however, every message has to
>> be sent to the machine on which that queue lives, after which
>> different subscribers can pull messages off. This looks like a
>> bottleneck to me, where all messages go through one machine, whereas
>> a single exchange does not have that problem (unless I missed
>> something).
>
> That analysis is correct. I think what Martin is getting at is that
> this is really an implementation issue - in principle AMQP queues do
> what you want, it just so happens that in RabbitMQ they are
> implemented as a single (Erlang) process rather than being
> distributed. The latter is something we are looking into, but it will
> take a while to implement. Meanwhile your solution is an excellent way
> of addressing the problem.
Thanks for the explanation. I was just interested in what the use case was.

Martin

Guest
Posted: Wed Apr 23, 2008 5:50 pm
Attached is a revised version of the first patch with the changes below. Rather than using a different exchange type, add "anycast" = 1 to the arguments passed to exchangeDeclare. I've tested this with fanout and topic exchanges with the RabbitMQ java library and py-amqplib from Barry Pederson.

Kyle

On Tue, Apr 22, 2008 at 1:16 AM, Matthias Radestock <matthias@lshift.net> wrote:
Quote:

It occurred to me that rather than introducing a new exchange type one could place an indicator in the exchange arguments that turns any exchange from a multicast to an anycast exchange. That has two advantages:

- we stay entirely within the defined protocol standard

- the user can select the most appropriate matching logic with the exchange type, rather than being constrained to topic-based matching

- use a flag in the process dictionary to indicate whether the random number generator has been seeded. Check/set that flag in your current code that invokes random:uniform. This has the same effect as the previous approach but is a less intrusive change since you don't need to touch any other modules.



Guest
Posted: Thu Apr 24, 2008 1:38 am
I accidentally deleted a couple of lines, causing the channel to crash on an anycast exchange with no listeners, because random:uniform(0) is undefined. Diff is attached.

Kyle
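
A guard against the empty case described above could be sketched as follows. The attached diff is not reproduced in this thread, so the shape of random_queue/1 here (a list of matching queues in, a list with at most one queue out) is an assumption, as is the module name; rand from current OTP releases stands in for random.

```erlang
-module(anycast_pick_sketch).
-export([random_queue/1]).

%% Picks at most one queue from the list of queues that matched.
%% The result is a list so it can be used wherever the full list was.
random_queue([]) ->
    %% No listeners: rand:uniform(0) would fail, so return nothing.
    [];
random_queue(Queues) when is_list(Queues) ->
    Index = rand:uniform(length(Queues)),
    [lists:nth(Index, Queues)].
```

Handling the empty list in its own clause keeps the random call from ever seeing 0, so a channel publishing to an anycast exchange with no bound queues simply routes to nothing.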

Guest
Posted: Thu Apr 24, 2008 7:49 pm
Kyle,


Kyle wrote:
> Diff is attached.

a few comments ...

- Why is there the curious special-casing for the processing of the Args
for a topic exchange?

- You should be able to use AMQP's boolean type for the "anycast"
argument, rather than 0 and 1. Also, take a look at lists:keysearch.
With those two changes the code should simplify to this:

    H = route_sub(Exchange, RoutingKey),
    case lists:keysearch(<<"anycast">>, 1, Args) of
        {value, {_Name, bool, true}} -> random_queue(H);
        _Other -> H
    end.


Matthias.

Guest
Posted: Thu Apr 24, 2008 10:17 pm
Matthias,

On Thu, Apr 24, 2008 at 3:49 PM, Matthias Radestock <matthias@lshift.net> wrote:
Quote:
- Why is there the curious special-casing for the processing of the Args for a topic exchange?

The special case was already in the declare method in rabbit_exchange.erl:60. Args for a topic exchange is set to a tuple: {sets:new(), Args}, while Args for other exchanges is passed through as a list. The code you posted won't work unless Args is a list in all exchange types.


Quote:
- You should be able to use AMQP's boolean type for the "anycast" argument, rather than 0 and 1. Also, take a look at lists:keysearch. With those two changes the code should simplify to this:


When I ran some tests of checking for the anycast argument, it came up as a signedint. This may be due to the python library I used. I set anycast=True, and got a signedint in rabbit_exchange:route. I didn't check setting anycast=true in the java libs, I'll look into it. Otherwise the match would have to be {_Name, signedint, 1}, correct?

Kyle

Guest
Posted: Fri Apr 25, 2008 5:57 am
Kyle,

Kyle wrote:
> On Thu, Apr 24, 2008 at 3:49 PM, Matthias Radestock
> <matthias@lshift.net> wrote:
>
> - Why is there the curious special-casing for the processing of the
> Args for a topic exchange?
>
> The special case was already in the declare method in
> rabbit_exchange.erl:60. Args for a topic exchange is set to a tuple:
> {sets:new(), Args},

Ah, I am pretty sure that bit of code is no longer needed. We should fix
that. Well spotted.

> - You should be able to use AMQP's boolean type for the "anycast"
> argument, rather than 0 and 1. Also, take a look at lists:keysearch.
> With those two changes the code should simplify to this:
>
> When I ran some tests of checking for the anycast argument, it came up
> as a signedint. This may be due to the python library I used. I set
> anycast=True, and got a signedint in rabbit_exchange:route.

It turns out that the boolean table field type is a protocol extension
introduced by qpid that we subsequently implemented in our broker for
better interoperability. It is not supported by our Java client (we will
consider adding it) and, I suspect, the python lib you are using.

> Otherwise the match would have to be {_Name, signedint, 1}, correct?

Yes, that should work.


Matthias.
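
Since some clients send the flag as a boolean and others as an integer, a check that accepts both forms could be sketched like this. It assumes, as in the keysearch example earlier in the thread, that Args is a list of {Name, Type, Value} tuples; the module name and is_anycast/1 are hypothetical and not part of the patch.

```erlang
-module(anycast_arg_sketch).
-export([is_anycast/1]).

%% Returns true if the exchange arguments request anycast routing,
%% whether the client encoded the flag as a bool or as signedint 1.
is_anycast(Args) when is_list(Args) ->
    case lists:keysearch(<<"anycast">>, 1, Args) of
        {value, {_Name, bool, true}}   -> true;
        {value, {_Name, signedint, 1}} -> true;
        _Other                         -> false
    end.
```

The routing code could then call random_queue(H) when is_anycast(Args) is true and return H unchanged otherwise.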
