• 1. 大数据(离线/实时)计算架构
  • 2. 整体架构整体架构
  • 3. 实时计算流程1.数据采集:负责从各节点上实时采集数据,选用cloudera的flume来实现 2.数据接入:由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka 3.流式计算:对采集到的数据进行实时分析,选用apache的storm 4.数据输出:对分析后的结果持久化,暂定用mysql 对应的大数据组件架构是: Zookeeper+Flume-ng+Kafka+Storm+Mysql
  • 4. Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。 Flume的简介
  • 5. Source: 从数据发生器接收数据,并将接收的数据以Flume的event格式传递给一个或者多个通道channal,Flume提供多种数据接收的方式,比如Avro,Thrift,twitter1%等 Channel: channal是一种短暂的存储容器,它将从source处接收到的event格式的数据缓存起来,直到它们被sinks消费掉,它在source和sink间起着一共桥梁的作用,channal是一个完整的事务,这一点保证了数据在收发的时候的一致性. 并且它可以和任意数量的source和sink链接. 支持的类型有: JDBC channel , File System channel , Memort channel等. sink: sink将数据存储到集中存储器比如Hbase和HDFS,它从channals消费数据(events)并将其传递给目标地. 目标地可能是另一个sink,也可能HDFS,HBase. Flume的简介
  • 6. Flume常用架构之一
  • 7. Flume常用架构二
  • 8. 1.下载安装包version:1.6.0 2.安装JDK,并配置JDK的环境变量 3.解压flume安装包,解压完配置环境变量 4.修改配置文件,将flume-env.sh.template文件复制一份并重命名为flume-env.sh,然后在该文件中修改JAVA_HOME的安装路径并保存即可 3.验证flume安装的正确性: 5.验证flume安装是否成功。 Flume安装部署
  • 9. Flume的典型的应用场景:监听文件夹,数据写入文件夹内时将数据汇聚到HDFS: 首先在flume的安装目录下进入conf目录下并新建xxx.conf的文件,并开始写conf的内容: 1.定义agent: agent1.channels=c1 agent1.sources=avro-source1 agent1.sinks=k1 2.定义source: #define sources agent1.sources.avro-source1.channels=c1 agent1.sources.avro-source1.type=spooldir agent1.sources.avro-source1.spoolDir=/DataCollect/data/Agricultural/ agent1.sources.avro-source1.fileHeader=false agent1.sources.avro-source1.basenameHeader = true agent1.sources.avro-source1.basenameHeaderKey = basename 3.定义channel: #define channels agent1.channels.c1.type=file agent1.channels.c1.checkpointDir=/root/flume/about_check agent1.channels.c1.dataDirs=/root/flume/about_data agent1.channels.c1.transactionCapacity=30000 agent1.channels.c1.capacity=500000 Flume的使用
  • 10. Flume的典型的应用场景:监听文件夹,数据写入文件夹内时将数据汇聚到HDFS: 3.定义sink #define sinks agent1.sinks.k1.channel=c1 agent1.sinks.k1.type=hdfs agent1.sinks.k1.hdfs.path=hdfs://192.168.185.15:8020/DataCollect/Agricultural/data/%y-%m-%d/%{basename} agent1.sinks.k1.hdfs.filePrefix=%{basename} agent1.sinks.k1.hdfs.useLocalTimeStamp=true agent1.sinks.k1.hdfs.inUserSuffix=.tmp agent1.sinks.k1.hdfs.writeFormat=Text agent1.sinks.k1.hdfs.fileType=DataStream agent1.sinks.k1.hdfs.rollInterval=0 agent1.sinks.k1.hdfs.rollSize=104857600 agent1.sinks.k1.hdfs.batchSize=1000 agent1.sinks.k1.hdfs.threadsPoolSize=30 agent1.sinks.k1.hdfs.callTimeout=30000 #agent1.sinks.k1.hdfs.round = true #agent1.sinks.k1.hdfs.roundValue = 1 #agent1.sinks.k1.hdfs.roundUnit = minute agent1.sinks.k1.hdfs.retryInterval=3 agent1.sinks.k1.hdfs.minBlockReplicas=1 agent1.sinks.k1.hdfs.rollCount=0 agent1.sources.avro-source1.threads=50 Flume的使用
  • 11. Flume的典型的应用场景:实时动态的去检测文件中新增部分,并追加到HDFS 自定义source: #define sources agent1.sources.avro-source1.type = exec agent1.sources.avro-source1.command = tail -F /var/log/messages agent1.sources.avro-source1.channels = c1 agent1.sources.avro-source1.threads=50 Flume的使用
  • 12. Flume的典型的应用场景:多sink场景  #flume配置文件    agent1.sources=avrosource    agent1.sinks=hdfssink1 hdfssink2    agent1.channels=filechannel        #sink groups 可以用空格分开配置多个    非常影响性能关闭    #agent1.sinkgroups=hdfsGroup   #agent1.sinkgroups.hdfsGroup.sinks = hdfssink1 hdfssink2    #sink调度模式 load_balance  failover    agent1.sinkgroups.hdfsGroup.processor.type=load_balance    #负载均衡模式  轮询  random  round_robin    agent1.sinkgroups.hdfsGroup.processor.selector=round_robin   传输数据选择机制,默认是轮调,生产机设置为随机,后期优化可测试调整 #agent1.sinkgroups. hdfsGroup.processor.selector= random   #失效降级    agent1.sinkgroups.hdfsGroup.processor.backoff=true    #降级时间30秒    agent1.sinkgroups.hdfsGroup.processor.maxTimeOut=30000 Flume的使用
  • 13. Flume的启动/停止命令: #conf NAME conf=xxx.conf #task NAME task_name=xxxx #Flume PATH flumedir=/usr/hdp/current/flume-server #log PATH logdir=/usr/hdp/current/flume-server/logs 启动:nohup $flumedir/bin/flume-ng agent -n agent1 -f $flumedir/conf/$conf -Dname=$task_name,console > $logdir/current-$logdate.log 2>&1 & 停止:kill -9 \$(ps -ef |grep java |grep \"name=$task_name\" |awk '{print \$2}') Flume的使用
  • 14. Zookeeper 是为分布式应用程序提供高性能协调服务的工具集合,也是Google的Chubby一个开源的实现,是Hadoop 的分布式协调服务。它包含一个简单的原语集5,分布式应用程序可以基于它实现配置维护、命名服务、分布式同步、组服务等。Zookeeper可以用来保证数据在ZK集群之间的数据的事务性一致6。其中ZooKeeper提供通用的分布式锁服务7,用以协调分布式应用。 Zookeeper作为Hadoop项目中的一个子项目,是 Hadoop集群管理的一个必不可少的模块,它主要用来解决分布式应用中经常遇到的数据管理问题,如集群管理、统一命名服务、分布式配置管理、分布式消息队列、分布式锁、分布式协调等。在Hadoop中,它管理Hadoop集群中的NameNode,还有在Hbase中Master Election、Server 之间状态同状步等。 Zoopkeeper 提供了一套很好的分布式集群管理的机制,就是它这种基于层次型的目录树的数据结构,并对树中的节点进行有效管理,从而可以设计出多种多样的分布式的数据管理模型。zookeeper简介
  • 15. 集群中的角色: 在ZooKeeper集群当中,集群中的服务器角色有两种Leader和Learner,Learner角色又分为Observer和Follower,具体功能如下: 1.领导者(leader),负责进行投票的发起和决议,更新系统状态 2.学习者(learner),包括跟随者(follower)和观察者(observer), 3.follower用于接受客户端请求并向客户端返回结果,在选主过程中参与投票 4.Observer可以接受客户端请求,将写请求转发给leader,但observer不参加投票过程,只同步leader的状态,observer的目的是为了扩展系统,提高读取速度。 5. 客户端(client),请求发起方 注意: 通常Zookeeper由2n+1台servers组成,每个server都知道彼此的存在。每个server都维护的内存状态镜像以及持久化存储的事务日志和快照。为了保证Leader选举能过得到多数的支持,所以ZooKeeper集群的数量一般为奇数。对于2n+1台server,只要有n+1台(大多数)server可用,整个系统保持可用。 Zookeeper架构
  • 16. 1.下载安装包:version:3.4.8 2.解压完之后,进入zk的主目录下的conf目录下,复制配置文件并重命名: cp zoo_sample.cfg zoo.cfg 制作dataLogDir和运行Dir sudo mkdir /tmp/zookeeper sudo mkdir /var/log/zookeeper 3.在配置文件中添加以下内容: Zookeeper的安装部署
  • 17. 4.在/tmp/zookeeper文件夹下新建文件:myid 添加server序号到myid中,如: server.1 机器的myid 只需要加入数字 1 即可; 5.启动zookeeper bin/zkServer.sh start 刚起来的时候日志中会报错,全部机器都起来后,就不会报错了 6. 更改日志路径 默然日志是放到"."下的zookeeper.out 修改zkEnv.sh,添加 : vim zkEnv.sh ZOO_LOG_DIR=/var/log/zookeeper Zk在kafka中的作用:zookeeper在kafka中的作用 1)Broker注冊 Broker在zookeeper中保存为一个临时节点,节点的路径是/brokers/ids/[brokerid],每个节点都保存对应的ID Zk在storm中的作用:Storm的所有的状态信息都保存在Zookeeper里面,nimbus通过在zookeeper上面写状态信息来分配任务: 使得nimbus可以监控整个storm集群的状态,从而可以重启一些挂掉的task。 ZooKeeper使得整个storm集群十分的健壮-—任何一台工作机器挂掉都没有关系,只要重启然后从zookeeper上面重新获取状态信息就可以了。 Zookeeper的安装部署
  • 18. Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理)。 kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性: 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。 高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。 支持通过kafka服务器和消费机集群来分区消息。 支持Hadoop并行数据加载。 kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。 Kafka简介
  • 19. kafka分布式订阅架构如下图: Kafka架构Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。 1.topic:消息存放的目录即主题 2.Producer:生产消息到topic的一方 3.Consumer:订阅topic消费消息的一方 4.Broker:Kafka的服务实例就是一个broker
  • 20. Kafka架构Kafka Topic&Partition: 消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
  • 21. Kafka架构 Kafka Topic&Partition: 我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。  Kafka集群会保存所有的消息,不管消息有没有被消费;我们可以设定消息的过期时间,只有过期的数据才会被自动清除以释放磁盘空间。比如我们设置消息过期时间为2天,那么这2天内的所有消息都会被保存到集群中,数据只有超过了两天才会被清除。  Kafka需要维持的元数据只有一个–消费消息在Partition中的offset值,Consumer每消费一个消息,offset就会加1。其实消息的状态完全是由Consumer控制的,Consumer可以跟踪和重设这个offset值,这样的话Consumer就可以读取任意位置的消息。
  • 22. Kafka 核心组件Replications、Partitions 和Leaders 通过上面介绍的我们可以知道,kafka中的数据是持久化的并且能够容错的。Kafka允许用户为每个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。如果你的副本数量设置为3,那么一份数据就会被存放在3台不同的机器上,那么就允许有2个机器失败。一般推荐副本数量至少为2,这样就可以保证增减、重启机器时不会影响到数据消费。如果对数据持久化有更高的要求,可以把副本数量设置为3或者更多。  Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。Producer在生产数据时,会按照一定规则(这个规则是可以自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader作为读写用。  关于如何设置partition值需要考虑的因素。一个partition只能被一个消费者消费(一个消费者可以同时消费多个partition),因此,如果设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。所以,推荐partition的数量一定要大于同时运行的consumer的数量。另外一方面,建议partition的数量大于集群broker的数量,这样leader partition就可以均匀的分布在各个broker中,最终使得集群负载均衡。
  • 23. Kafka 核心组件 Producers Producers直接发送消息到broker上的leader partition,不需要经过任何中介一系列的路由转发。为了实现这个特性,kafka集群中的每个broker都可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是可以直接被访问的。  Producer客户端自己控制着消息被推送到哪些partition。Kafka提供了接口供用户实现自定义的分区,用户可以为每个消息指定一个partitionKey,通过这个key来实现一些hash分区算法。比如,把userid作为partitionkey的话,相同userid的消息将会被推送到同一个分区。 以Batch的方式推送数据可以极大的提高处理效率,kafka Producer 可以将消息在内存中累计到一定数量后作为一个batch发送请求。Batch的数量大小可以通过Producer的参数控制,参数值可以设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。通过增加batch的大小,可以减少网络请求和磁盘IO的次数。 Kafka 消息有一个定长的header和变长的字节数组组成。因为kafka消息支持字节数组,也就使得kafka可以支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小。
  • 24. Kafka 核心组件Consumers 在kafka中,当前读到消息的offset值是由consumer来维护的,因此,consumer可以自己决定如何读取kafka中的数据。比如,consumer可以通过重设offset值来重新消费已消费过的数据。不管有没有被消费,kafka会保存数据一段时间,这个时间周期是可配置的,只有到了过期时间,kafka才会删除这些数据。
  • 25. Kafka特性消息可靠性: 在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下三中情况: 1.一个消息发送失败 2.一个消息被发送多次 3.最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次 有许多系统声称它们实现了exactly-once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个Producer成功发送一个消息,但是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,但是这个consumer在处理取过来的消息时失败了。  从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉,Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。  从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过这个offset值重新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理。
  • 26. kafka 应用场景: 日志收集:可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。 消息系统:解耦和生产者和消费者、缓存消息等。 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。 流式处理:比如spark streaming和storm Kafka架构
  • 27. 1.下载安装包:version:0.9.0.1 2.解压并配置kafka的环境变量 3.进入kafka的安装目录conf下,然后修改server.properties文件,该文件主要的几个配置参数为: zookeeper.connect=10.4.82.73:2181 Port:9092 Broker.id=0 log.dirs=/tmp/kafka-logs 具体的配置文件中各个参数的意思可见下表: Kafka安装部署
  • 28. #server.properties配置文件 broker.id=1 port=9092 host.name=10.0.0.8 zookeeper.connect=10.0.0.8:2181,10.0.0.0.9:2181,10.0.0.10:2181 num.network.threads=8   #broker处理消息的最大线程数 一般等于核心数    num.io.threads=8# 同上 socket.send.buffer.bytes=1048576       #socket 发送缓冲区 socket调优参数 socket.receive.buffer.bytes=1048576     #接收缓冲区  socket调优参数 socket.request.max.bytes=104857600     #socket请求最大数值,防止serverOOM log.dirs=/data/kafka/kafka-logs num.partitions=8                        #默认制定分区 会被命令行参数覆盖 log.retention.check.interval.ms=60000     #文件大小检查周期 log.cleaner.enable=false           #是否启用压缩 segment.ms=24*60*60                  num.replica.fetchers=4           #leader 进行复制的线程数。 增大这个数值会增加follow的io default.replication.factor=2       # 创建topic的时候的副本数,可以创建topic时制定参数覆盖      replica.fetch.max.bytes=2048000    # replicas 每次获取数据的最大大小   replica.fetch.wait.max.ms=500        # replicas 同leader之间的通信的最大等待时间,失败了会重试。 replica.high.watermark.checkpoint.interval.ms=5000   #每个replica检查是否将最高水平进行固化的频率 Kafka安装部署
  • 29. replica.socket.timeout.ms=30000       #follower与leader之间的socket超时时间 replica.socket.receive.buffer.bytes=65536   #leader复制时间的socket缓存大小 replica.lag.time.max.ms=10000              #replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中 replica.lag.max.messages=4000            #如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效 ##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后 ##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移 ##到其他follower中. ##在broker数量较少,或者网络不足的环境中,建议提高此值. controller.socket.timeout.ms=30000   # partition leader与replicas之间通讯时,socket的超时时间 controller.message.queue.size=10   #partition leader与replicas 数据同步时,消息的队列尺寸                message.max.bytes=2048000   #消息体的最大大小 但是是字节 -------------------- auto.create.topics.enable=true  #是否允许自动创建topic,若是false就需要通过命令创建topic log.index.interval.bytes=4096      #当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数 log.index.size.max.bytes=10485760  #对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖 log.retention.hours=24             #数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理。 log.flush.interval.ms=10000      #仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.   --------Kafka安装部署
  • 30. log.flush.interval.messages=20000 #log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失. log.flush.scheduler.interval.ms=2000 #检查是否需要固化到硬盘的时间间隔 log.roll.hours=24 #这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖 log.cleanup.interval.mins=30 #检查处理规则间隔 log.segment.bytes=1073741824 #一个消息长度 超过在创建一个 zookeeper.connection.timeout.ms=6000 #zookeeper连接超时时间 zookeeper.sync.time.ms=2000 #一个zk flower能落后leader多久 fetch.purgatory.purge.interval.requests=1000 ## f防止oom 的参数 用于request 状态转变为complete后从purgatory中移除。 producer.purgatory.purge.interval.requests=1000 ## f防止oom 的参数 用于request 状态转变为complete后从purgatory中移除。Kafka安装部署
  • 31. Kafka的启动命令: kafka-server-start.sh /home/admin/kafka_2.11-0.9.0.1/config/server.properties & 停止命令: 直接kill -9 相应的进程号 测试安装部署是否正确:打开2个终端:分别在不同的终端启动kafka-console-producer.sh和kafka-console-cousumer.sh这个只是系统提供的命令行工具。这里启动是为了测试是否能正常生产消费;验证流程正确性。 生产者命令如下: kafka-console-producer.sh --broker-list 10.4.82.73:9092,10.4.82.110:2181 –topic $topic_name 消费者命令如下: kafka-console-consumer.sh --zookeeper 10.4.82.73:2181 --topic $topic_name --from beginning 只要在生产者命令行下随意输入,如果在消费者端看到生产者输入的信息,表示安装成功。Kafka的使用—shell
  • 32. 常用的kafka概念: Consumergroup: 各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组。 消息状态: 在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。 消息持久化: Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。 消息有效期: Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。 批量发送: Kafka支持以消息集合为单位进行批量发送,以提高push效率。 push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。 Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。 负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。 同步异步: Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。 分区机制partition: Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。 离线数据装载: Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。 插件支持:现在不少活跃的社区已经开发出不少插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。Kafka的使用—shell
  • 33. public MyProducer() { Properties props = new Properties(); //指定代理服务器的地址 props.put("metadata.broker.list", "192.168.66.2:9092,192.168.66.3:9092,192.168.66.4:9092"); //配置value的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); //配置key的序列化类 props.put("key.serializer.class", "kafka.serializer.StringEncoder"); //指定分区 props.put("partitioner.class", "com.hashleaf.kafka.MyPartitioner"); props.put("request.required.acks","-1"); //创建producer 对象 producer = new Producer(new ProducerConfig(props)); } public void produce(){ int count = 1000; for (int i = 0; i < count; i++) { String key = String.valueOf(i); String message = "hashleaf-"+i; producer.send(new KeyedMessage(HASHLEAF_KAFKA_TOPIC, key ,message)); } //发送完成后关闭 //producer.close(); }Kafka的使用—java producer
  • 34. static { prop = new Properties(); prop.setProperty("zookeeper.connect", "10.4.82.73:2181"); prop.setProperty("group.id", "1"); prop.setProperty("zookeeper.session.timeout.ms", "6000"); prop.setProperty("zookeeper.sync.time.ms", "200"); prop.setProperty("auto.commit.interval.ms", "1000"); } // 消费数据,构建流 public static void consumer() { ConsumerConfig config = new ConsumerConfig(prop); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); Map topicmap = new HashMap(); topicmap.put("renren-topic", 1); Map>> messagestream = consumer .createMessageStreams(topicmap); KafkaStream stream = messagestream.get("renren-topic") .get(0); ConsumerIterator iterator = stream.iterator(); while (iterator.hasNext()) { String message = new String(iterator.next().message()); System.out.println("**********" + message); InsertMysql.inserdata(message); } } Kafka的使用—java consumer
  • 35. 启动ZK : bin/zookeeper-server-start.shconfig/zookeeper.properties 启动服务: bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1& 创建主题 : bin/kafka-topics.sh --create --zookeeperlocalhost:2181 --replication-factor 1 --partitions 1 --topic test 查看主题 : bin/kafka-topics.sh --list --zookeeperlocalhost:2181 查看主题详情: bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test 删除主题 : bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --topic test --zookeeper localhost:2181 创建生产者: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 创建消费者: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test--from-beginning Kafka的使用—常用命令操作
  • 36. KafkaOffsetMonitor是有由Kafka开源社区提供的一款Web管理界面,这个应用程序用来实时监控Kafka服务的Consumer以及它们所在的Partition中的Offset,你可以通过浏览当前的消费者组,并且每个Topic的所有Partition的消费情况都可以观看的一清二楚。它让我们很直观的知道,每个Partition的Message是否消费掉,有木有阻塞等等。 这个Web管理平台保留的Partition、Offset和它的Consumer的相关历史数据,我们可以通过浏览Web管理的相关模块,清楚的知道最近一段时间的消费情况。 该Web管理平台有以下功能: 1.对Consumer的消费监控,并列出每个Consumer的Offset数据 保护消费者组列表信息 2.每个Topic的所有Partition列表包含:Topic、Pid、Offset、LogSize、Lag以及Owner等等 3.浏览查阅Topic的历史消费信息Kafka的监控插件-kafkaOffsetmonitor
  • 37. 安装部署 KafkaOffsetMonitor的安装部署较为简单,所有的资源都打包到一个JAR文件中了,因此,直接运行即可,省去了我们去配置。这里我们可以新建一个目录单独用于Kafka的监控目录,我这里新建一个kafka_monitor文件目录,然后我们在准备启动脚本,脚本内容如下所示: java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \     com.quantifind.kafka.offsetapp.OffsetGetterWeb \     --zk h1:2181 \     --port 8089 \     --refresh 10.seconds \     --retain 1.days 解释下这条启动命令的含义,首先我们需要指明运行Web监控的类,然后需要用到ZooKeeper,所有要填写ZK集群信息,接着是Web运行端口,页面数据刷新的时间以及保留数据的时间值。 可以写个启动脚本: vi start.sh nohup java -cp ./kom-local.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 192.168.1.110:2181,192.168.1.111:2181 --port xxxx--refresh 10.seconds --retain 3.days & 访问: http://IP:xxxx/ Kafka的监控插件-kafkaOffsetmonitor
  • 38. Kafka的监控插件-kafkaOffsetmonitor一些参数的含义如下: Topic:创建Topic名称 Partition:分区编号 Offset:表示该Parition已经消费了多少Message  LogSize:表示该Partition生产了多少Message Lag:表示有多少条Message未被消费 Owner:表示消费者 Created:表示该Partition创建时间 Last Seen:表示消费状态刷新最新时间
  • 39. Storm是一个分布式的、容错的实时计算系统,可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm之于实时处理,就好比Hadoop之于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。可以使用任意编程语言来做开发。 Storm的优点 1. 简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。 2. 服务化,一个服务框架,支持热部署,即时上线或下线App. 3. 可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。 4. 容错性。Storm会管理工作进程和节点的故障。 5. 水平扩展。计算是在多个线程、进程和服务器之间并行进行的。 6. 可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失败时,它会负责从消息源重试消息。 7. 快速。系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列。 8. 本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。这让你可以快速进行开发和单元测试。 Storm目前存在的问题: 1.目前的开源版本中只是单节点Nimbus,挂掉只能自动重启,可以考虑实现一个双nimbus的布局。 2.Clojure是一个在JVM平台运行的动态函数式编程语言,优势在于流程计算,Storm的部分核心内容由Clojure编写,虽然性能上提高不少但同时也提升了维护成本。 Storm简介
  • 40. Storm集群由一个主节点和多个工作节点组成。主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。每个工作节点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由Zookeeper来完成的。ZooKeeper用于管理集群中的不同组件。Storm架构
  • 41. Storm术语解释: Storm的术语包括Stream、Spout、Bolt、Task、Worker、Stream Grouping和Topology。Stream是被处理的数据。Sprout是数据源。Bolt处理数据。Task是运行于Spout或Bolt中的线程。Worker是运行这些线程的进程。Stream Grouping规定了Bolt接收什么东西作为输入数据。数据可以随机分配(术语为Shuffle),或者根据字段值分配(术语为Fields),或者 广播(术语为All),或者总是发给一个Task(术语为Global),也可以不关心该数据(术语为None),或者由自定义逻辑来决定(术语为Direct)。Topology是由Stream Grouping连接起来的Spout和Bolt节点网络.下面进行详细介绍: Topologies 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce Job Stream 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理 Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生输出的新数据流,可执行过滤,聚合,查询数据库等操作 Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程. Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如果分配给Bolts们. Storm架构
  • 42. stream grouping分类: 1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同. 2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts. 3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到. 4. Global Grouping: 全局分组,这个tuple被分配到storm中的一个bolt的其中一个task.再具体一点就是分配给id值最低的那个task. 5. Non Grouping: 不分组,意思是说stream不关心到底谁会收到它的tuple.目前他和Shuffle grouping是一样的效果,有点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程去执行. 6. Direct Grouping: 直接分组,这是一种比较特别的分组方法,用这种分组意味着消息的发送者举鼎由消息接收者的哪个task处理这个消息.只有被声明为Direct Stream的消息流可以声明这种分组方法.而且这种消息tuple必须使用emitDirect方法来发射.消息处理者可以通过TopologyContext来或者处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)Storm架构
  • 43. Storm如何保证消息被处理: storm保证每个tuple会被topology完整的执行。storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而可以形成树状结构), 并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置, 如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会会重新发射这个tuple。 Storm架构
  • 44. 1.下载安装包:version:0.10.0 2.修改配置文件:storm.yaml 注意:每行开头留一格空格,冒号后面留一格空格。 3.启动Storm各个后台进程。 Nimbus启动命令: storm nimbus
  • 45. Storm安装部署
  • 46. Storm如何通过java API构建拓扑: // 创建拓扑 TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("spout", new MySpout(), 1); topologyBuilder.setBolt("splitbolt", new MyBoltSplit(), 2).shuffleGrouping("spout"); topologyBuilder.setBolt("countbolt", new MyBoltCount(), 2).fieldsGrouping("splitbolt", new Fields("data")); Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(2); //storm集群执行 StormSubmitter.submitTopology("word-sucess", conf, topologyBuilder.createTopology()); //本地集群 LocalCluster cluster = new LocalCluster(); //提交拓扑(该拓扑的名字叫word-count cluster.submitTopology("word-count",conf,topologyBuilder.createTopology()); Thread.sleep(10000); Storm拓扑
  • 47. Storm如何通过java API构建拓扑: Spout: 1.实现IBaseRichSpout接口 2.继承BaseRichSpout类 主要是实现其中的nextTuple() bolt: 1.实现IBaseRichSpout接口 2.继承BaseRichSpout类 主要是实现其中的execute(Tuple tuple)() // 创建拓扑 TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("spout", new MySpout(), 1); topologyBuilder.setBolt("splitbolt", new MyBoltSplit(), 2).shuffleGrouping("spout"); topologyBuilder.setBolt("countbolt", new MyBoltCount(), 2).fieldsGrouping("splitbolt", new Fields("data")); Config conf = new Config(); conf.setDebug(false); conf.setNumWorkers(2); //storm集群执行 StormSubmitter.submitTopology("word-sucess", conf, topologyBuilder.createTopology()); //本地集群 LocalCluster cluster = new LocalCluster(); //提交拓扑(该拓扑的名字叫word-count cluster.submitTopology("word-count",conf,topologyBuilder.createTopology()); Thread.sleep(10000); Storm拓扑
  • 48. 谢谢大家捧场!