- $ echo "170" >  /usr/local/tmp/zookeeper/myid
   </div>  </div>  </div>  </td>  </tr>  </tbody>  </table>  </div>  </div>  
bin/zkServer.sh start 启动zookeeper
  
  
  
  
  
  
  二. Kafka集群构建
  
  
  1. 下载并解压Kafka
  看官方文档里介绍,先得构建scala环境, 但是我自己没有执行下面的操作,居然也能运行,有点莫名其妙,
    > tar xzf kafka-<VERSION>.tgz  > cd kafka-<VERSION>  > ./sbt update  > ./sbt package  > ./sbt assembly-package-dependency
  
  2. 修改配置文件
    修改 conf/server.properties
  
  
  zookeeper.connect=192.168.197.170:2181,192.168.197.171:2181,192.168.197.172:2181
  
  
  broker.id分别改成
  broker.id=170
  broker.id=171
  broker.id=172
  host.name分别改成(如果不改,client访问集群时,如果没在hosts配置对应机器的 hostname,访问将会报错)
  host.name=192.168.197.170
  host.name=192.168.197.171
  
  host.name=192.168.197.172
  
  可根据需求修改
  port: broker节点使用端口号 默认 9092
  log.dir: 消息目录位置
  
  
  
  3. 启动Kafka
    cd /usr/local/kafka_2.8.0-0.8.0
  JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
  
  
  
  4. 创建Topic并查看
  bin/kafka-create-topic.sh --zookeeper 192.168.197.170:2181 --partition 1 --topic leo-test
  bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181
    topic: leo-test partition: 0 leader: 171 replicas: 171 isr: 171
  说明:
  partiton: partion id,由于此处只有一个partition,因此partition id 为0
  leader:当前负责读写的lead broker id
  relicas:当前partition的所有replication broker list
  isr:relicas的子集,只包含出于活动状态的broker
  
  
  bin/kafka-create-topic.sh --zookeeper 192.168.197.170:2181 --replica 2 --partition 2 --topic leo-test2
  topic: leo-test partition: 0 leader: 171 replicas: 171 isr: 171
  topic: leo-test2 partition: 0 leader: 171 replicas: 171,170 isr: 171,170
  topic: leo-test2 partition: 1 leader: 170 replicas: 170,171 isr: 170,171
  5.试着干掉一个非leader 的broker,然后在干掉leader broker,看看会有什么情况发生。
    命令:
  pkill -9 -f server.properties
  
  
  
  
  
  
  安装过程遇到的问题:
  1. kafka启动后提示
  Unrecognized VM option '+UseCompressedOops'
  Could not create the Java virtual machine.
  开始以为是内存大小的问题, 后来发现不是,是JDK的问题, 我用的32位centos,jdk1.6_24,  换成JDK1.7依然报错。
  查看 bin/kafka-run-class.sh 
  找到
  if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
    KAFKA_JVM_PERFORMANCE_OPTS="-server  -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
  fi
  
  去掉-XX:+UseCompressedOops
  JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
  启动成功