基于HTTP的消息队列

jopen 10年前

之前介绍过基于TCP的消息队列,这里在写个基于HTTP的消息队列。代码仅仅演示整个程序员的框架。不会去考虑性能和实用性。简单起见,我们只考虑固定URI的情况。当然,对于不同URI去存取不同的消息队列也是意见很简单的事情。

我们还是用脚本来实现。这里采用Mojolicious框架来作为我们的基础模块。

Mojolicious是基于EV的perl web框架。性能是非常不错的。而且,符合PSGI规范,写出来的程序可以独立运行,也可以作为FCGI运行。

这里,我们使用一个数组来作为消息队列的容器。消息队列入对,对应数组的push操作,出队列,对应于数组的shift操作(从数组的开头取出一个元素,这样才符合消息队列先进先出的规矩)。

我们用另外一个数组来保存暂时没有得到消息的用户请求。

注意,因为使用的是内存数组来存放消息,所以,这个服务只能但进程运行。如果希望能多进程提供服务,需要考虑别的途径来在进程之间共享消息队列载体。

代码很简单,这主要归功于Mojo的框架。

#!/usr/bin/env perl  use Mojolicious::Lite;  use Data::Dumper;  my $jobs = [];  my $reqs = [];  get '/' => sub {    my $self = shift;    if ( scalar(@{$jobs}) == 0 ) {        $self->render_later;        my $timeout=$self->param('timeout') || 20;        Mojo::IOLoop->stream($self->tx->connection)->timeout($timeout);        push(@{$reqs},$self);    } else {        my $job = shift @{$jobs};        $self->render(text => $job );    }  };    post '/' => sub {      my $self=shift;      my $msg = $self->req->body;      $self->render(text => 'ok');      if ( scalar(@{$reqs}) > 0 ) {          my $s = shift @{$reqs};          $s->render(text => $msg);      } else {          push(@{$jobs},$msg);          printf STDERR "%d\r",scalar(@{$jobs});      }  };  app->start;

运行这个脚本比较简单,我们可以在命令行如下运行:

$perl ./mq_srv.pl daemon -l "http://*:8080"

这样,我们就在8080端口启动了一个HTTP服务。

如果我们这个时候直接去用浏览器或者curl去访问/,大家会发现,服务器不会给你回应。因为此时队列为空。

如果这个时候有另外用户恰好扔了一个消息过来,那么刚才等待的请求会得到这个消息。

整个程序的QPS取决于两个地方,第一个是Mojo本身在处理HTTP的速度。第二个是程序存取内存数组的速度。如果是正经写MQ,通常我们会使用内存+硬盘的方案。比如说,内存可以存放10万个消息。对出来的放到磁盘上去。那么这个时候,效率将取决于如何设计这个硬盘文件格式以及如何在文件和内存中来回倒腾。这些问题也没啥太多的技术含量,纯粹是经验和技巧。

大家看着乐一乐就行了。不用较真我这个程序的效率。

另外,如果真有人希望比较效率的话,可以把上面的程序作为nginx的FCGI来运行。这样效率会高不少。