The first problem we ran into with subscribing to a queue was if we forcefully kill a client, a large number of messages would disappear from the queue (far more than we had processed on the client). I’ll get to the “why” of this in a minute, but the solution was simply to turn on acknowledgments (or acks) for our messages which was something we knew we wanted to do anyway.
So after we turned on acks and started processing items from the queue we noticed that the number of items in the queue was not actually going down even though we were correctly sending the ack when we were done processing.
The AMQP gem uses EventMachine under the covers to manage the connections to the RabbitMQ server. It turned out that when you subscribe to a queue, it is a one time thing. You establish a connection and that is it. The server then sends you messages from that queue over the socket. RabbitMQ pre-fetches messages for you, meaning it crams a bunch of data over the socket and doesn’t wait for you to ask for more, it notices that you’ve read data off the socket, and pushes more to you.
The repercussions of this in the EventMachine world is a major blockage of data. EventMachine has an internal loop where it goes through registered sockets, and processes all the data off any sockets that are ready to be read from before it continues its loop. The server was basically keeping the socket full, so EventMachine would only complete a internal loop after we processed a full socket of data, and then it would get blocked on the next loop since the server has already filled the socket up again.
This means that all of our acks were sitting inside of EventMachine waiting for the loop to continue so they could be sent out. It also explains why when we weren’t using acks we were losing messages. The server had sent them to our socket and they were waiting to be processed and by killing the process we lost that data.
My first reaction was that the AMQP gem should be pulling all the data off the socket and caching it locally, then processing a single record off of that cache every time the EventMachine loop ran. This of course won’t work because as soon as we empty the socket, RabbitMQ is just going to fill it up again (until we have all the messages from the queue in our local cache).
So the solution? RabbitMQ 1.6 has an option to set a pre-fetch limit. So we simply set the pre-fetch limit to 1, and our EventMachine loop runs nice and fast now. You’ll want to tweak your pre-fetch limit depending on how long it takes to process each message. If you can churn through a hundred messages a second, you probably won’t even notice this problem and the prefetching will help you, but if it takes you a few seconds (or minutes) per message, you’ll wonder why things aren’t popped off the queue for several minutes (or hours).
About the AuthorMore Content by Joseph Palermo