2013-04-23

Rolling your own message/job queue in MySQL (Part 3: Message Queue Implementation)

Note: These blog posts are in a stream-of-consciousness style with limited revision so I can progress rapidly without worrying about polishing each post.  If you notice a mistake, something missing, or even just a confusing portion, let me know and I'll attempt to revise it.

The specification

Okay, in part 2 we created a rough specification for what we need to implement a message queue in MySQL.  I'll summarize it here:

  1. A unique id for each message.
  2. A field for holding a unique transaction id (initially null) to prevent multiple dequeuing clients from colliding over the same message.
  3. One or more payload fields.
  4. Queuing a message is a row insertion.
  5. Dequeuing a message is a matter of an UPDATE with a LIMIT setting a unique transaction identifier on entries that do not have one, and then a subsequent SELECT for rows matching that unique transaction identifier.
  6. Accepting a message is a matter of deleting that row from the table.  If we need a history of messages, we will move the row to an archival table.
  7. Rejecting a message is achieved by resetting that message's unique transaction identifier back to null.
That's fairly straightforward, and quite easy to implement.  For our implementation we'll use MySQL's AUTO_INCREMENT capability for the primary key to get a unique id for each message, and contrary to the example in part 2, we'll use a simple integer for the unique transaction id, as that should be sufficient for our needs.
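
To make the seven items concrete before touching the database, here's a rough in-memory model in plain Perl.  It's only a sketch: the %queue hash stands in for the table, and the enqueue, dequeue, accept_message and reject_message names, along with the sequential transaction counter, are my own placeholders rather than part of the implementation that follows.

```perl
use strict;
use warnings;

# In-memory stand-in for the queue table: id => { txn => ..., payload => ... }
my %queue;
my $next_id  = 1;   # plays the role of AUTO_INCREMENT
my $next_txn = 1;   # hypothetical counter; the real code picks a random integer

# Item 4: queuing is a row insertion
sub enqueue {
    my ($payload) = @_;
    my $id = $next_id++;
    $queue{$id} = { txn => undef, payload => $payload };
    return $id;
}

# Item 5: mark up to $limit unclaimed rows with a transaction id, then select them
sub dequeue {
    my ($limit) = @_;
    my $txn  = $next_txn++;
    my @free = grep { !defined $queue{$_}{txn} } sort { $a <=> $b } keys %queue;
    splice @free, $limit if @free > $limit;
    $queue{$_}{txn} = $txn for @free;    # the UPDATE ... LIMIT step
    # the SELECT step: everything carrying our transaction id
    return grep { defined $queue{$_}{txn} && $queue{$_}{txn} == $txn }
           sort { $a <=> $b } keys %queue;
}

# Item 6: accepting deletes the row
sub accept_message { my ($id) = @_; delete $queue{$id}; return }

# Item 7: rejecting clears the transaction id so another client can claim the row
sub reject_message { my ($id) = @_; $queue{$id}{txn} = undef; return }

enqueue("job $_") for 1 .. 4;
my @first  = dequeue(2);      # claims ids 1 and 2
my @second = dequeue(2);      # a second caller gets ids 3 and 4, never 1 or 2
reject_message( $first[0] );  # id 1 becomes available again
accept_message( $first[1] );  # id 2 is gone for good
my @third = dequeue(5);       # only id 1 is unclaimed at this point
print "first: @first; second: @second; third: @third\n";
```

The point to notice is that the second dequeue never sees rows the first one marked, which is the whole job of the transaction id field.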

The implementation

With the specification taken care of, our schema is extremely simple:

CREATE TABLE `queue_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `transaction_unique` mediumint(8) unsigned DEFAULT NULL,
  `payload` text,
  PRIMARY KEY (`id`),
  KEY `transaction_unique` (`transaction_unique`)
) ENGINE=InnoDB;

Now, it's perfectly valid to write SQL functions for queue and dequeue actions, and the accept and reject actions, but I'm not a fan of moving too much of my model code to the database.

While it does enforce an extra level of consistency on the data, I feel constrained architecturally and feel it won't scale as easily, since in my experience the database is generally much harder to scale than front-end servers.  Truthfully, there's probably a good trade-off point where there are real benefits from reducing round trips to the database by coding small, quick routines as SQL functions, and our dequeue implementation probably falls squarely into that category.  I'm not going to implement that here, though.

The code


Now, let's work up an implementation of this in an actual programming language.  I'm going to use Perl and DBIx::Class, because that's what I'm comfortable with and I believe it has a fairly compact but readable syntax.  If you have problems with that, feel free to translate into your favorite language.  Actually, please do, as that's the best way to get a feel for this.  For my following examples, please assume they all reside in the same file, even though I'll be presenting them in discrete chunks.

use 5.014; # implies strict
use warnings;
use utf8;

This is a fairly regular preamble in modern Perl.  We want to use newer features from version 5.14 (which implies "use strict;"), turn on warnings, and tell perl to expect possible UTF-8 data within the script (a sane default).

package Queue::Message {
    use base 'DBIx::Class::Core';
    use JSON; # imports decode_json into this package for message() below
    __PACKAGE__->table( 'queue_test' );
    __PACKAGE__->add_columns(qw( id transaction_unique payload ));
    __PACKAGE__->set_primary_key( 'id' );
    __PACKAGE__->resultset_class( 'Queue::MessageBroker' );

    # To "accept" a message, we delete that row. This leaves the ORM object alone
    sub message_accept {
        my $self = shift;
        return $self->delete;
    }
    # To "reject" a message, we mark it as no longer part of a transaction
    sub message_reject {
        my $self = shift;
        return $self->update({ transaction_unique => undef });
    }

    # Convenience method to automatically convert the JSON payload
    sub message {
        my $self = shift;
        return decode_json( $self->payload );
    }

    1; # Class returns true
}

Here we have our Message class, which both defines our metadata for our table specification in DBIx::Class (in a shorthand syntax), and defines our message accept and reject methods, as defined in our specification.  Additionally, it provides a convenience method to decode the JSON we are serializing our message to so it can be easily stored.
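
Since all of these classes share one file, it's easy to forget that Perl imports are per package: loading a JSON module in one package block does not make decode_json visible in another.  Here's a minimal sketch of that behavior, using the core JSON::PP module as a stand-in for JSON and two hypothetical Example:: packages that exist only for this illustration.

```perl
use 5.014;    # needed for the "package NAME { ... }" block syntax
use warnings;

package Example::WithImport {
    use JSON::PP;    # exports encode_json and decode_json into this package only
    sub roundtrip { return decode_json( encode_json( $_[0] ) ) }
}

package Example::WithoutImport {
    # No JSON module is loaded here, so decode_json is not defined in this package
    sub try_decode { return decode_json( $_[0] ) }
}

package main;

# Serializing to JSON and back gives us an equivalent data structure
my $copy = Example::WithImport::roundtrip( { time => 1366700000.25 } );
print "round trip ok: $copy->{time}\n";

# The call compiles, but dies at run time with an "Undefined subroutine" error
my $ok = eval { Example::WithoutImport::try_decode('{"time":1}'); 1 };
print "without import: ", ( $ok ? "worked" : "failed: $@" );
```

So any package that calls encode_json or decode_json needs its own import, even when everything lives in the same script.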

package Queue::MessageBroker {
    use base 'DBIx::Class::ResultSet';
    use JSON;
    __PACKAGE__->load_components(
        qw(Helper::ResultSet Helper::ResultSet::Shortcut)
    );

    sub message_queue {
        my ($self,$message) = @_;
        return $self->create({ payload => encode_json( $message ) });
    }
    sub message_dequeue {
        my $self        = shift;
        my $wanted_msgs = shift || 1;
        my $uniq        = int(rand(2**24));
        # Mark some messages as part of this transaction
        my $result = $self->order_by('id')
                          ->rows($wanted_msgs)
                          ->search({ transaction_unique => undef })
                          ->update({ transaction_unique => $uniq });
        # Return marked messages
        return $self->search({ transaction_unique => $uniq })->all;
    }
    sub message_count { shift->search({ transaction_unique => undef })->count };

    1; # Class returns true
}

This is the definition of our Broker class, which, as a DBIx::Class::ResultSet subclass, handles the actual query operations.  Beyond some simple DBIx::Class setup for our inherited methods, plus a few convenience methods from the helper components (the chained order_by and rows methods), our Broker is primarily responsible for inserting (queuing) messages and for claiming and retrieving (dequeuing) them, so the methods for those operations make up the majority of the implementation.  Deleting an accepted message is left to the Message class above.
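
One thing worth sanity checking is the random transaction id in message_dequeue.  If two clients with outstanding claims happen to draw the same number, each one's SELECT will also return the other's rows.  Here's a back-of-the-envelope sketch of how likely that is, using the standard birthday calculation; the collision_probability function is my own helper, and it assumes every id is drawn uniformly and independently.

```perl
use strict;
use warnings;

# Chance that at least two of $k outstanding transactions drew the same id
# out of $space equally likely values (the classic birthday calculation).
sub collision_probability {
    my ( $k, $space ) = @_;
    my $p_unique = 1;
    $p_unique *= 1 - $_ / $space for 0 .. $k - 1;
    return 1 - $p_unique;
}

my $space = 2**24;    # matches int(rand(2**24)) and the unsigned mediumint column
printf "%5d in flight: %.6f\n", $_, collision_probability( $_, $space )
    for 2, 10, 100, 1000;
```

While the probability is small it grows roughly with the square of the number of claims in flight, so a handful of clients is a very different situation from a thousand.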

package Queue {
    use base 'DBIx::Class::Schema';
    __PACKAGE__->load_classes( 'Message' );
    1; # Class returns true
}

Finally, we have our base ORM class.  This comes last because it expects to parse the object schema information from the Queue::Message class we defined above, and needs to know about any specifics we set in the ResultSet (MessageBroker) class.  Normally it can automatically find and load everything by itself from the separate files the classes are defined in, but since we are using a single file we have to be careful about the order in which we define classes.

That's it.  That's the entire definition of our initial, somewhat naive implementation of a message queue on top of MySQL.  The only thing left to add is some code to drive the implementation and test that it works, so we might as well add that.

package main;
use JSON;
use Time::HiRes qw(time);

# Connect to DB with DBIx::Class schema object
my $schema = Queue->connect(
        "DBI:mysql:database=test;host=localhost;mysql_socket=/var/lib/mysql/mysql.sock",
        'queuetest_user', 'queuetest_pass', { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
);
# Get our broker (DBIx::Class::ResultSet for Message class)
my $broker = $schema->resultset('Message');

# Queue 4 messages with a high-resolution timestamp for data
$broker->message_queue({ time => time() }) for 1..4;
my @messages;

say 'Get one message, 4 -> 3';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue;
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_accept for @messages;
print "\n";

say 'Get two messages, but reject them, 3 -> 3';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue(2);
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_reject for @messages;
print "\n";

say 'Get two messages, 3 -> 1';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue(2);
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_accept for @messages;
print "\n";

say 'Try to get two messages when 1 available, 1 -> 0';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue(2);
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_accept for @messages;
print "\n";

Next time we'll look at performance, and see how quickly we can get messages in and out of this system.

2013-04-22

Rolling your own message/job queue in MySQL (Part 2: Message Queue Initial Spec)

Forgive me if this is dense and/or has errors, I'm doing this as a sort of stream of consciousness dump with limited revision, not a polished blog post.

The first step to implementing a message queue in SQL is to define a table.  If we want a sane implementation, we're going to have to take a look at what sort of usage we expect and plan our schema accordingly.

Let's revisit the AMQP protocol capabilities to spec this out:

  1. some standard outcomes for transfers, through which receivers of messages can for example accept or reject messages
  2. a mechanism for indicating or requesting one of the two basic distribution patterns, competing- and non-competing- consumers, through the distribution modes move and copy respectively
  3. the ability to create nodes on-demand, e.g. for temporary response queues
  4. the ability to refine the set of message of interest to a receiver through filters
For our initial implementation, let's focus on #1.  The other items are either of limited interest to us initially, or something we already get for free (filtering is inherent in SQL).

One of the items I glossed over in the prior article was the performance considerations inherent in the design.  In addition to that, we need to decide whether we want auditing of past messages.  If we don't require a log of all prior messages, then we can just delete them from the table, which is useful because this will keep our table from growing indefinitely.  That's good, because a large table will cause our queries to slow over time, and eventually the system could fall over under its own historical weight.  Imagine a system that processes 100,000 messages a day, usually with no more than 1,000 active at a time.  By the end of the first day, every scan of the table for active messages will have to scan 99,000 past entries (in a naive worst-case example) to see the active entries.  This can be made much better with good indices, but in the end you are still indexing entries you don't care about, or at least don't care about for normal operation.

A better solution is for us to archive past entries to a separate table if we require auditing entries.  This allows us the ability to use past messages for statistical queries, while not impacting the active message queue table. This allows us to make the first rule for how our schema will act:
  • For each queue that requires history, there will be a secondary table that contains archived messages that have been accepted by a client.  Upon acceptance of a message, the message will be inserted into the corresponding archive queue table and deleted from the main queue table.
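
As a sketch of what that rule could look like in practice, here's one way to do the move with plain DBI.  The "_archive" table suffix and the archive_statements and accept_and_archive names are assumptions of mine, and I'm assuming the archive table has at least id and payload columns.  Wrapping the insert and delete in one transaction is what keeps a message from being lost or archived twice if something fails halfway through.

```perl
use strict;
use warnings;

# Build the statements that move one accepted message into its archive table.
# "<queue>_archive" is a hypothetical naming scheme, not something MySQL provides.
sub archive_statements {
    my ( $queue_table, $id ) = @_;
    my $archive_table = $queue_table . '_archive';
    return (
        [ "INSERT INTO $archive_table (id, payload) SELECT id, payload FROM $queue_table WHERE id = ?", $id ],
        [ "DELETE FROM $queue_table WHERE id = ?", $id ],
    );
}

# Run both statements in one transaction.
# $dbh is expected to be a connected DBI handle with RaiseError enabled.
sub accept_and_archive {
    my ( $dbh, $queue_table, $id ) = @_;
    $dbh->begin_work;
    my $ok = eval {
        for my $stmt ( archive_statements( $queue_table, $id ) ) {
            my ( $sql, @bind ) = @$stmt;
            $dbh->do( $sql, undef, @bind );
        }
        $dbh->commit;
        1;
    };
    unless ($ok) {
        my $error = $@;
        eval { $dbh->rollback };    # undo the partial move before re-raising
        die $error;
    }
    return;
}

# Show the SQL without needing a database connection
print "$_->[0]\n" for archive_statements( 'queue_table', 42 );
```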
Now that we've dealt with that, we can look into the actual mechanisms used to submit and receive messages.  Submission is easy enough: to submit a message we insert a row into the queue table.  This row should have a unique id and a payload.

The message itself has no state that the receiver needs to be aware of; if it is available, it can be processed.  Unfortunately SQL is not inherently suited for parceling out a single row to an individual client while preventing access by other clients.  SQL by itself does not give us a single statement that both retrieves and updates a row atomically.  This means multiple clients could conceivably SELECT the same data before updating/deleting it to signal acceptance by that client.  To prevent this we'll have to combine a few other SQL features to simulate an atomic UPDATE and SELECT action.  The core idea is that we use another field to denote the retrieval status of a message.  Using this field, we can attempt to update a number of rows that are not already in the process of being handed off to a client, setting them to a unique state, and then select the rows that match that state.  It looks something like this:

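# Assume transaction_unique is NULL on newly inserted messages
SET @WANTED_ROWS = 10;
SET @UUID = UUID();
# MySQL does not accept a user variable directly in LIMIT,
# so bind it through a prepared statement instead
PREPARE mark_rows FROM
  'UPDATE queue_table
      SET transaction_unique = ?
    WHERE transaction_unique IS NULL
    LIMIT ?';
EXECUTE mark_rows USING @UUID, @WANTED_ROWS;
DEALLOCATE PREPARE mark_rows;

SELECT id, payload
  FROM queue_table
 WHERE transaction_unique = @UUID;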

We now have a set of rows that we have designated for a single client, and we designated them in an atomic way so we don't have to worry about another client getting the same rows.  This also allows us to achieve capability #1 of the AMQP protocol, which is to allow accepting or rejecting messages at the client level.  All we have to do to accept the message is to delete it and optionally recreate it in the archive table.  No other client will be able to access it as long as they use the same access mechanism we've defined above.  To reject a message, we just set the transaction_unique back to null, and other clients will be able to pick it up.

At this point you may have noticed a problem with this algorithm.  What happens if the client crashes at some point after designating some messages as received but before accepting or rejecting them all?  To account for this we probably need a reaper of some sort that looks for and resets messages that have been in the received but not accepted state longer than a timeout period.  In our initial, naive implementation, we'll do this on client shutdown, and examine alternate solutions when we look for ways to enhance performance later.  Unfortunately, to track time since a message was received, we then need to add a time field to each row, but at least it will also track closely the accepted date of the message (we would need to add yet another time field if we want to track the creation date, which may be worth doing).
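
Here's a rough sketch of the reaper logic against an in-memory copy of the rows, just to pin down the rule.  The claimed_at field name, the reap_stale_claims function and the sample timestamps are all hypothetical; in the real table this would be a single UPDATE that compares the time field against the timeout.

```perl
use strict;
use warnings;

# Reset claims that have been held longer than $timeout seconds.
# Each row is { txn => id or undef, claimed_at => epoch seconds or undef };
# "claimed_at" is a hypothetical name for the extra time field.
sub reap_stale_claims {
    my ( $rows, $now, $timeout ) = @_;
    my @reaped;
    for my $id ( sort { $a <=> $b } keys %$rows ) {
        my $row = $rows->{$id};
        next unless defined $row->{txn};                  # unclaimed, nothing to do
        next if $now - $row->{claimed_at} <= $timeout;    # still within its window
        @{$row}{qw(txn claimed_at)} = ( undef, undef );   # make it available again
        push @reaped, $id;
    }
    return @reaped;
}

my %rows = (
    1 => { txn => 7,     claimed_at => 1000 },     # claimed long ago, client presumably died
    2 => { txn => 9,     claimed_at => 1290 },     # claimed recently
    3 => { txn => undef, claimed_at => undef },    # never claimed
);
my @reaped = reap_stale_claims( \%rows, 1300, 60 );
print "reaped: @reaped\n";
```

Only the first row is reset here: it has been held for 300 seconds against a 60 second timeout, while the second has been held for just 10.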

At this point we've outlined how to submit messages (plain insert), receive messages (update to unique identifier with a limit, and select for that identifier), and archive messages if we require that.  Next, we can attempt to implement this, and find out where our assumptions and ideas fall apart.  I'll be doing that using Perl, and I don't imagine anyone with programming experience will have trouble following along, but I'll try to explain any areas I think may be confusing explicitly or with comments in the code, whichever seems more appropriate.

Rolling your own message/job queue in MySQL (Part 1)

Let's do an experiment.  Let's theorize on what we need to implement a job queue in MySQL.  I'll use this as a learning experience to discover more about message and job queue systems.  But first, an explanation as to why we're doing this in MySQL and not using some other, optimized software:
  • We can tightly couple our jobs to our data, allowing for better statistics
  • We can use all the existing tools we have at our disposal for SQL integration (ORMs, etc.)
  • We can use the raw flexibility of SQL itself
  • It will allow me to prototype these systems quickly without having to worry about the underlying data structure (beyond SQL schema) and transfer protocol (although we will need to design our API).
At the same time, there are some distinct disadvantages, some of which are:
  • Without special care, future usage and data growth may impair function.
  • The storage medium is not optimized as a queue, so it will never be quite as fast as a dedicated solution.
  • Too much explicit linking to data in the specification may hamper general purpose use, while too little will negate the benefits of using the same system as the data.
It's a fine line between taking too much advantage of the data in the system, and not enough.  It probably changes as time progresses as well, making it even harder.  I think the best way to deal with this particular complexity is to plan for one table per job type or category.  This allows you to link to the underlying data for that job appropriately without imposing on other job types.  As a fallback, a general queue with an indexed type field and arbitrary payload (serialized JSON, perhaps) could stand in for general jobs/messages until a type graduates to needing its own table because of its data coupling.

With these requirements in place, we can now address the implementation of a general purpose job/message queue in MySQL.  To help us make a general purpose implementation that isn't unintentionally influenced too much by our own data and needs (let's assume we can't perfectly model all our future needs), let's take note of some of the features of leading message and job queue software.

The AMQP protocol section on messaging capabilities has a few items of interest:
  1. some standard outcomes for transfers, through which receivers of messages can for example accept or reject messages
  2. a mechanism for indicating or requesting one of the two basic distribution patterns, competing- and non-competing- consumers, through the distribution modes move and copy respectively
  3. the ability to create nodes on-demand, e.g. for temporary response queues
  4. the ability to refine the set of message of interest to a receiver through filters
All of these seem like sane capabilities we would want to have.  The fourth item is already inherently available by our choice of SQL, so that's a bonus!  Now let's look at a job queue implementation, and see what it throws into the mix.  For this, let's look at Gearman.  It's a relatively popular job queue implementation, so by looking at a client implementation we should get a good idea what they view as requirements to implement a good job queue system.  For this I'm going to look at Perl's Gearman::Task.  I'll summarize the methods it supplies and their uses, from the perldoc page:
  • uniq - If this job exists in the queue, merge this request with that one, and notify both clients when finished
  • on_status - Call specified code when job changes status
  • on_{complete,retry,fail} - Call the specified code when the specific action/status reached
  • retry_count - Allow this many failures before giving up on job
  • timeout - set status to failed if specified time has elapsed without a success or fail status already
From this it becomes somewhat obvious that while there are similarities between message queues and job queues, they have different responsibilities.  Message queues concern themselves with correct delivery of messages to one or more consumers, with performance being a major constraint.  Additionally, message queues appear to be mainly unidirectional.  You put a message in the queue, and it's consumed, but the queue does not concern itself with the content of the message or its eventual status.  Job queues concern themselves more with the eventual state of the message, or job.  They track what should happen on success or failure, and provide mechanisms to track that (asynchronously) at the submission level.  In fact, a lot of job queues are implemented on top of message queues, as some of the implicit requirements are similar enough that they can be reused or built upon easily.

So, what have we learned so far?  We've learned some of the core differences between message queues and job queues, and that their individual uses are different enough that one solution encompassing both will probably serve neither need well.

So, where does that leave us?  Well, I think we need to build a message queue, which is the simpler of the two, and then we can look at a job queue.  With a little luck, we'll be able to take advantage of the medium to not just build a job queue on top of a message queue, but alter some of our behavior subtly to achieve the same result.  I can imagine a post for each initial implementation, plus some testing against existing systems at the end, so it looks like we may be in for at least four parts.


I need a dedicated message queue... or do I?

I'm starting the next step of a project at work, and that's presented me with a problem.  I need a job queue.  That in itself is not too problematic, but unfortunately I've been paralyzed by indecision over the last few days.  I've been reading about these new-fangled message queues for a while now, so in an effort to determine the best technology for the task I started surveying the information I could find about message queues and how they apply to my problem.  Unfortunately, there's a little voice in the back of my head saying "This isn't hard, just make a table and write some helpers for your ORM, you've done this before...", and I'm having trouble faulting that logic.

The problem is, while I really want to use the best technology for the task here, as the only technology based employee at a small (< 10 employees) company, I don't really have the time to spend days or weeks testing out implementations.  I need to bring my own experience (and where that fails the wisdom of the crowd) to bear.  As such, I'm going to try to present this problem in a general way, so it's useful to a wider audience, and in exchange hopefully I'll get some useful information back in comments and discussion.

With that out of the way let's start with a definition of my needs.  This is a small company, but they are moving heavily into analysis of a growing dataset to help make better decisions, streamline employee tasks and reduce labor.  As such, there's a need for a general job queue that can:

  • Track jobs that have no assigned employee
  • Allow a job to be assigned/unassigned to/from an employee
  • Allow an employee to mark a job as finished, or delay until a later date
  • Jobs must be able to be shifted to other states such as pending review
  • For some subset of jobs that are currently performed by an employee, jobs will eventually be mostly, and then fully performed automatically

That's a fairly open set of requirements.  With those, I think I have my pick of any number of message queue back-ends (brokers) with fewer, but still numerous options for a specialized job queue on top.  Then again, the draw of a RDBMS/SQL solution is strong here.  I can see lots of real tangible benefits to using a native SQL solution, but at the same time, I can see the benefits (immediate and future) of dedicated queue software.  So far my thought process has broken down like so for the benefits of staying within the RDBMS and making my own system:

  • Tighter coupling with data - better/easier statistical querying
  • Familiar - less likely to find surprising behavior
  • Known performance & implementation (I already know how I would implement this)
  • Reliability and persistence are well known, and non-issues
  • Keeps system complexity down by not introducing other failure points

In looking into this, I came across a good post on implementing queues in SQL, https://blog.engineyard.com/2011/5-subtle-ways-youre-using-mysql-as-a-queue-and-why-itll-bite-you.  That all rings true, but other than some interesting ideas on how to hack up a performant polling solution, it's not really anything I didn't already know.  This isn't the first time I've ended up with a queue in an RDBMS, and I know a lot (but I doubt all!) of the pitfalls to avoid.  In truth, I think if I had a bit more support from the database in certain areas, such as polling/notify support or built-in key-value stores, I wouldn't be as conflicted.  Alas, I'm using MySQL (Percona's variant), and while I generally like it, these are areas where it is lacking.  Where this gets interesting is when I start comparing the inherent and implicit strengths of implementing a queue on top of an RDBMS against the strengths of using a dedicated queuing system:

  • Optimized for queue performance
  • Subscription/notify system (not all, but this is useful to me)
  • Less likely to accidentally code data dependency in queue implementation, complicating future use
  • More opportunity to use fast queues as a general purpose structure for IPC and scalability
  • Ability to use and learn from a new technology

To be fair, I think the last point for an RDBMS implementation and the last two points for a dedicated queue solution don't really apply in this case.  I can see a use for a dedicated queue in the future in other parts of the system, so I imagine I'll have one sooner or later anyway, but I do think they are useful points in general, especially since I think they outline my current problem fairly well.  Which trade-off do I take: something better suited to the data it's representing and the use cases it needs, or something that keeps complexity low and allows easier data correlation?

I think what I need here is more information from people that have chosen one route or the other, and advantages and disadvantages they think that brought.  There may be techniques that mitigate some of the downsides I perceive for either solution that I'm unaware of.

Update: I decided to implement in MySQL as an experiment to learn the needs and requirements of message queues intimately.  You can see the first part here.