Apache Flink
Next-gen data analysis
Kostas Tzoumas
ktzoumas@apache.org
@kostas_tzoumas

What is Flink
•  Project undergoing incubation in the Apache Software Foundation
•  Originating from the Stratosphere research project started at TU Berlin in 2009
•  http://flink.incubator.apache.org
•  58 contributors (doubled in ~4 months)
•  Has a cool squirrel for a logo

This talk
•  Data processing engines in Hadoop
•  Flink from a user perspective
•  Tour of Flink internals
•  Closing

DATA PROCESSING IN THE HADOOP ECOSYSTEM

Open source data infrastructure – an era of choice
•  Applications: Hive, Pig, Cascading, Mahout, …
•  Data processing engines: MapReduce, Flink, Spark, Storm, Tez
•  App and resource management: YARN, Mesos
•  Storage, streams: HDFS, HBase, Kafka, …

Engine paradigms & systems
•  MapReduce (OSDI’04) → Apache Hadoop 1
   –  Small recoverable tasks
   –  Sequential code inside map & reduce functions
•  Dryad, Nephele (EuroSys’07) → Apache Tez
   –  Extends the map/reduce model to a DAG model
   –  Backtracking-based recovery
•  RDDs (HotCloud’10, NSDI’12) → Apache Spark
   –  Functional implementation of Dryad recovery (RDDs)
   –  Restricted to coarse-grained transformations
   –  Direct execution of the API
•  PACTs (SOCC’10, VLDB’12) → Apache Flink (incubating)
   –  Embeds a query processing runtime in a DAG engine
   –  Extends the DAG model to cyclic graphs
   –  Incremental construction of graphs

Engine comparison

|              | Hadoop MapReduce       | Tez                            | Spark                                   | Flink                                    |
|--------------|------------------------|--------------------------------|-----------------------------------------|------------------------------------------|
| API          | MapReduce on k/v pairs | k/v pair Readers/Writers       | Transformations on k/v pair collections | Iterative transformations on collections |
| Paradigm     | MapReduce              | DAG                            | RDD                                     | Cyclic dataflows                         |
| Optimization | none                   | none                           | Optimization of SQL queries             | Optimization in all APIs                 |
| Execution    | Batch sorting          | Batch sorting and partitioning | Batch with memory pinning               | Stream with out-of-core algorithms       |

USING FLINK

Data sets and operators
[Figure: a program DataSet A → Operator X → DataSet B → Operator Y → DataSet C, and its parallel execution, where each data set is split into partitions (A (1), A (2), B (1), B (2), C (1), C (2)) processed by parallel operator instances (X, X, Y, Y)]

Rich operator and functionality set
[Figure: example data flow built from Source, Map, Reduce, Join, Iterate, and Sink operators]
Map, Reduce, Join, CoGroup, Union, Iterate, Delta Iterate, Filter, FlatMap, GroupReduce, Project, Aggregate, Distinct, Vertex-Update, Accumulators

WordCount in Java

```java
import java.util.Arrays;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

ExecutionEnvironment env =
    ExecutionEnvironment.getExecutionEnvironment();

// input: path of the text file to read
DataSet<String> text = env.readTextFile(input);

DataSet<Tuple2<String, Integer>> counts = text
    // split each line into words
    .map(l -> l.split("\\W+"))
    // emit (word, 1) for every non-empty token
    .flatMap((String[] tokens,
              Collector<Tuple2<String, Integer>> out) -> {
        Arrays.stream(tokens)
            .filter(t -> t.length() > 0)
            .forEach(t -> out.collect(new Tuple2<>(t, 1)));
    })
    // group by the word (field 0) and sum the counts (field 1)
    .groupBy(0)
    .sum(1);

// the program needs a sink before execute()
counts.print();

env.execute("Word Count Example");
```

[Figure: data flow text → flatMap → reduce → counts]

WordCount in Scala

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment
  .getExecutionEnvironment

// textInput: path of the text file to read
val text = env.readTextFile(textInput)

val counts = text
  .flatMap { l => l.split("\\W+")
    .filter { t => t.nonEmpty } }
  .map { t => (t, 1) }
  .groupBy(0)
  .sum(1)

counts.print()
env.execute()
```

[Figure: data flow text → flatMap → reduce → counts]

Long operator pipelines

```java
DataSet large = env.readCsv(...);
DataSet medium = env.readCsv(...);
DataSet small = env.readCsv(...);

// join key: field 3 of large = field 1 of medium
DataSet joined1 = large
    .join(medium)
    .where(3).equalTo(1)
    .with(new JoinFunction() { ... });

// join key: field 0 of small = field 2 of joined1
DataSet joined2 = small
    .join(joined1)
    .where(0).equalTo(2)
    .with(new JoinFunction() { ... });

// maximum of field 2 per group of field 3
DataSet result = joined2
    .groupBy(3)
    .max(2);
```

[Figure: plan that joins large with medium, joins the result with small, and ends in a grouping aggregation (γ)]

Beyond Key/Value Pairs

```java
DataSet<Page> pages = ...;
DataSet<Impression> impressions = ...;

// group and aggregate by field name instead of by key
DataSet<Impression> aggregated =
  impressions
    .groupBy("url")
    .sum("count");

pages.join(impressions).where("url").equalTo("url");

// custom data types

class Impression {
    public String url;
    public long count;
}

class Page {
    public String url;
    public String topic;
}
```
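The POJO program above can be made concrete with a plain-Java, single-machine simulation. It has no Flink dependency. The class and method names are hypothetical. The sketch also makes one assumption for illustration: it joins the pages with the per-URL totals (`aggregated`) rather than with the raw `impressions`. Records are grouped and joined on the `url` field directly, with no key/value wrapping.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java, single-machine simulation of the POJO program (no Flink dependency).
final class PojoGroupJoinSketch {

    // Same fields as the custom data types on the slide.
    static final class Impression {
        final String url;
        final long count;
        Impression(String url, long count) { this.url = url; this.count = count; }
    }

    static final class Page {
        final String url;
        final String topic;
        Page(String url, String topic) { this.url = url; this.topic = topic; }
    }

    // Counterpart of impressions.groupBy("url").sum("count"):
    // one Impression per distinct url, in first-seen order.
    static List<Impression> sumByUrl(List<Impression> impressions) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Impression i : impressions) {
            totals.merge(i.url, i.count, Long::sum);
        }
        List<Impression> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : totals.entrySet()) {
            result.add(new Impression(e.getKey(), e.getValue()));
        }
        return result;
    }

    // Counterpart of pages.join(aggregated).where("url").equalTo("url"):
    // an inner equi-join on url, emitting "topic:count" for each match.
    static List<String> joinOnUrl(List<Page> pages, List<Impression> aggregated) {
        // Build side: index the aggregated impressions by url.
        Map<String, Long> countByUrl = new LinkedHashMap<>();
        for (Impression i : aggregated) {
            countByUrl.put(i.url, i.count);
        }
        // Probe side: look up each page; pages without impressions are dropped.
        List<String> out = new ArrayList<>();
        for (Page p : pages) {
            Long c = countByUrl.get(p.url);
            if (c != null) {
                out.add(p.topic + ":" + c);
            }
        }
        return out;
    }
}
```

For example, impressions ("a.com", 2), ("b.com", 5), ("a.com", 3) aggregate to a.com = 5 and b.com = 5. Joining them with pages a.com/news, c.com/sports and b.com/tech yields "news:5" and "tech:5". The sports page has no impressions, so the inner join drops it.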
Beyond Key/Value Pairs: why not key/value pairs?
•  Programs are much more readable ;-)
•  Functions are self-contained (they do not need to set the key for their successor)
•  Much higher reusability of data types and functions
   –  Within Flink programs, or from other programs

“Iterate” operator
•  Built-in operator to support looping over data
•  Applies a step function to the partial solution until convergence
•  The step function can be an arbitrary Flink program
•  Convergence via a fixed number of iterations or a custom convergence criterion
[Figure: Iterate starts from the initial solution; in each iteration the step function (operators X and Y, which may also read other data sets) computes a new partial solution that replaces the previous one; the last partial solution is the iteration result]

Transitive closure in Java

```java
DataSet<Tuple2<Long, Long>> edges = …

// bulk iteration with a fixed number of 10 iterations
IterativeDataSet<Tuple2<Long, Long>> paths =
  edges.iterate(10);

DataSet<Tuple2<Long, Long>> nextPaths = paths
  // extend each path (a, b) by each edge (b, c) into the path (a, c)
  .join(edges)
  .where(1).equalTo(0)
  .with((left, right) -> new Tuple2<>(left.f0, right.f1))
  .union(paths)
  .distinct();

DataSet<Tuple2<Long, Long>> transitiveClosure =
  paths.closeWith(nextPaths);
```

[Figure: paths ⋈ edges → ∪ paths → distinct, fed back as the next paths; the final result is the closure]

Transitive closure in Scala

```scala
val edges = …

val paths = edges.iterate (10) {
  prevPaths: DataSet[(Long, Long)] =>
    prevPaths
      .join(edges)
      .where(1).equalTo(0) {
        (left, right) =>
          (left._1, right._2)
      }
      .union(prevPaths)
      .groupBy(0, 1)
      .reduce((l, r) => l)
}
```

[Figure: paths ⋈ edges → ∪ paths → distinct, fed back as the next paths; the final result is the closure]

“Delta Iterate” operator
[Figure: Delta-Iterate starts from an initial solution and an initial workset; in each iteration operators X and Y (which may read other data sets) compute a delta set, which is merged into the partial solution, and a new workset, which replaces the old one; the final partial solution is the iteration result]
•  Computes the next workset and the changes to the partial solution until the workset is empty
•  Similar to semi-naïve evaluation in Datalog
•  Generalizes the vertex-centric computing of Pregel and GraphLab

Using Spargel: the graph API
Pregel/Giraph-style graph computation

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Long> vertexIds = env.readCsv(...);
DataSet<Tuple2<Long, Long>> edges = env.readCsv(...);

// (vertex id, initial component id)
DataSet<Tuple2<Long, Long>> vertices = vertexIds
    .map(new IdAssigner());

// vertex-centric iteration with at most 100 supersteps
DataSet<Tuple2<Long, Long>> result = vertices.runOperation(
    VertexCentricIteration.withPlainEdges(
        edges, new CCUpdater(), new CCMessager(), 100));

result.print();
env.execute("Connected Components");
```

Spargel: Implementation
•  Spargel is implemented in < 500 lines of code on top of delta iterations

Hadoop Compatibility
Flink supports out of the box:
•  Hadoop data types (Writables)
•  Hadoop Input/Output Formats
•  Hadoop functions and object model
[Figure: a Hadoop Input → Map → Reduce → Output job embedded in a Flink program whose DataSets also flow through Flink operators such as Join, Map, and Reduce]

FLINK INTERNALS

Distributed architecture

```scala
val paths = edges.iterate (maxIterations) {
  prevPaths: DataSet[(Long, Long)] =>
    val nextPaths = prevPaths
      .join(edges)
      .where(1).equalTo(0) {
        (left, right) => (left._1, right._2)
      }
      .union(prevPaths)
      .groupBy(0, 1)
      .reduce((l, r) => l)
    nextPaths
}
```

•  Client: optimization and translation to a data flow
•  Job Manager: scheduling, resource negotiation, …
•  Task Managers: run on the data nodes, each with its own memory heap

The growing Flink stack…
•  APIs: Java API (batch), Scala API (batch), Graph API (“Spargel”), Python API, Apache MRQL, Java API (streaming)
•  Common API
•  Flink Optimizer (batch) and Streams Builder (streaming)
•  Hybrid Batch/Streaming Runtime
•  Execution: Local Execution, Java Collections, Apache Tez, Cluster Manager (Native, YARN, EC2)
•  Storage and streams: HDFS, Files, S3, JDBC, Azure, Kafka, RabbitMQ, Redis, …

Program lifecycle

```scala
val source1 = …
val source2 = …
val maxed = source1
  .map(v => (v._1, v._2,
             math.max(v._1, v._2)))
val filtered = source2
  .filter(v => (v._1 > 4))
val result = maxed
  .join(filtered).where(0).equalTo(0)
  // a join without a join function yields (left, right) pairs
  .filter(v => v._1._1 > 3)
  .groupBy(0)
  .reduceGroup {……}
```
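The Delta Iterate operator and the Spargel connected-components program can be illustrated with a plain-Java, single-machine simulation. It has no Flink dependency and the names are hypothetical. The slides do not show the code of `CCMessager` and `CCUpdater`, so the sketch assumes the usual minimum-label propagation:

- The messager sends a vertex's current component id to its neighbors.
- The updater keeps the smallest id.

Only vertices whose label changed form the next workset. The loop ends when the workset is empty or after the iteration cap, which is 100 on the slide.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java, single-machine simulation of a delta iteration (no Flink dependency):
// connected components by minimum-label propagation.
final class DeltaIterationSketch {

    // edges: undirected pairs {a, b}; returns vertex -> component id,
    // where the component id is the smallest vertex id in the component.
    static Map<Long, Long> connectedComponents(List<long[]> edges, int maxIterations) {
        // Adjacency lists, built once before the loop (loop-invariant input).
        Map<Long, List<Long>> neighbors = new HashMap<>();
        for (long[] e : edges) {
            neighbors.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            neighbors.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }

        // Initial solution set: every vertex starts in its own component.
        Map<Long, Long> solution = new HashMap<>();
        for (Long v : neighbors.keySet()) {
            solution.put(v, v);
        }
        // Initial workset: every vertex counts as changed in the first superstep.
        Set<Long> workset = new HashSet<>(neighbors.keySet());

        // Stop when the workset is empty, or after maxIterations supersteps.
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // Assumed "messager": each changed vertex sends its label to its
            // neighbors; the smallest label per receiving vertex is kept.
            Map<Long, Long> candidates = new HashMap<>();
            for (Long v : workset) {
                Long label = solution.get(v);
                for (Long n : neighbors.get(v)) {
                    candidates.merge(n, label, Long::min);
                }
            }
            // Assumed "updater": the delta set holds only vertices whose label
            // shrinks; it is merged into the solution set and becomes the next workset.
            Set<Long> nextWorkset = new HashSet<>();
            for (Map.Entry<Long, Long> c : candidates.entrySet()) {
                if (c.getValue() < solution.get(c.getKey())) {
                    solution.put(c.getKey(), c.getValue());
                    nextWorkset.add(c.getKey());
                }
            }
            workset = nextWorkset;
        }
        return solution;
    }
}
```

The shrinking workset is what makes a delta iteration cheaper than a bulk one. On the chain 1-2-3, the first superstep touches all three vertices, the second only vertices 2 and 3, and the third only vertex 3, after which the workset is empty.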
[Figure: the Flink stack diagram, as on the previous slide]

Memory management

```java
public class WC {
  public String word;
  public int count;
}
```

[Figure: WC records serialized into a pool of memory pages, with an empty page available]
•  Flink manages its own memory
•  User data is stored in serialized byte arrays
•  In-memory caching and data processing happen in a dedicated memory fraction
•  Never breaks the JVM heap
•  Very efficient disk spilling and network transfers
[Figure: the JVM heap divided into the Flink managed heap, network buffers, and the unmanaged heap]

Little tuning or configuration required
•  Requires no memory thresholds to configure
   –  Flink manages its own memory
•  Requires no complicated network configs
   –  The pipelining engine requires much less memory for data exchange
•  Requires no serializers to be configured
   –  Flink handles its own type extraction and data representation
•  Programs can be adjusted to data automatically
   –  Flink’s optimizer can choose execution strategies automatically

Understanding Programs
•  Visualize and understand the operations and the data movement of programs
•  Analyze after execution (times, stragglers, …)
[Screenshots from Flink’s plan visualizer]

Built-in vs driver-based iterations
•  Driver-based: the loop runs outside the system, in the driver program on the client, so the iterative program looks like many independent jobs
•  Built-in: data flows with feedback edges (e.g. map → join → reduce → join, looping back), so the system is iteration-aware and can optimize the job

Delta iterations
[Figure, left: computations performed in each iteration for connected communities of a social graph; y-axis: # vertices (thousands), x-axis: iteration; series: Bulk and Delta]
[Figure, right: runtime (secs) on the Twitter and Webbase (20) graphs, Bulk vs Delta*]
•  Covers the typical use cases of Pregel-like systems with comparable performance in a generic platform and developer API
*Note: The runtime experiment uses a larger graph.

Optimizing iterative programs
•  Caching loop-invariant data
•  Pushing work “out of the loop”
•  Maintaining state as an index

WIP: Flink on Tez
•  Flink has a modular design that makes it easy to add alternative frontends and backends
•  Flink programs are completely unmodified
•  The system generates a Tez DAG instead of a Flink DAG
[Figure: the Flink stack, with Apache Tez as an alternative execution backend]
The first prototype will be available in the next release.

CLOSING

Upcoming features
•  Robust and fast engine
   –  Finer-grained fault tolerance, incremental plan rollout, optimization, and interactive shell clients
•  Alternative backends: Flink on *
   –  Tez backend
•  Alternative frontends: * on Flink
   –  ML and Graph functionality, Python support, logical (SQL-like) field addressing
•  Flink Streaming
   –  Combine stream and batch processing in programs

flink.incubator.apache.org
@ApacheFlink

“Flink Stockholm week”
Wednesday
•  Flink introduction and hackathon @ 9:00
•  Flink talk at Spotify (SHUG) @ 18:00
Thursday
•  Hackathons at KTH (streaming & graphs) @ 9:00
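The transitive-closure Iterate example can illustrate two of the “Optimizing iterative programs” points: caching loop-invariant data and maintaining state as an index. The sketch is a plain-Java, single-machine simulation, not Flink code, and its class and method names are hypothetical. The edges are indexed once by source vertex, outside the loop. On top of the fixed iteration count (10 on the slides), it exits early when a step adds no new path. That exit is an assumed custom convergence criterion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java, single-machine simulation of the bulk-iteration transitive closure
// (no Flink dependency). A path (src, dst) is a two-element List, which has
// value-based equals/hashCode, so a HashSet gives "union + distinct".
final class TransitiveClosureSketch {

    static Set<List<Long>> closure(List<long[]> edges, int maxIterations) {
        // Loop-invariant data, cached once as an index: edge source -> targets.
        Map<Long, List<Long>> successors = new HashMap<>();
        for (long[] e : edges) {
            successors.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }

        // Initial partial solution: every edge is a path of length 1.
        Set<List<Long>> paths = new HashSet<>();
        for (long[] e : edges) {
            paths.add(Arrays.asList(e[0], e[1]));
        }

        for (int i = 0; i < maxIterations; i++) {
            // Step function: paths JOIN edges ON path.dst = edge.src,
            // UNION the old paths, DISTINCT (the Set removes duplicates).
            Set<List<Long>> next = new HashSet<>(paths);
            for (List<Long> p : paths) {
                List<Long> targets = successors.getOrDefault(p.get(1), Collections.emptyList());
                for (Long dst : targets) {
                    next.add(Arrays.asList(p.get(0), dst));
                }
            }
            // Assumed convergence check: stop once a step adds no new path.
            if (next.size() == paths.size()) {
                break;
            }
            paths = next; // the new partial solution replaces the old one
        }
        return paths;
    }
}
```

On the chain 1 → 2 → 3 → 4, ten allowed iterations produce all six paths, stopping early after the third step. A cap of one iteration yields only the five paths of length 1 and 2. On the slides this caching is something an iteration-aware system can do on its own. Here it is done by hand, so every step probes the index instead of rescanning the edges.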




