- 1. HBase Coprocessor to Index Columns into ElasticSearch ClusterDibyendu BhattacharyaArchitect – Big Data AnalyticsHappiestMinds
- 2. About HappiestMinds
Next Gen IT Consultancy Company launched Aug 2011 . Head office in Bangalore, India, have offices in USA, UK, Canada, Australia and Singapore. Core focus on disruptive technologies like Big Data/Analytics, Cloud, Mobile and Social.
Raised USD 45M Series A Funding from prominent VCs , Intel Capital, Canaan Partners and founders.
45 + Client Globally, 800 + Employees.
About Myself :
Dibyendu is Big Data Architect at HappiestMinds where he is involved in architecting and developing solutions on a Hadoop-based analytics and search platform. In the past few years, he has worked on complex data analytics related projects that utilize Hadoop, HBase, and real time analytics. Before HappiestMinds, he worked at EMC, FairIsaac, Cisco, IBM etc.
- 3. This Presentation….…….will explores the design and challenges HappiestMinds faced while implementing a storage and search infrastructure for a library procurement system where books/documents/artifacts related records are stored in Apache HBase. Upon bulk insert of book records into HBase, the Elasticsearch index is built offline using MapReduce but there are certain use cases where the records need to be re-indexed in Elasticsearch using Region Observer Coprocessors.
- 4. Storing and Indexing Book records from Publishers and Libraries Publisher/
Library Data HDFSHBase ClusterData Pre Processing
Data ingestion to HadoopData Loading : Map Reduce
Bulk Data upload to HBase table
12123Elastic Search Cluster3Data Indexing : Map Reduce
Incremental Data Indexing to ElasticSearch
Part of the document is indexed.
User Search Data.
Search engine display results.
Full data access request fetch from HBase.
User Update data5a5b 5
User update HBase record.
Update will propagate to Search Cluster.
- 5. HBase Write Path
- 6. HBase Write Path
- 7. HBase Storage LayoutRegion Server ………………….…….
- 8. HBase Put Request
- 9. Here comes the CoprocessorsThe idea of HBase Coprocessors was inspired by Google’s Big Table coprocessors.
HBase coprocessors are an addition to data-manipulation toolset that were introduced as a feature in HBase in the 0.92.0 release.
With the introduction of coprocessors, we can push arbitrary computation out to the HBase nodes hosting data.
Coprocessors can be loaded globally on all tables and regions hosted by the region server, or the administrator can specify which coprocessors should be loaded on all regions for a table on a per-table basis.
- 10. Coprocessors Class and InterfacesThe Coprocessor Interface
All User code must inherit from this class
The CoprocessorEnvironement Interface
Retain state across invocation
The CoprocessorHost interfaces
Tied state and the user code
- 11. Observer CoprocessorsTwo types of Coprocessor
observer, which are like triggers in conventional databases.
endpoint, dynamic RPC endpoints that resemble stored procedures.
Observer Coprocessor : Callback functions/hooks for every explicit API method
Hooks into HMaster API
Hooks into Region related operations
Hooks into write-ahead log operations
- 12. RegionObserver Coprocessor … Put ( )RegionObserver: Provides hooks for data manipulation events, Get, Put, Delete, Scan, and so on. There is an instance of a RegionObserver coprocessor for every table region and the scope of the observations they can make is constrained to that region.
- 13. RegionObserver Coprocessor ... Get ( )
- 14. Let us see what is ElasticSearch
- 15. Distributed Search Engine : ElasticSearchDistributed
REST based search engine (on top of Lucene)
Designed to speak JSON (JSON in, JSON out)
Built on top of Lucene.
For each index you can specify:
Number of shards
Each index has fixed number of shards
Number of replicas
Each shard can have 0-many replicas, can be changed dynamically
- 16. ElasticSearch : Automatic DiscoveryDiscovery Module responsible for discovering nodes within the cluster , as well as electing master node.
The responsibility of master node is to maintain global cluster state, and act if nodes join or leave cluster by reassigning shards.
- 17. ElasticSearch : Talking to Cluster
- 18. ElasticSearch : Nodes are Different
- 19. The idea is to perform Indexing into ElasticSearch from HBase Coprocessors…..
- 20. We need a Java Client…Use ElasticSearch Transport Client : The Transport Client connects remotely to an ElasticSearch cluster. It does not join the cluster, but simply gets one or more initial transport addresses and communicates with them in round robin fashion on each action (though most actions will probably be “two hop” operations).
- 21. And Index with Transport Client…
- 22. But this approach has a problem..Client does not have the knowledge of the ElasticSearch cluster.
Two Hop indexing.
No fault tolerant mechanism if transport address is down.
HBase Region Servers can have hundreds regions and hence hundreds of transport client.
Use ElasticSearch Node Client. Client Node does not hold index but have knowledge of complete Cluster.
Use HBASE-6505 to share Node Client across Regions in a RegionServer.
- 23. HBase 6505RegionCoprocessorEnvironment provides a getSharedData() method, which returns a ConcurrentMap, which is held by the RegionCoprocessorHost as a weak reference (in a special map with strongly referenced keys and weakly referenced values), and held strongly by the RegionEnvironment.
That way if the coprocessor is blacklisted the coprocessors environment is removed, and any shared data is immediately available for garbage collection. This shared data is per RegionServer. As long as there is at least one region observer or endpoint active this shared data is not garbage collected and can be accessed to share state between the remaining coprocessors of the same class.
- 24. Shared Node Client across Regions
- 25. Shared Node Client across Regions
- 26. The Final Problem….Concurrency Control …
HBase Solve it using MVCC (Multi Version Concurrency Control):
Implement updates not by deleting an old piece of data and overwriting it with a new one, but instead by making the old data as obsolete and adding newer version
And ElasticSearch using OCC (Optimistic Concurrency Control) :
Multiple transactions can complete without affecting each other, and that therefore transactions can proceed without locking the data resources that they affect. Before committing, each transaction verifies that no other transaction has modified its data. If the check reveals conflicting modifications, the committing transaction rolls back.
- 27. Let See a Conflict.. Search and UpdateHBaseESC1C2V1V1V1(M/R)HBaseESC1C2V1V1V2 (Update success)ConflictV2(CP)V1(M/R)
- 28. One More Conflict.. Search and UpdateHBaseESC1C2V1V1V1(M/R)V1(M/R)HBaseESC1C2V1V1ConflictV2(M/R)Conflict
- 29. The bottom line is.Search and Update should only be successful when the Version of ElasticSearch and Version of HBase is same during the update.
- 30. Solution..Data Load from Source to HBase will insert a document with Put call.
2. postPut coprocessor will perform incrementColumnValue for a version column.
- 31. Solution..3. Same Version number will be propagated to ElasticSearch during Map Reduce based bulk indexing. ElasticSearch support version number supplied externally.
Step 1-3 will repeat for any new data upload.
During search and update , the client will perform checkAndPut () call.
5i. Client perform search and get the Version number from ElasticSearch
5ii. Client construct a Put with new Version No = Old Version + 1
5iii. Client perform checkAndPut, and check for old Version number before doing Put.
5iv. postCheckAndPut Coprocessor invoked to propagate the successful Put to Search Cluster.
5v. After this step the Version Number of HBase column and ElasticSearch version will be equal.
- 32. Solution..……………………………….
- 33. ThanksDibyendu.B@happiestminds.com