Erlang/OTP Forums

Author Message

<  RabbitMQ mailing list  ~  Dynamically binding to a topic exchange without losing messa

Guest
Posted: Fri Oct 02, 2009 2:07 pm Reply with quote
Guest
Hi,

I have a usage scenario that I am having a bit of trouble translating into exchanges, queues and bindings. This is using the Erlang client.

I have a stream of messages that have the identifying characteristics <type> and <id>. I would like to process these such that I can recognise a particular <type>=START to indicate that all messages related to <id> that follow are to be processed in some fashion by a new Erlang process.

So, I declare a topic exchange and bind a queue with the key 'START.*'. I now get my "starter" messages, nice. My naive idea was to have this starter create a new queue and bind it with key '*.ID' and then have a new Erlang process work on the messages arriving there.

What I am trying to get my head around is, how can I quarantee that any messages arriving for the new ID don't get lost while my starter process is setting stuff up? For example, a message with key START.27 arrives, the starter now sets up the new queue and binding against '*.27', but before that is complete, a message with key X.27 got published (and potentially lost).

One way I could see getting this to work, would be to have my starter process read _all_ messages (not just the START ones), synchronously starting the sub processes, and passing any non-starter messages to another exchange where the sub-processes have bound their queues.

Somehow this "pass it along" approach seems a bit clunky to me. Is there a more elegant way? Or, alternatively, does RabbitMQ Erlang Client have some nice and easy functions to "consume message from here, look at it, and if not for me, publish it to there"? It looks to me as if I need to transmit the RoutingKey as part of the message, as it doesn't seem to be part of the #content{} structure. But maybe I've missed something.

Thanks for any thoughts,
Robby


Post received from mailinglist
0x6e6562
Posted: Fri Oct 02, 2009 3:14 pm Reply with quote
User Joined: 12 Jul 2007 Posts: 250
Robby,

On Fri, Oct 2, 2009 at 3:06 PM, Robert Raschke <rtrlists@googlemail.com> wrote:
> What I am trying to get my head around is, how can I quarantee that any
> messages arriving for the new ID don't get lost while my starter process is
> setting stuff up? For example, a message with key START.27 arrives, the
> starter now sets up the new queue and binding against '*.27', but before
> that is complete, a message with key X.27 got published (and potentially
> lost).

Don't know if this helps but if you're concerned about about messages
being binned, then you can set the mandatory flag when publishing the
message which will have the effect that if the message could not get
routed to at least one queue, it will be returned to the sender.

Ben

_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss@lists.rabbitmq.com
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Post received from mailinglist
View user's profile Send private message
Guest
Posted: Fri Oct 02, 2009 3:33 pm Reply with quote
Guest
Hi Robert,

In terms of your application, is only one Erlang process every going to process a given stream? Would it be possible to change the model slightly, and in a start message send the name of a queue instead? That way your producer could allocate a queue, connect it to that topic, and send out a message telling consumers where to listen.

(Some might argue this violates some AMQP design principles though, since it starts coupling the sender to the receiver more than you'd otherwise hope)

Paul.

On Fri, Oct 2, 2009 at 3:06 PM, Robert Raschke <rtrlists@googlemail.com (rtrlists@googlemail.com)> wrote:
Quote:
Hi,

I have a usage scenario that I am having a bit of trouble translating into exchanges, queues and bindings. This is using the Erlang client.

I have a stream of messages that have the identifying characteristics <type> and <id>. I would like to process these such that I can recognise a particular <type>=START to indicate that all messages related to <id> that follow are to be processed in some fashion by a new Erlang process.

So, I declare a topic exchange and bind a queue with the key 'START.*'. I now get my "starter" messages, nice. My naive idea was to have this starter create a new queue and bind it with key '*.ID' and then have a new Erlang process work on the messages arriving there.

What I am trying to get my head around is, how can I quarantee that any messages arriving for the new ID don't get lost while my starter process is setting stuff up? For example, a message with key START.27 arrives, the starter now sets up the new queue and binding against '*.27', but before that is complete, a message with key X.27 got published (and potentially lost).

One way I could see getting this to work, would be to have my starter process read _all_ messages (not just the START ones), synchronously starting the sub processes, and passing any non-starter messages to another exchange where the sub-processes have bound their queues.

Somehow this "pass it along" approach seems a bit clunky to me. Is there a more elegant way? Or, alternatively, does RabbitMQ Erlang Client have some nice and easy functions to "consume message from here, look at it, and if not for me, publish it to there"? It looks to me as if I need to transmit the RoutingKey as part of the message, as it doesn't seem to be part of the #content{} structure. But maybe I've missed something.

Thanks for any thoughts,
Robby


_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss@lists.rabbitmq.com (rabbitmq-discuss@lists.rabbitmq.com)
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss




Post received from mailinglist
Guest
Posted: Fri Oct 02, 2009 4:09 pm Reply with quote
Guest
Ben and Paul, thanks for your thoughts.

My producer is generating a constant stream of messages with keys Type.Id, where the Type is one of a few hundred possible values and the Id identifies is an entity where something of that Type just happened. More concretely, in a trouble ticketing system for example, tickets have an Id and they constantly get updated with stuff of a certain Type.

After a particular Type of update, I need to start processing all following updates on the ticket with the given Id, up until some kind of "stop" update happens.

So I was thinking of having one process per ticket that needs monitored (~1000 at any one time). But the producer doesn't know which ones will get monitored. And I have to ignore all messages that are not of type "start monitoring" and aren't currently monitored by a process. Thus I can't use the mandatory flag either.

I'll try playing with the "look and forward" approach to see how awkward it'll be.

Thanks,
Robby


Post received from mailinglist

Display posts from previous:  

All times are GMT
Page 1 of 1
This forum is locked: you cannot post, reply to, or edit topics.

Jump to:  

You cannot post new topics in this forum
You cannot reply to topics in this forum
You cannot edit your posts in this forum
You cannot delete your posts in this forum
You cannot vote in polls in this forum
You cannot attach files in this forum
You cannot download files in this forum