Java Client Code Examples for Kafka, a Distributed Publish-Subscribe Messaging System

jopen · 10 years ago

Introduction

http://kafka.apache.org

Kafka is a high-throughput distributed publish-subscribe messaging system. It is the distributed message queue LinkedIn built for log processing: LinkedIn's log volume is large, but its reliability requirements are modest. The log data mainly covers user behavior (logins, page views, clicks, shares, likes) and system operation metrics (CPU, memory, disk, network, and system/process state).

Many existing message queue services provide reliable-delivery guarantees and assume immediate consumption by default, which makes them a poor fit for offline processing.

Highly reliable delivery is not essential for LinkedIn's logs, so Kafka can trade some reliability for performance. At the same time, its distributed cluster design lets messages accumulate in the system, which is how Kafka supports both offline and online log processing.
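
This tradeoff surfaces directly in the 0.8.x producer configuration via request.required.acks. A minimal sketch of the three settings (the broker addresses and the class name AcksConfigSketch are placeholders for illustration):

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class AcksConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker addresses
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        // request.required.acks selects the durability/performance tradeoff:
        //   "0"  -> fire-and-forget: fastest, but messages can be lost silently
        //   "1"  -> wait for the partition leader's acknowledgement
        //   "-1" -> wait for all in-sync replicas: slowest, most durable
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        System.out.println("acks = " + props.getProperty("request.required.acks"));
    }
}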

Test Environment

kafka_2.10-0.8.1.1, a 3-node cluster

zookeeper-3.4.5, a single-instance node

Code Examples

Producer Code Example

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * For details see: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 *
 * @author Fung
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Random rnd = new Random();
        int events = 100;

        // Set configuration properties
        Properties props = new Properties();
        props.put("metadata.broker.list",
                "172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional; if not set, the default partitioner is used
        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // Enables the acknowledgement mechanism; otherwise sends are fire-and-forget
        // and data may be lost. Valid values are 0, 1, -1; see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // Generate and send messages
        long start = System.currentTimeMillis();
        for (long i = 0; i < events; i++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + i; // or: rnd.nextInt(255)
            String msg = runtime + ",www.example.com," + ip;
            // If the topic does not exist, it is created automatically with the broker
            // defaults (replication-factor 1, num.partitions 1)
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    "page_visits", ip, msg);
            producer.send(data);
        }
        System.out.println("Elapsed: " + (System.currentTimeMillis() - start) + " ms");

        // Close the producer
        producer.close();
    }
}
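
One practical note on the loop above: it issues one synchronous send() per message. The same kafka.javaapi.producer.Producer also accepts a List of messages, which cuts per-call overhead when throughput matters. A minimal sketch (the helper name sendBatch and the payload strings are made up for illustration):

import java.util.ArrayList;
import java.util.List;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class BatchSendSketch {
    // Builds a batch of keyed messages and sends them in a single call.
    static void sendBatch(Producer<String, String> producer, int count) {
        List<KeyedMessage<String, String>> batch =
                new ArrayList<KeyedMessage<String, String>>(count);
        for (int i = 0; i < count; i++) {
            String ip = "192.168.2." + i; // same keying scheme as the demo above
            batch.add(new KeyedMessage<String, String>("page_visits", ip, "payload-" + i));
        }
        producer.send(batch); // the javaapi Producer overloads send() for a List
    }
}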

Consumer Code Example

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * For details see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 */
public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Now launch all the threads
        executor = Executors.newFixedThreadPool(numThreads);

        // Now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
            String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    public static void main(String[] arg) {
        // Hard-coded for the demo; in real use these would come from arg
        String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
        demo.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        demo.shutdown();
    }
}
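
One caveat in shutdown() above: executor.shutdown() only stops the pool from accepting new tasks. To give in-flight message handlers a chance to drain, a bounded wait can be added, along the lines of the Kafka wiki's Consumer Group Example. A drop-in replacement sketch for ConsumerDemo.shutdown() (the 5-second timeout is an arbitrary choice):

import java.util.concurrent.TimeUnit;

public void shutdown() {
    if (consumer != null)
        consumer.shutdown(); // stops the KafkaStream iterators
    if (executor != null) {
        executor.shutdown(); // no new tasks
        try {
            // Give worker threads up to 5 seconds to finish before giving up
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                System.out.println("Timed out waiting for consumer threads to exit");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}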

Message Handler Class

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerMsgTask implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
        m_threadNumber = threadNumber;
        m_stream = stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}
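
Note that it.hasNext() blocks indefinitely by default, so this thread only exits once consumer.shutdown() is called. If the consumer Properties set consumer.timeout.ms (e.g. "10000"), hasNext() instead throws ConsumerTimeoutException when no message arrives in time. A run() variant handling that case (a sketch under that configuration assumption):

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    try {
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
    } catch (ConsumerTimeoutException e) {
        // No message within consumer.timeout.ms; treat as end of stream
    }
    System.out.println("Shutting down Thread: " + m_threadNumber);
}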

Partitioner Class Example

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerDemo implements Partitioner {
    public PartitionerDemo(VerifiableProperties props) {
    }

    @Override
    public int partition(Object obj, int numPartitions) {
        int partition = 0;
        if (obj instanceof String) {
            String key = (String) obj;
            // Partition by the numeric suffix after the last '.',
            // e.g. the last octet of an IPv4 key like "192.168.2.25"
            int offset = key.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
            }
        } else {
            partition = obj.toString().length() % numPartitions;
        }
        return partition;
    }
}
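
As a quick, hypothetical sanity check of the mapping (PartitionerDemo ignores its constructor argument, so null suffices here): with 12 partitions, an IP-style key is routed by its numeric suffix, while a String key without a '.' suffix keeps the default partition 0.

public class PartitionerDemoCheck {
    public static void main(String[] args) {
        PartitionerDemo p = new PartitionerDemo(null);
        System.out.println(p.partition("192.168.2.25", 12)); // 25 % 12 = 1
        System.out.println(p.partition("plain-key", 12));    // no '.' suffix -> 0
    }
}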

References

https://cwiki.apache.org/confluence/display/KAFKA/Index

https://kafka.apache.org/

Source: http://my.oschina.net/cloudcoder/blog/299215