1. PIG on Storm
Presented by Mridul Jain | June 3, 2014
2014 Hadoop Summit, San Jose, California
2. • Intro – PIG, Storm
• PIG on Storm
• PIG – Hybrid Mode
3. Quick Intuition: PIG
Q = LOAD 'sentences' USING PigStorage() AS (query:chararray);
words = FOREACH Q GENERATE FLATTEN(TOKENIZE(query));
word_grps = GROUP words BY $0;
word_counts = FOREACH word_grps GENERATE $0, COUNT($1);
4. Quick Intuition: Storm
[Diagram: a Storm topology for word count - Kafka Loader → Tokenizer → Group By → Count. Sentences such as "Obama wins elections" and "Obama care" are split into words ("Obama", "wins", "elections", "care"), grouped by word, and counted.]
5. Quick Intuition: PIG on Storm
[Diagram: the same Kafka Loader → Tokenizer → Group By → Count topology as on the previous slide, now generated from the Pig script below.]
Q = LOAD 'sentences' USING KafkaLoader() AS (query:chararray);
words = FOREACH Q GENERATE FLATTEN(TOKENIZE(query));
word_grps = GROUP words BY $0;
word_counts = FOREACH word_grps GENERATE $0, COUNT($1);
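For intuition, here is a minimal sketch (not from the talk) of what the same pipeline looks like when hand-written against Storm's 0.9-era Java API - roughly the code the Pig script above saves you from writing. The in-line SentenceSpout stands in for a real Kafka spout:

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;

public class WordCountTopology {

  // Stand-in for the Kafka spout: endlessly replays two sentences.
  public static class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[] sentences = {"Obama wins elections", "Obama care"};
    private int i = 0;
    public void open(Map conf, TopologyContext ctx, SpoutOutputCollector collector) {
      this.collector = collector;
    }
    public void nextTuple() {
      Utils.sleep(100);
      collector.emit(new Values(sentences[i++ % sentences.length]));
    }
    public void declareOutputFields(OutputFieldsDeclarer d) {
      d.declare(new Fields("sentence"));
    }
  }

  // FOREACH ... FLATTEN(TOKENIZE(...)): one output tuple per word.
  public static class Tokenizer extends BaseBasicBolt {
    public void execute(Tuple input, BasicOutputCollector collector) {
      for (String w : input.getString(0).split("\\s+")) collector.emit(new Values(w));
    }
    public void declareOutputFields(OutputFieldsDeclarer d) {
      d.declare(new Fields("word"));
    }
  }

  // GROUP ... BY $0 + COUNT: fieldsGrouping routes each word to one task,
  // which keeps that word's running count.
  public static class Counter extends BaseBasicBolt {
    private final Map<String, Long> counts = new HashMap<String, Long>();
    public void execute(Tuple input, BasicOutputCollector collector) {
      String w = input.getString(0);
      Long c = counts.get(w);
      counts.put(w, c = (c == null ? 1L : c + 1));
      collector.emit(new Values(w, c));
    }
    public void declareOutputFields(OutputFieldsDeclarer d) {
      d.declare(new Fields("word", "count"));
    }
  }

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("sentences", new SentenceSpout());
    builder.setBolt("tokenize", new Tokenizer(), 4).shuffleGrouping("sentences");
    builder.setBolt("count", new Counter(), 4).fieldsGrouping("tokenize", new Fields("word"));
    // submit with LocalCluster or StormSubmitter as usual
  }
}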
6. Storm Modes
[Diagram: Storm's two modes. Event processing: a spout feeds bolts A → B → C one event at a time from a stream source such as Kafka or AMQ. Batch processing: the same spout/bolt pipeline consumes a bunch of events tagged by a BatchID (batches E1, E2, E3).]
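A minimal sketch of the batch side using Storm's coordination API: one IBatchBolt instance is created per BatchID, and finishBatch() fires only after every event tagged with that ID has been processed (assumes a transactional/Trident-style topology is driving the batches):

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.coordination.IBatchBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

// Counts the events of a single batch; the framework creates one
// instance per BatchID and calls finishBatch() when the batch is done.
public class BatchCount implements IBatchBolt<Object> {
  private BatchOutputCollector collector;
  private Object batchId;
  private long count = 0;

  public void prepare(Map conf, TopologyContext ctx,
                      BatchOutputCollector collector, Object batchId) {
    this.collector = collector;
    this.batchId = batchId;
  }
  public void execute(Tuple tuple) { count++; }   // per-event path
  public void finishBatch() {                     // per-batch result
    collector.emit(new Values(batchId, count));
  }
  public void declareOutputFields(OutputFieldsDeclarer d) {
    d.declare(new Fields("batchId", "count"));
  }
  public Map<String, Object> getComponentConfiguration() { return null; }
}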
11. PIG on Storm
Write once run anywhere
12. [Diagram: one PIG script compiles to either Map/Reduce or Storm.]
• Express in PIG and run on Storm - simplified code & inbuilt ops.
• Think & write in PIG - scripts as well as UDFs.
• The same script generally runs on MR or Storm - existing scripts are easy to move over to realtime, quickly.
• Easy pluggability into any streaming data source.
Batch Aggregation
• Batches are supported in PIG, on which aggregation happens.
• Aggregation across batches is also possible now!
Rich State Semantics
• State can now be associated with & represented as a PIG relation, and operated upon like a usual PIG relation.
• Sliding windows are now available in Storm via PIG - the window is automatically updated with every new batch; HBase & any other store are pluggable (see the sketch below).
• Global mutable state - the updated state is available with every batch and exclusively accessible during commits; PigStorage, HBase & any other store are pluggable.
• Richer operations & state mgmt - upcoming!
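A rough in-memory illustration of those sliding-window semantics - a conceptual sketch only, not the talk's WindowHbaseStore (which persists the window to HBase):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Keeps one count map per batch and automatically drops the oldest
// batch as each new one commits, giving a window of the last `size` batches.
public class SlidingWindowState {
  private final int size;
  private final Deque<Map<String, Long>> window = new ArrayDeque<Map<String, Long>>();

  public SlidingWindowState(int size) { this.size = size; }

  // Called once per batch with that batch's aggregated counts.
  public void commitBatch(Map<String, Long> batchCounts) {
    window.addLast(batchCounts);
    if (window.size() > size) window.removeFirst();  // evict the oldest batch
  }

  // Merge all batches still in the window into one relation-like view.
  public Map<String, Long> snapshot() {
    Map<String, Long> merged = new HashMap<String, Long>();
    for (Map<String, Long> batch : window) {
      for (Map.Entry<String, Long> e : batch.entrySet()) {
        Long c = merged.get(e.getKey());
        merged.put(e.getKey(), (c == null ? 0L : c) + e.getValue());
      }
    }
    return merged;
  }
}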
Hybrid Mode
A mode which automatically decides which parts of your PIG script run on Storm and which on MR.
13. Think streaming in PIG
A = LOAD 'a.source' USING StorageA();
B = LOAD 'b.source' USING StorageB();
C = FOREACH A GENERATE PIGUDF(*);
D = GROUP A BY $0;
E = FOREACH D GENERATE PIGUDF1(*);
F = CROSS A, B;
A script variable containing a PIG relation is an open pipe which keeps receiving data as time passes, rather than a set of records available upfront as in batch PIG.
Two streams, A and B, are open here.
• Semantics are the same as PIG's, i.e. the programmer deals with batches of records and thinks in the same terms.
• Each batch here corresponds to a single batch in Storm.
• The tuples for a batch are generated as time passes, in a streaming fashion; tuples start moving through the pipeline as and when they are generated within a batch, rather than waiting for the whole batch to finish.
• Pipelining of batches is supported, since a batch doesn't have to traverse the whole topology (the complete PIG script here) before the next batch can start (see the sketch below).
• In line with PIG's philosophy, all operations like joining, merging of streams, and every other stream transformation are explicitly done by the programmer.
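A toy sketch of the pipelining point (class and method names are illustrative, not the actual PIG-on-Storm operators): per-tuple operators forward results the moment a tuple arrives, so batch N+1 can enter the pipeline while batch N is still draining; only blocking operators hold data until batch end.

public class StreamingForeach {
  public interface Emitter { void emit(Object batchId, Object value); }

  private final Emitter downstream;
  public StreamingForeach(Emitter downstream) { this.downstream = downstream; }

  // No buffering: each input tuple of a batch is transformed and
  // emitted immediately, tagged with its BatchID.
  public void onTuple(Object batchId, String query) {
    for (String ngram : query.split("\\s+")) {
      downstream.emit(batchId, ngram);
    }
  }

  // Nothing to flush at batch end for a pure FOREACH.
  public void onBatchEnd(Object batchId) { /* no-op */ }
}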
14. Language Models for Trend Detection
[Diagram: a query arriving at 1600hrs today ("Obama wins elections") is copied to two calculators - one over the current window (past x hrs from the current time) and one over yesterday's window (the same hours, yesterday). The trend score is Probability("Obama wins elections") in the current window divided by Probability("Obama wins elections") in yesterday's window.]
Total count of each n-gram from 1200-1600hrs today (total vocab size for the window: 1000): Obama 10, wins 30, elections 5.
Total count of each n-gram from 1200-1600hrs yesterday (total vocab size for the window: 1200): Obama 5, wins 10, elections 2.
● Detects trends in Search or Twitter signals by comparing n-gram
frequencies across current and historic time windows
● Needs notion of time windows
● Needs state mgmt for historic data
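A worked example of the trend score using the counts on this slide; the total n-gram counts per window and the add-one smoothing are illustrative assumptions, since the talk does not specify the estimator:

public class TrendScore {
  // Smoothed unigram probability: add-one smoothing over the window's
  // vocabulary (the exact estimator is an assumption, not from the talk).
  static double prob(long count, long totalNgrams, long vocabSize) {
    return (count + 1.0) / (totalNgrams + vocabSize);
  }

  public static void main(String[] args) {
    long[] today = {10, 30, 5};  // counts of "Obama","wins","elections", today's window
    long[] yday  = {5, 10, 2};   // same n-grams, yesterday's window
    long totalToday = 10000, totalYday = 9000;  // illustrative totals (not on the slide)
    long vocabToday = 1000, vocabYday = 1200;   // vocab sizes from the slide

    double pToday = 1.0, pYday = 1.0;
    for (int i = 0; i < today.length; i++) {
      pToday *= prob(today[i], totalToday, vocabToday);
      pYday  *= prob(yday[i],  totalYday,  vocabYday);
    }
    // A ratio > 1 means the query is trending up versus yesterday.
    System.out.println("trend score = " + (pToday / pYday));
  }
}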
register '/home/mridul/posapps/trending/ngram_query.py' using jython as nqfunc;
--get live queries as a batch {(q1),(q2),(q3)...}
LiveQueries = LOAD 'twitter' USING org.pos.udfs.kafka.KafkaEmitter('60');
--generate the relation Ngrams having {(n1,q1),(n2,q2)...} from LiveQueries
Ngrams = FOREACH LiveQueries GENERATE FLATTEN(nqfunc.split_ngram_query($0));
--store the above Ngrams in HBase for the sliding window
STORE Ngrams INTO 'testtable' USING org.pos.state.hbase.WindowHbaseStore('fam');
--load the current 5 hr window from the datasource, which is of the form {(n1,c1),(n2,c2),(n3,c3)...}
NgramModel = LOAD 'testtable,-6,-1' USING org.pos.state.hbase.WindowHbaseStore('fam') AS (word, cnt);
--group all to find the total in the next step {(ALL,{(n1,c1),(n2,c2),(n3,c3)...})}
GM = GROUP NgramModel ALL;
--find the total count of all tuples in the current window
TotalNgramCount = FOREACH GM GENERATE SUM($1.$1);
--find the unique count of tuples in the current window
VocabCount = FOREACH GM GENERATE COUNT($1);
--join the streams (every batch) to get all the data per ngram in a fmt which helps in calculating MLE:
--{(ngram1,query1,ngram1,ngram1_frequency,total_ngrams,vocab_size),(ngram2,query2,ngram2,ngram2_frequency,total_ngrams,vocab_size)}
CW1 = JOIN Ngrams BY $0, NgramModel BY $0;
[Diagram: a Kafka spout reads from search/twitter; a WindowHbaseStore spout replays stored batches 1-8; the two streams are joined per batch into CW1.]
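The location string 'testtable,-6,-1' presumably names the table plus a relative batch range; a hypothetical sketch of how such a string could be interpreted (the real org.pos.state.hbase.WindowHbaseStore is not public, so this split is an assumption):

public class WindowLocation {
  final String table;
  final int fromOffset;  // e.g. -6 : six batches before the current one
  final int toOffset;    // e.g. -1 : the previous batch

  WindowLocation(String location) {
    String[] parts = location.split(",");
    this.table = parts[0];
    this.fromOffset = Integer.parseInt(parts[1]);
    this.toOffset = Integer.parseInt(parts[2]);
  }

  // Batch-id range to scan, relative to the current batch.
  long[] range(long currentBatchId) {
    return new long[] {currentBatchId + fromOffset, currentBatchId + toOffset};
  }
}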
17. Advanced State features on Storm
● Efficient in-memory state
● Rich out-of-the-box in-memory data structures (union, intersections
and sketches)
● Advanced semantics like sliding windows supported inherently
● Rich expression of state via PIG
● Fast recovery of the in-memory state
● Potentially query-able state and immutable jobs on those states
● Scheduling of tasks based on data locality
18. PIG Hybrid Mode
One script to bind it ALL!
19. Fast path + Slow path
1. User Event Processing
[Diagram: a realtime user event is enriched via a UDF that looks up the user's latest profile from the User Profile & History offline model, producing an enriched user event.]
Motivation
● Merge the latest user event into the user profile model.
● The user profile model crunches huge data, periodically.
● Separate processing logic for the realtime vs batched pipeline.
20. Current Solutions
Batch-processing and real-time processing systems have been developed in isolation and are maintained separately.
The whole system must be architected explicitly, as no current system supports both.
The shared state store's schema is tied to application logic.
Read-write sync logic and locking need to be custom-designed between the pipelines.
21. Hybrid Mode
--get live queries as a batch {(q1),(q2),(q3)...}
LiveQueries = LOAD 'twitter' USING org.pos.udfs.kafka.KafkaEmitter('60');
--generate the relation Ngrams having {(n1,q1),(n2,q2)...} from LiveQueries
Ngrams = FOREACH LiveQueries GENERATE FLATTEN(nqfunc.split_ngram_query($0));
--store the above Ngrams in HBase for the sliding window
STORE Ngrams INTO 'testtable' USING org.pos.state.hbase.WindowHbaseStore('fam');
--every 10th batch, load the last 1000 batches from the datasource; rows are of the form {(n1,c1),(n2,c2)...}
NgramModel = LOAD 'testtable,-1000,-1,10' USING org.pos.state.hbase.WindowHbaseStore('fam') AS (word, cnt);
--group all to find the total in the next step {(ALL,{(n1,c1),(n2,c2),(n3,c3)...})}
GM = GROUP NgramModel ALL;
--find the total count of all tuples in the current window
TotalNgramCount = FOREACH GM GENERATE SUM($1.$1);
--find the unique count of tuples in the current window
VocabCount = FOREACH GM GENERATE COUNT($1);
--next steps get all the data per ngram in a fmt which helps in calculating MLE:
--{(ngram1,query1,ngram1,ngram1_frequency,total_ngrams,vocab_size),(ngram2,query2,ngram2,ngram2_frequency,total_ngrams,vocab_size)}
CW1 = JOIN Ngrams BY $0, NgramModel BY $0;
Every 10th batch, read in a relative state of 1000 batches from the current batch; process that range in MR/offline.
A large batch range, a low frequency of processing, and a high data payload help decide the Storm vs MR parts.
The point of merge/interaction with the online relation (Ngrams) defines the boundary for the MR job.
23. Next
● Perf testing @ scale
● Install & Setup - environments, dependencies, paths, software installs on different systems
● Demo & Documentation
● State related optimizations
● Hybrid Mode
28. Grouping Implementation - MR
[Diagram: in the physical plan, relation D is split into Local Rearrange (D-LR) → Global Rearrange (D-GR) → Package (D-Pkg). In the MR implementation, map-side D-LR instances emit tokens like "cow" and "jumped"; the shuffle routes each token to a reduce-side D-Pkg.]
Relation D (for grouping) gets broken into 3 operators going from the Logical to the Physical Plan.
29. test-load-2: Store(file:///Users/mridul/workspace/PIGOnStorm/screen:org.apache.pig.builtin.PigStorage) - scope-23
|
|---F: New For Each(false,false)[bag] - scope-22
| |
| Project[chararray][0] - scope-18
| |
| Project[long][1] - scope-20
|
|---E: New For Each(false,false)[bag] - scope-17
| |
| Project[chararray][0] - scope-12
| |
| POUserFunc(org.apache.pig.builtin.COUNT)[long] - scope-15
| |
| |---Project[bag][1] - scope-14
|
|---D: Package[tuple]{chararray} - scope-9
|
|---D: Global Rearrange[tuple] - scope-8
|
|---D: Local Rearrange[tuple]{chararray}(false) - scope-10
| |
| Project[chararray][0] - scope-11
|
|---C: New For Each(true)[bag] - scope-7
| |
| Project[bag][0] - scope-5
|
|---B: New For Each(false)[bag] - scope-4
| |
| POUserFunc(org.apache.pig.builtin.TOKENIZE)[bag] - scope-2
| |
| |---Project[bytearray][0] - scope-1
|
|---A: Load(sentences.txt:org.pos.main.RandomSentenceLoadFunc('sentence','2','the cow jumped over the moon,the cow man went to the store
and bought some candy moon, the cow went to the moon')) - scope-0
[Diagram: the same plan drawn as boxes A → B → C → D-LR → D-GR → D-Pkg → E → F, labeled "Physical Plan".]
30. Clustering Algo (boundary detection to form a job):
● How are POs clustered into an MR job?
● How are POs within a job clustered into Maps and Reduces?
[Diagram: physical operators PO1 → PO2 form the pass-through Map; PO3 → PO4 form the Reduce of MR Job1; PO5 starts MR Job2.]
The Map contains 2 physical operators, where data is passed directly from one operator to another.
The Reduce contains a reducing operator PO3, followed by PO4, which processes the output of PO3.
Shuffling needs to be done between PO2 and PO3; this defines a boundary condition between a Map and a Reduce within an MR job.
An MR job can have only 1 Map and 1 Reduce.
PO5 is another reducer, which cannot be in the same job as MR Job1, so a new job, MR Job2, is created as a result of this boundary condition.
The Load PO gets mapped to an MROp which becomes part of the data-loading framework for an MR job. After loading the data, it is passed tuple by tuple to the subsequent Map by the MR framework.
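A sketch of this boundary-detection rule, simplified to a linear chain of physical operators (the real compiler walks a DAG). A blocking operator - one needing a shuffle, like Global Rearrange - ends the map phase; since an MR job has exactly one Map and one Reduce, a second blocking operator forces a brand-new job:

import java.util.ArrayList;
import java.util.List;

public class JobClusterer {
  public static class Op {
    final String name; final boolean blocking;
    Op(String name, boolean blocking) { this.name = name; this.blocking = blocking; }
  }
  public static class Job {
    final List<Op> map = new ArrayList<Op>();     // may stay empty: a pass-through map
    final List<Op> reduce = new ArrayList<Op>();
  }

  public static List<Job> cluster(List<Op> plan) {
    List<Job> jobs = new ArrayList<Job>();
    Job current = new Job();
    boolean inReduce = false;
    for (Op op : plan) {
      if (op.blocking) {
        if (inReduce) {          // this job's single reduce is taken:
          jobs.add(current);     // close it and start the next MR job
          current = new Job();
        }
        inReduce = true;         // shuffle boundary: map -> reduce
        current.reduce.add(op);
      } else if (inReduce) {
        current.reduce.add(op);  // non-blocking op after the shuffle
      } else {
        current.map.add(op);     // pass-through op in the map
      }
    }
    jobs.add(current);
    return jobs;
  }
}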
35. Data Flow @ Runtime - MR
[Diagram: A → B → C → D-LR form the MROp(Map); D-GR marks the shuffle; D-Pkg → E → F form the MROp(Reduce). Together they make one MR job.]
36. Data Flow @ Runtime - MR
[Diagram: the MROp(Map) of the MR job - operators A → B → C → D-LR.]
37. Data Flow @ Runtime - MR
[Diagram: the MROp(Map) of the MR job - operators A → B → C → D-LR.]
Loads data from HDFS. The MR framework passes it tuple by tuple to the subsequent Map (the MROp here).
The MROp callback attaches the tuple given by the framework to the root node of its local tree (B here).
The leaf node in every MROp pulls the data tuple recursively (i.e. D -> C -> B). This is the same tuple that was initially attached to B.
The process repeats; a sketch follows.
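A conceptual sketch of that attach/pull loop (simplified; Pig's real PhysicalOperator API differs in details):

public abstract class PhysOp {
  protected PhysOp input;     // parent in the local operator tree
  protected Object attached;  // tuple handed in by the MR framework

  public void attachInput(Object tuple) { attached = tuple; }

  // Root node (B): returns the tuple the framework attached.
  // Inner nodes (C, D-LR): recursively pull from their input first.
  public Object getNext() {
    Object in = (input == null) ? attached : input.getNext();
    return process(in);
  }

  protected abstract Object process(Object in);
}

// Map callback, once per input tuple: attach to the root, pull from the leaf.
//   root.attachInput(tuple);
//   Object out = leaf.getNext();   // triggers the D -> C -> B recursion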
38. Data Flow @ Runtime - Storm
[Diagram: the same operators as a Storm topology - A is the spout; B → C → D-LR form one StormOp(Bolt); D-GR → D-Pkg → E → F are handled by a second StormOp(Bolt).]
The spout pulls data from a datasource or queue and emits to the next bolt.
The bolt attaches the tuple to the root node B; the leaf node D pulls the tuple and emits it (see the sketch below).
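Sketched as a bolt, reusing the toy PhysOp class from the MR sketch above (names are illustrative, not the actual PIG-on-Storm code; tuple conversion is elided here and shown on slide 43):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StormOpBolt extends BaseBasicBolt {
  private final PhysOp root;  // B: root of this bolt's operator tree
  private final PhysOp leaf;  // D-LR: leaf that produces the output

  public StormOpBolt(PhysOp root, PhysOp leaf) { this.root = root; this.leaf = leaf; }

  public void execute(Tuple input, BasicOutputCollector collector) {
    root.attachInput(input.getValues());          // attach incoming tuple to root B
    Object out = leaf.getNext();                  // leaf pulls recursively through the tree
    if (out != null) collector.emit(new Values(out));
  }

  public void declareOutputFields(OutputFieldsDeclarer d) {
    d.declare(new Fields("tuple"));
  }
}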
39. Grouping Implementation - Storm
[Diagram: the D-LR → D-GR → D-Pkg physical plan implemented on Storm - two D-LR bolt tasks emit tokens ("cow", "jumped"), and fields grouping routes each token to a D-Pkg task.]
Each task of D-Pkg could maintain multiple states corresponding to specific tokens (like a reducer); see the sketch below.
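A sketch of such a D-Pkg task (illustrative, not the real class). Fields grouping guarantees every tuple for a key lands on the same task, so each task holds state only for its own keys, like a reducer; for DISTINCT (next slide) the same structure with a set of seen keys suffices:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupPackage {
  private final Map<String, List<Object>> groups = new HashMap<String, List<Object>>();

  // Accumulate each tuple into its key's bag.
  public void onTuple(String key, Object tuple) {
    List<Object> bag = groups.get(key);
    if (bag == null) { bag = new ArrayList<Object>(); groups.put(key, bag); }
    bag.add(tuple);
  }

  // At batch end, emit one (key, bag) pair per key this task owns.
  public Map<String, List<Object>> finishBatch() {
    Map<String, List<Object>> out = new HashMap<String, List<Object>>(groups);
    groups.clear();
    return out;
  }
}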
40. Distinct Implementation - Storm
[Diagram: the same D-LR → D-GR → D-Pkg plan on Storm; D-LR tasks emit tokens ("cow", "jumped"), fields-grouped to D-Pkg tasks.]
Each task of D-Pkg could maintain multiple states corresponding to specific tokens (like a reducer) and emit only the distinct tokens from each, directly.
41. Sort Implementation - Storm
[Diagram: the D-LR → D-GR → D-Pkg plan on Storm; global grouping sends all tokens ("cow", "jumped") to a single D-Pkg task.]
A single D-Pkg task maintains a sorted TreeMap, which is emitted at the end of the batch.
42. Cogroup Implementation - Storm
[Diagram: A-LR and B-LR tasks emit tuples field-grouped on the join key (e.g. "cow"); field grouping is a property of the edge. Trident merges relations A and B into one stream per field-grouped key before passing it to the respective POPackage, so a D-Pkg task sees {"cow", "cow"}.]
Each D-Pkg task emits a tuple containing the field grouped on and a bag of the grouped tuples from relations A and B, emitted at the end of the batch.
43. Data Format Transformation during Data Flow (Storm)
[Diagram: a queue feeds a StormOp(Bolt) containing operators A → B → C → D-LR.]
On input, the bolt takes a Storm tuple and converts it into a PIG tuple before forwarding/attaching it as input to the POs.
On output, it converts the PIG tuple back into a Storm tuple before emitting from the bolt. The payload is packaged into a single PIG tuple, i.e. the outgoing stream's complex data structures (like bags) are packed as blobs in a tuple.
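A sketch of the two conversions (helper names are illustrative; Pig's TupleFactory API is real). Complex Pig types ride inside single fields of the Storm tuple, as the slide notes:

import backtype.storm.tuple.Values;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class TupleConverter {
  private static final TupleFactory FACTORY = TupleFactory.getInstance();

  // Storm -> PIG, on the way into a bolt's operator tree.
  public static Tuple toPigTuple(backtype.storm.tuple.Tuple storm) {
    return FACTORY.newTuple(storm.getValues());
  }

  // PIG -> Storm, on the way out of the bolt; bags and other complex
  // fields stay packed as opaque blobs inside the emitted values.
  public static Values toStormValues(Tuple pig) {
    return new Values(pig.getAll().toArray());
  }
}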