Play! Akka Flume实现的完整数据收集

jopen 9年前

  1. 前言

    现如今,大数据如火如荼。针对用户行为,用户喜好等后续大数据分析也是十分火热。这个小项目实现了后台数据收集的一系列完整流程。

  2. 项目总体流程以及用到的技术

    Play ! 作为web服务器,使用RESTful 规范编写接口(客户端事先埋点,然后调用接口上传数据)

       Play !接口接受到的记录(json形式)经过处理后,先保存到 concurrentQueue中 

       Play! 启动后,start一个Akka schedulable actor.他每隔一段时间,让子actor去poll queue中的数据 

       调用flume的封装的rpc,将数据发送到指定的端口。

       Flume source端接收数据,按照配置重定向数据,sink到console.


  3. 后台实现

      3.1 编写接口

       采用RESTful编写接口,首先在play! 的conf中routes定义接口:

#run log  POST  /events/runlogs       controllers.RunLogs.create()

     然后编写controller

    public static Result create(){            JsonNode js = request().body().asJson();          RunLog.create(js);          //return ok anyway          return ok();      }

   然后是model

    public static void create(JsonNode js) {            if (js.has(LOG)) {              String logBody = js.findPath(LOG).asText();              //add one log into queue              QueueManager.INSTANCE.addRunLog(logBody);          }        }

可以看到,这些代码遵循MVC规范,首先让play!知道接口的定义,前端发送过来请求的时候,知道调用哪个controller中的哪个方法,并返回数据。而controller将数据从请求体中剥离出来,并发送给真正处理数据的model.

  3.2 Queue

 看到model中,接收到数据后,添加到queue中保存。

 定义为:ConcurrentLinkedQueue<String> 

 3.3 Akka 定时调度

 Akka负责定时从queue取出数据,然后通过rpc发送给flume。

 akka的初始化,启动是伴随着play! 的启动而进行的,每个play!只有一个akka system。所以首先要编写一个Global extends GlobalSetting(GlobalSettings is instantiated by the framework when an application starts,to let you perform specific tasks at start-up or shut-down),然后override onStart方法,在此方法中初始化akka的调度器。

代码如下:

        ActorRef dispatcher = Akka.system().actorOf(new Props(Dispatcher.class));            Akka.system().scheduler().schedule(                  Duration.create(200,TimeUnit.MICROSECONDS),                  Duration.create(2,TimeUnit.SECONDS),                  dispatcher,                  "run",                  Akka.system().dispatcher(),                  null          );

 可以看到,每个两秒钟,scheduler就会调用dispatcher,让他工作。

dispatcher在这里相当于一个master,他接到工作后,会通知自己的slave去工作(发送数据给rpc)。

代码如下:

    ActorRef workRouter = getContext().actorOf(              new Props(WorkerActor.class).withRouter(new RoundRobinRouter(40))              , "transferRouter");        @Override      public void onReceive(Object message) throws Exception {          ConcurrentLinkedQueue<String> runLogs = QueueManager.INSTANCE.getRunLogs();          dispatch(runLogs);        }        private void dispatch(Queue<?> queue) {            if (!queue.isEmpty()) {              Object obj = queue.poll();              List<String> data = new ArrayList<>();              while (obj != null) {                  data.add(obj.toString());                  obj = queue.poll();              }                workRouter.tell(new DispatchMsg("runlogs", data), getSelf());          }      }

 首先定义了一个router,他负责按照轮训算法,找到到底要让哪个slave去工作。

当dispatcher收到消息后,就让router通知WorkerActor去工作,并把从queue取出的数据给他,让他将这些数据通过rpc发送给远端的flume。

这样设计的目的在于:

    接口接受消息,暂时保存在queue中,快速回复客户端,不堵塞。

    利用akka并发能力,从queue中取出消息,找到一个worker去进行耗时较长的rpc工作。


workeractor

    @Override      public void onReceive(Object message) throws Exception {          if (message instanceof DispatchMsg) {              DispatchMsg msg = (DispatchMsg) message;              String business = msg.business;              List<String> datas = (List<String>) msg.data;                sendMsg.ToFlume.sendDataToFlume(datas);            }      }

3.4 rpc发送

flume集成了rpc

    public void sendDataToFlume(List<String> datas) {            List<Event> es = new LinkedList<Event>();          for (String data : datas){              // Create a Flume Event object that encapsulates the sample data              Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));              es.add(event);          }            // Send the event          try {              client.appendBatch(es);              System.out.println("data sent");          } catch (EventDeliveryException e) {              // clean up and recreate the client                client.close();              client = null;              client = RpcClientFactory.getDefaultInstance(hostname, port);          }      }

这里接受的数据为list,批处理。

首先新建client,这里注意hostname跟port,是flume服务器端source的ip跟端口。

然后批量发送数据。

4. Flume

flume的配置内容如下:exemple.conf

a1.channels = c1  a1.sources = r1  a1.sinks = k1    a1.channels.c1.type = memory    a1.sources.r1.channels = c1  a1.sources.r1.type = avro    a1.sources.r1.bind = 0.0.0.0   a1.sources.r1.port = 41414    a1.sinks.k1.channel = c1  a1.sinks.k1.type = logger

此处flume端为简单的单点配置,source接收41414的rpc消息,然后保存到channel中,sink到console中(数据收集一般sink到HDFS中,并且可以多点收集)。

启动命令如下:

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console


好了,启动flume后,启动play!然后利用客户端发送消息,就可以在flume端看到消息打印到console了。

项目的所有代码在 https://github.com/YulinGUO/collectEvents

如果有问题,请留言。

来自:http://my.oschina.net/yulinguo/blog/372191