Giwi edited this page Nov 16, 2012 · 5 revisions

Introduction

Camel-kafka is an Apache Camel component designed to interact with the Apache Kafka messaging system.

Quickstart

Here is a Quickstart guide

URI format

kafka:topicName[?options]
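As a small illustration of the URI format above, the following sketch assembles an endpoint URI from a topic name and a map of options. The class and method names (KafkaUriSketch, build) are hypothetical and not part of camel-kafka, and the sketch assumes option values need no URL encoding.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of camel-kafka: builds a URI of the form
// kafka:topicName[?options] from a topic name and an ordered option map.
final class KafkaUriSketch {

    static String build(String topicName, Map<String, String> options) {
        if (options.isEmpty()) {
            // No options: the query part is omitted entirely.
            return "kafka:" + topicName;
        }
        // Join name=value pairs with '&' and prefix the query with '?'.
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return "kafka:" + topicName + query;
    }
}
```

Passing an insertion-ordered map (for example a LinkedHashMap) keeps the options in the order they were added, which makes the resulting URI predictable.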

Dependencies

Maven users need to add this component as a dependency in their pom.xml. You'll also need kafka-0.7.2.jar (Download) and, of course, the most exciting component: camel-kafka (Download).

Options

Producer

Property | Default | Description
zkConnect | | The ZooKeeper connection URL of the ZooKeeper cluster where the Kafka brokers are registered, used for ZooKeeper-based automatic broker discovery. Example: localhost:2181
serializerClass | kafka.serializer.DefaultEncoder | The class that implements the kafka.serializer.Encoder<T> interface, used to encode data of type T into a Kafka message. The default is a no-op encoder: the serialization of data to Message should then be handled outside the Producer.
partitionerClass | kafka.producer.DefaultPartitioner<T> | The class that implements kafka.producer.Partitioner<K>, used to supply a custom partitioning strategy on the message key (of type K) that is specified through the ProducerData<K, T> object in the kafka.producer.Producer<T> send API. The default uses the partitioning strategy hash(key) % num_partitions; if the key is null, it picks a random partition.
producerType | sync | Specifies whether messages are sent asynchronously. Valid values: async (asynchronous batching send through kafka.producer.AsyncProducer) and sync (synchronous send through kafka.producer.SyncProducer).
brokerList | null | Either this parameter or zkConnect must be specified. To bypass ZooKeeper-based automatic partition discovery, use this option to pass in static broker and per-broker partition information. Format: brokerid1:host1:port1, brokerid2:host2:port2. If you use this option, partitionerClass is ignored and each producer request is routed to a random broker partition.
bufferSize | 102400 | The socket buffer size, in bytes.
connectTimeoutMs | 5000 | The maximum time, in milliseconds, that kafka.producer.SyncProducer spends trying to connect to the Kafka broker. Once it elapses, the producer throws an error and stops.
socketTimeoutMs | 30000 | The socket timeout, in milliseconds.
reconnectInterval | 30000 | The number of produce requests after which kafka.producer.SyncProducer tears down the socket connection to the broker and establishes it again.
maxMessageSize | 1000000 | The maximum number of bytes that kafka.producer.SyncProducer can send as a single message payload.
compressioncodec | 0 (no compression) | The compression codec for all data generated by this producer.
compressedtopics | null | The topics for which compression is turned on. If the compression codec is anything other than NoCompressionCodec, compression is enabled only for the specified topics, if any; if the list of compressed topics is empty, the specified codec is enabled for all topics. If the compression codec is NoCompressionCodec, compression is disabled for all topics.
zkReadNumretries | 3 | A producer using the ZooKeeper software load balancer maintains a ZK cache that is updated by the ZooKeeper watcher listeners. During some events, such as a broker bounce, the producer ZK cache can be inconsistent for a short time and the producer may pick a broker partition that is unavailable. When this happens, the ZK cache needs to be updated. This parameter specifies the number of times the producer attempts to refresh the ZK cache.
Options for asynchronous producers (producerType=async)
queueTime | 5000 | The maximum time, in milliseconds, for buffering data on the producer queue. After it elapses, the buffered data in the producer queue is dispatched to the event handler.
queueSize | 10000 | The maximum size of the blocking queue for buffering on the kafka.producer.AsyncProducer.
batchSize | 200 | The number of messages batched at the producer before being dispatched to the event handler.
eventHandler | kafka.producer.async.EventHandler<T> | The class that implements kafka.producer.async.IEventHandler<T>, used to dispatch a batch of produce requests using an instance of kafka.producer.SyncProducer.
eventHandlerProps | null | The java.util.Properties object used to initialize the custom event handler through its init() API.
callbackHandler | null | The class that implements kafka.producer.async.CallbackHandler<T>, used to inject callbacks at various stages of the kafka.producer.AsyncProducer pipeline.
callbackHandlerProps | null | The java.util.Properties object used to initialize the custom callback handler through its init() API.
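To make the default strategy described for partitionerClass concrete, here is a minimal sketch of hash(key) % num_partitions with a random fallback for null keys. It is not Kafka's DefaultPartitioner source; the class name is hypothetical, and the use of Math.floorMod (to keep the result non-negative for negative hash codes) is an assumption of this sketch.

```java
import java.util.Random;

// Hypothetical sketch of the documented default strategy, not Kafka's own code.
final class DefaultPartitioningSketch {

    private static final Random RANDOM = new Random();

    static int partition(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            // Null key: pick a random partition.
            return RANDOM.nextInt(numPartitions);
        }
        // Assumption: floorMod keeps the index in [0, numPartitions)
        // even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

A custom partitionerClass would replace this logic with its own mapping from key to partition.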

see here
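The interaction between compressioncodec and compressedtopics described in the producer options can be read as a small decision function. The sketch below only restates that rule; the class and method names are hypothetical, and treating codec value 0 as "no compression" follows the default listed in the table.

```java
import java.util.Set;

// Hypothetical sketch of the documented rule, not code from Kafka or camel-kafka.
final class CompressionRuleSketch {

    // 0 is documented above as "no compression".
    static final int NO_COMPRESSION = 0;

    static boolean shouldCompress(int codec, Set<String> compressedTopics, String topic) {
        if (codec == NO_COMPRESSION) {
            // No codec configured: compression is disabled for all topics.
            return false;
        }
        if (compressedTopics == null || compressedTopics.isEmpty()) {
            // Codec set but no topic list: compress every topic.
            return true;
        }
        // Codec set and topic list given: compress only the listed topics.
        return compressedTopics.contains(topic);
    }
}
```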

Consumer

Property | Default | Description
zkConnect | | The ZooKeeper connection URL of the ZooKeeper cluster where the Kafka brokers are registered, used for ZooKeeper-based automatic broker discovery. Example: localhost:2181
groupId | | A string that uniquely identifies a set of consumers within the same consumer group.
concurrentConsumers | 1 | The number of concurrent threads that read the topic.
socketTimeoutMs | 30000 | The socket timeout for network requests, in milliseconds.
socketBuffersize | 64*1024 | The socket receive buffer size for network requests.
fetchSize | 300 * 1024 | The number of bytes of messages to attempt to fetch in one request to the Kafka server.
backoffIncrementMs | 1000 | Avoids repeatedly polling a broker node that has no new data: the consumer backs off for this time period every time it gets an empty set from the broker.
queuedchunksMax | 100 | The high-level consumer buffers the messages fetched from the server internally in blocking queues. This parameter controls the size of those queues.
autocommitEnable | true | If set to true, the consumer periodically commits to ZooKeeper the latest consumed offset of each partition.
autocommitIntervalMs | 10000 | The interval, in milliseconds, at which the consumed offsets are committed to ZooKeeper.
autooffsetReset | smallest | Valid values: smallest (automatically reset the offset to the smallest offset available on the broker) and largest (automatically reset the offset to the largest offset available on the broker). Anything else throws an exception to the consumer.
consumerTimeoutMs | -1 | With the default value of -1, a consumer blocks indefinitely if no new message is available for consumption. If set to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout.
rebalanceRetriesMax | 4 | The maximum number of retries during a rebalance.
mirrorTopicsWhitelist | | Whitelist of topics for this mirror's embedded consumer to consume. At most one of whitelist/blacklist may be specified.
mirrorTopicsBlacklist | | Topics to skip mirroring. At most one of whitelist/blacklist may be specified.
mirrorConsumerNumthreads | 4 | The default number of threads used per topic for the mirroring consumer.

see here
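The three cases listed for autooffsetReset can be summarized in a few lines. This is only a sketch of the documented behaviour; the class name is hypothetical, and the choice of IllegalArgumentException is an assumption, since the text above only says that an exception is thrown to the consumer.

```java
// Hypothetical sketch of the autooffsetReset cases, not Kafka's consumer code.
final class OffsetResetSketch {

    static long resolve(String autooffsetReset, long smallestOffset, long largestOffset) {
        if ("smallest".equals(autooffsetReset)) {
            // Reset to the smallest offset available on the broker.
            return smallestOffset;
        }
        if ("largest".equals(autooffsetReset)) {
            // Reset to the largest offset available on the broker.
            return largestOffset;
        }
        // Anything else: surface an error to the consumer
        // (the exception type here is an assumption).
        throw new IllegalArgumentException("Unsupported autooffsetReset: " + autooffsetReset);
    }
}
```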

Exchange data format

When sending to Kafka, this component accepts a body of type T or List<T>, where T is Serializable. When reading from Kafka, it produces only single bodies of type T.
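One way to picture the accepted body types is a helper that turns either a single Serializable or a list of them into a uniform list of messages. This is a sketch under that reading of the text, not the component's actual code; the class and method names are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: normalizes a body into a list of Serializable messages.
final class BodyNormalizerSketch {

    static List<Serializable> toMessages(Object body) {
        List<Serializable> messages = new ArrayList<Serializable>();
        if (body instanceof List) {
            // A list body is treated as a batch: one message per element.
            for (Object item : (List<?>) body) {
                messages.add(asSerializable(item));
            }
        } else {
            // Any other body is treated as a single message.
            messages.add(asSerializable(body));
        }
        return messages;
    }

    private static Serializable asSerializable(Object value) {
        if (!(value instanceof Serializable)) {
            // Also rejects null, since null is not an instance of Serializable.
            throw new IllegalArgumentException("Body is not Serializable: " + value);
        }
        return (Serializable) value;
    }
}
```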

Message Headers

Property | Default | Description
topicNameHeader | | The name of the topic used by a producer. If present, it overrides the topic name given in the URI.
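The override rule for the topic header can be sketched as a simple lookup. The class name is hypothetical, and using the literal string "topicNameHeader" as the header key is an assumption based on the property name in the table above.

```java
import java.util.Map;

// Hypothetical sketch of the header override rule, not camel-kafka's code.
final class TopicResolverSketch {

    // Assumption: the header key matches the property name listed above.
    static final String TOPIC_NAME_HEADER = "topicNameHeader";

    static String resolveTopic(String uriTopic, Map<String, Object> headers) {
        Object override = headers.get(TOPIC_NAME_HEADER);
        // Header present: it wins over the topic from the URI.
        return override != null ? override.toString() : uriTopic;
    }
}
```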

Samples

Single message

Here is an example of sending a simple message to, and reading it from, a Kafka topic named "TOPIC-TEST":

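// Inside a class that extends org.apache.camel.builder.RouteBuilder
public void configure() {
    // Sending: publish a constant body to TOPIC-TEST every 5 seconds
    from("timer://foo?fixedRate=true&period=5000")
        .setBody(constant("hello from Giwi Softwares"))
        .to("kafka:TOPIC-TEST?zkConnect=localhost:2181");

    // Receiving: log the body of each message read from TOPIC-TEST
    from("kafka:TOPIC-TEST?groupId=camelTest&zkConnect=localhost:2181").log("${body}");
}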

Multiple messages

You may want to deliver a batch of messages in a single request:

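// Inside RouteBuilder.configure(); needs java.io.Serializable, java.util.ArrayList,
// java.util.List, org.apache.camel.Exchange and org.apache.camel.Processor.
from("timer://foo?fixedRate=true&period=5000").process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        // Build a batch of 100 messages and set it as a single list body
        List<Serializable> list = new ArrayList<Serializable>();
        for (int i = 0; i < 100; i++) {
            list.add("This is my dummy message #" + i);
        }
        exchange.getIn().setBody(list);
    }
}).to("kafka:TOPIC-TEST?zkConnect=localhost:2181");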