基于Golang将MongoDB的数据同步到Elasticsearch

jopen 8年前

Elasticsearch是一个分布式可扩展的实时搜索和分析引擎。它能帮助你搜索、分析和浏览数据。Elasticsearch 是一个基于Lucene实现的搜索服务器,用Java开发实现。它提供了RESTful web接口,并作为Apache许可条款下的开放源码发布,能够达到实时搜索,稳定,可靠,快速,安装使用方便。

Elasticsearch关键概念

Cluster集群有相同集群名称的节点Node的集合。集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看es集群,在逻辑上是个整体,你与任何一个节点的通信和与整个es集群通信是等价的。

Cluster集群

Node节点
一个elasticsearch运行的实例。其实就是一个java进程。一般情况下,一台机器运行在一台机器上。
Shards分片
代表索引分片,es可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引拆分成多个,分布到不同的节点上。构成分布式搜索。分片的数量只能在索引创建前指定,并且索引创建后不能更改。
Replicas副本
代表索引副本,es可以设置多个索引的副本,副本的作用一是提高系统的容错性,当某个节点某个分片损坏或丢失时可以从副本中恢复。二是提高es的查询效率,es会自动对搜索请求进行负载均衡。
Recovery恢复
代表数据恢复或叫数据重新分布,es在有节点加入或退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动时也会进行数据恢复。
River
代表es的一个数据源,也是其它存储方式(如:数据库)同步数据到es的一个方法。它是以插件方式存在的一个es服务,通过读取river中的数据并把它索引到es中,官方的river有couchDB的,RabbitMQ的,推ter的,Wikipedia的。River在1.5中弃用了,2.0 中移除了。
Gateway
代表es索引快照的存储方式,es默认是先把索引存放到内存中,当内存满了时再持久化到本地硬盘。gateway对索引快照进行存储,当这个es集群关闭再重新启动时就会从gateway中读取索引备份数据。es支持多种类型的gateway,有本地文件系统(默认),分布式文件系统,Hadoop的 HDFS和amazon的s3云存储服务。
Discovery.zen
代表es的自动发现节点机制,es是一个基于p2p的系统,它先通过广播寻找存在的节点,再通过多播协议来进行节点之间的通信,同时也支持点对点的交互。
Transport
代表es内部节点或集群与客户端的交互方式,默认内部是使用tcp协议进行交互,同时它支持http协议(json格式)、thrift、servlet、memcached、zeroMQ等的传输协议(通过插件方式集成)。
Index索引
Elasticsearch用来存储数据的逻辑区域,它类似于关系型数据库中的table概念。一个index可以在一个或者多个shard上面,同时一个shard也可能会有多个replicas。
Document
Elasticsearch里面存储的实体数据,类似于关系数据中一个table里面的一行数据。 document由多个field组成,不同的document里面同名的field一定具有相同的类型。document里面field可以重复出现,也就是一个field会有多个值,即multivalued。
Document type
为了查询需要,一个index可能会有多种document,也就是document type,但需要注意,不同document里面同名的field一定要是相同类型的。
Mapping
存储field的相关映射信息,不同document type会有不同的mapping。

本文不是介绍Elasticsearch的安装配置文档,这些文档你可以在本文的参考资料中获得, 而是笔者在项目中的实践笔记。
我最近在开发过程中需要将Mongo数据库中的文章进行索引, 避免直接对Mongo数据库进行搜索导致的性能降低。基于Elasticsearch的手册的描述,以及在一些大公司如推ter的应用的经验,我选用它作为现在的项目的索引服务器。

现在的项目是Golang语言实现的,所以我调研的目标放在了Golang + MongoDB + Elasticsearch上面。

elasticsearch-river-mongodb是一个针对Mongo的elasticsearch river的插件。它从Mongo oplog中读取信息,将Mongo集群中的数据导入到 elasticsearch 中。考虑到river被弃用,暂时不考虑这个方案。

mandeepm91在文章How To Sync Transformed Data from MongoDB to Elasticsearch with Transporter on Ubuntu 14.04提到了另外一个工具Transporter,这是一个相当好的工具,可以抽取Mongo单例或者Mongo集群的数据,然后使用otto框架进行Javascript处理,而且处理是通道式的。在调研中很容易的将Mongo数据库导入到Elasticsearch。

不过我也没有采用这个方案。因为我的项目中,对文章的增删改的动作比较少,可以直接调用Elasticsearch的API进行操作。而且这样可以做到数据的实时索引和查询。

上面的方案多少会影响服务器的性能,有可能会block在Elasticsearch的API调用上。 所以我在增删改文章时,将操作命令的log放入到一个消息服务器中(nsq或者kafka),然后在单独的一台服务器上接收消息并调用Elasticsearch的API。

参考资料

  1. https://www.digitalocean.com/community/tutorials/how-to-sync-transformed-data-from-mongodb-to-elasticsearch-with-transporter-on-ubuntu-14-04
  2. https://github.com/compose/transporter
  3. Elasticsearch 权威指南
  4. elasticsearch中文指南
  5. https://www.elastic.co/guide/en/elasticsearch/guide/current/_empty_search.html
  6. https://github.com/mattbaird/elastigo
  7. https://github.com/olivere/elastic
  8. https://github.com/richardwilly98/elasticsearch-river-mongodb
  9. http://baike.baidu.com/item/elasticsearch
  10. https://www.elastic.co/blog/deprecating-rivers

原文链接: http://colobu.com/2015/10/27/Sync-Transformed-Data-from-MongoDB-to-Elasticsearch/