The Data Scientist’s Guide to Apache Spark


The Data Scientist’s Guide to Apache Spark — Hands on with a practical case study
Jonathan Dinu, VP of Academic Excellence, Galvanize (@clearspandex)
Galvanize programs: Data Science Immersive, Full Stack Immersive, Data Engineering Immersive, Weekend Workshops
Questions? tweet @clearspandex

Spark Fundamentals

What Is Spark?
• Framework for distributed processing
• In-memory, fault-tolerant data structures
• Flexible APIs in Scala, Java, Python, SQL… and now R!
• Open Source

Unified Platform
The data pipeline — Acquisition, Parse, Storage, Transform/Explore, Vectorization, Train Model, Expose, Presentation — can be built locally with requests, BeautifulSoup4, pandas, pymongo, scikit-learn/NLTK, pickle, and Flask, or at scale with PySpark, DataFrames/Spark SQL, HDFS, MLlib/spark.ml, Spark Streaming, and model.save().

Performance
• Very fast at iterative algorithms
• DAG scheduler supports cyclic flows (and graph computation)
• Intermediate results kept in memory when possible
• Bring computation to the data (data locality)

Rich API
map(), reduce(), filter(), sortBy(), join(), groupByKey(), first(), count() … and more

The Spark stack: Spark Core runs on the Standalone Scheduler, YARN, or Mesos; on top sit Spark Streaming, DataFrames, MLlib, GraphX, and spark.ml, with language bindings for Scala, Java, PySpark, and SparkR.

Review
• Framework for distributed processing
• In-memory, fault-tolerant data structures
• Flexible APIs in Scala, Java, Python, SQL… and now R!
• Open Source

Spark Programming Basics

Spark Execution Context
The driver program (which holds the SparkContext) runs on your laptop and talks to a cluster manager (Standalone, YARN, or Mesos); the cluster manager assigns tasks to executors, each with a cache, running on the worker nodes.

Terminology
• Driver: process that contains the SparkContext
• Executor: process that executes one or more Spark tasks
• Master: process that manages applications across the cluster
• Worker: process that manages executors on a particular node

What is an RDD?
• Resilient: if the data in memory (or on a node) is lost, it can be recreated
• Distributed: data is chunked into partitions and stored in memory across the cluster
• Dataset: initial data can come from a file or be created programmatically
(A short example follows below.)
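To make the RDD definition concrete, here is a minimal PySpark sketch — assuming a running PySpark shell with its preconfigured `sc` SparkContext, as the deck does — that creates an RDD from a local collection, inspects its partitioning, and applies a first transformation and action. It is illustrative only and not taken from the slides.

# assumes `sc` is the SparkContext provided by the PySpark shell
data = range(1, 1001)                # local Python collection on the driver
rdd = sc.parallelize(data, 8)        # distributed into 8 partitions

rdd.getNumPartitions()               # => 8
rdd.map(lambda x: x * 2).take(3)     # => [2, 4, 6]

# the RDD is immutable: map() returns a new RDD, and a lost partition
# can be recomputed from this lineage rather than re-read from the driver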
Note: RDDs are read-only and immutable; we will come back to this later…

Functions Deconstructed

import random

flips = 1000000
coins = xrange(flips)                 # lazy eval (Python generator), nothing executed

heads = sc.parallelize(coins) \        # create RDD
          .map(lambda i: random.random()) \   # transformation
          .filter(lambda r: r < 0.51) \       # transformation
          .count()                             # action (materialize result)

The lambda functions create closures, which Spark ships to the workers and applies to the data.

Spark Functions
• Transformations: lazy evaluation (does not immediately evaluate); returns a new RDD
• Actions: materialize data (evaluates the RDD lineage); returns a final value (on the driver)

Transformations

# Every Spark application requires a Spark Context
# The Spark shell provides a preconfigured Spark Context called `sc`
nums = sc.parallelize([1, 2, 3])

# Pass each element through a function
squared = nums.map(lambda x: x * x)          # => {1, 4, 9}

# Keep elements passing a predicate
even = squared.filter(lambda x: x % 2 == 0)  # => [4]

# Map each element to zero or more others
nums.flatMap(lambda x: range(x))             # => {0, 0, 1, 0, 1, 2}

Actions

nums = sc.parallelize([1, 2, 3])

# Retrieve RDD contents as a local collection
nums.collect()                    # => [1, 2, 3]

# Return first K elements
nums.take(2)                      # => [1, 2]

# Count number of elements
nums.count()                      # => 3

# Merge elements with an associative function
nums.reduce(lambda x, y: x + y)   # => 6

# Write elements to a text file
nums.saveAsTextFile("hdfs://file.txt")

Functions Revisited

import random

flips = 1000000

# local sequence
coins = xrange(flips)

# distributed sequence — nothing runs here
coin_rdd = sc.parallelize(coins)
flips_rdd = coin_rdd.map(lambda i: random.random())
heads_rdd = flips_rdd.filter(lambda r: r < 0.51)

# local value — everything runs here
head_count = heads_rdd.count()

RDD Lineage
coins (driver) → sc.parallelize() → coin_rdd → map() → flips_rdd → filter() → heads_rdd (distributed, on the workers) → count() → head_count (driver)

Key-Value Operations

pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])

pets.reduceByKey(lambda x, y: x + y)  # => {(cat, 3), (dog, 1)}
pets.groupByKey()                     # => {(cat, [1, 2]), (dog, [1])}
pets.sortByKey()                      # => {(cat, 1), (cat, 2), (dog, 1)}

Notebook
https://github.com/Jay-Oh-eN/data-scientists-guide-apache-spark/blob/master/pyspark.ipynb

Functional Programming Primer
• Functions are applied to data (RDDs)
• RDDs are immutable: f(RDD) -> RDD2
• Function application necessitates creation of new data

Review
• Client-Server execution model
• Spark leverages higher-order functions (map(), filter(), etc.)
• Transformations create new RDDs and are lazily evaluated
• Actions force materialization of an RDD on the driver
(A worked example combining these operations follows below.)
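As a worked example tying the transformations, actions, and key-value operations above together, here is a minimal word-count sketch. The input path is a hypothetical placeholder and the snippet is illustrative, not taken from the deck; it assumes the same `sc` context as above.

lines = sc.textFile("corpus.txt")                  # hypothetical input (local path or HDFS URI)

counts = lines.flatMap(lambda line: line.split()) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)      # transformations: nothing has run yet

counts.take(5)                                      # action: triggers the whole lineage

# reduceByKey combines values per key within each partition before shuffling,
# which is why it is usually preferred over groupByKey for simple aggregation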
Spark Programming APIs
PySpark architecture: locally, the Python driver talks to a Java SparkContext over Py4J (and the local file system); on the cluster, each Spark worker pipes data over sockets to the Python subprocesses that execute the closures.

Review
• PySpark (and SparkR) enable developers to write driver programs in Python (and R)
• For both of these, closures are serialized and sent to workers
• Execution happens in the native language (Python/R) of the closure

Data Science Applications with Spark

Data pipeline recap ("We are Here" marker on the pipeline diagram): Acquisition, Parse, Storage, Transform/Explore, Vectorization, Train Model, Expose, Presentation — at scale with PySpark, DataFrames/Spark SQL, HDFS, MLlib/spark.ml, Spark Streaming, model.save().

What Is Exploratory Data Analysis?
• Developed at Bell Labs in the 1960s by John Tukey
• Techniques used to visualize and summarize data
• Five-number summary: describe()
• Distributions: box plots, stem-and-leaf plots, histograms, scatterplots

Goals of Exploratory Data Analysis
• Gain greater intuition
• Validate our data (consistency and completeness)
• Make comparisons between distributions
• Find outliers
• Treat missing data
• Summarize data (a statistic -> one number that represents many #'s)

Case Study: DonorsChoose.org
http://data.donorschoose.org/open-data/overview/

Unique Values
• rdd.distinct()
• rdd.countApproxDistinct(relative_accuracy)
http://content.research.neustar.biz/blog/hll.html
http://dx.doi.org/10.1145/2452376.2452456

Missing Values
• column.isNull()
• dataframe.fillna()
…
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameNaFunctions

Frequently Occurring Values
• dataframe.freqItems(columns, support) — support is the required minimum proportion of rows
Note: this is an approximate algorithm that always returns all the frequent items, but may contain false positives.
http://dl.acm.org/citation.cfm?doid=762471.762473

Summary Statistics
• dataframe.describe(column_name)
(These helpers are combined in the short sketch below.)
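Putting these DataFrame helpers together, a minimal EDA pass might look like the following. The file path, format, and column names (total_price, school_state) are hypothetical stand-ins for the DonorsChoose schema rather than taken from the deck, and a SQLContext is assumed on top of `sc`.

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# hypothetical ingestion path and schema
projects = sqlContext.read.json("hdfs:///data/donorschoose/projects.json")

projects.count()                                    # how many rows?
projects.select("school_state").distinct().count()  # unique values
projects.describe("total_price").show()             # summary statistics
projects.na.fill({"total_price": 0.0}).count()      # treat missing values
projects.freqItems(["school_state"], 0.01).show()   # frequently occurring values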
Interlude: Sometimes numbers aren’t enough!

Anscombe’s Quartet
[Four scatterplots of (x1, y1) … (x4, y4) that look very different when plotted, yet share nearly identical summary statistics:]
• Mean (x): 9
• Sample variance (x): 11
• Mean (y): 7.50
• Sample variance (y): 4.127
• Correlation: 0.816
• Linear regression: y = 3.00 + 0.500x

Notebook
https://github.com/Jay-Oh-eN/data-scientists-guide-apache-spark/blob/master/donors_choose_eda.ipynb

Review
• The data science process is inherently interactive
• Spans many scales of data and computation
• Data pipelines require linking many diverse tasks (and data)
• Quick insights necessary for fast iteration

How Spark can help
• Interactive REPL
• Rapid computation (especially aggregates) on large amounts of data
• High-level abstractions for efficient querying of data
• “Condense” data for easier local exploration and visualization

Natural Language Processing

Data pipeline recap ("We are Here" marker on the pipeline diagram): Acquisition, Parse, Storage, Transform/Explore, Vectorization, Train Model, Expose, Presentation.

Natural Language Processing
[Illustration: a corpus as a document–term matrix, each document a vector of word counts, e.g. [1, 3, 1, 1, 2, 0, 1, 0].]

DonorsChoose: Project Essays

Bag of Words
• Document: single row of data/corpus
• Corpus: entire set of all documents
• Vocabulary: set of all words in the corpus
• Vector: mathematical representation of a document (counts of word occurrences)

Bag of Words
Original document → dictionary of word counts → feature vector:
"The brown fox" → { "the": 1, "brown": 1, "fox": 1 } → [0,0,1,0,1,0,...]
(Tokenization, then vectorization.)

Tokenization

Vectorization

With MLlib (a code sketch appears at the end of this section)

Vector Space Model
[Diagram: documents as vectors; similarity is a measure of “distance”. Image by Riclas (own work), CC BY 3.0, via Wikimedia Commons.]

Interlude: How to Scale
Start small (data) and fast (development) → testing → increase the size of the data set → testing → optimize and productionize → PROFIT! $$$

TF-IDF
• Measure of the discriminatory power of a word (feature)
• Highest when a term occurs many times in a small number of documents
• Lowest when a term occurs few times in a document or many times in the corpus
• Useful for information retrieval (queries) and keyword extraction (among other things)

tf(t, d) = f_d(t) / |d|
idf(t, D) = log( |D| / |{d ∈ D : t ∈ d}| )

TF-IDF

TF-IDF: Most Common / Least Common

Summarization

Scale Up

Notebook
https://github.com/Jay-Oh-eN/data-scientists-guide-apache-spark/blob/master/natural_language_processing.ipynb

Review
• We need to represent text as vectors to model documents
• The bag-of-words model uses word counts (tf-idf improves on this)
• In vector space, we can compare documents using linear algebra
• Spark provides feature transformers to handle text input

Word2Vec

Vector Space Model
Source: deeplearning4j

Vector Space Model
Vector representation of words. Source: Mikolov T., et al.: Linguistic Regularities in Continuous Space Word Representations, NAACL 2013.
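Looping back to the tf-idf and “With MLlib” slides above, here is a minimal sketch of the bag-of-words/tf-idf pipeline expressed with MLlib’s feature transformers. The toy corpus and the naive whitespace tokenization are illustrative only; the same `sc` context is assumed.

from pyspark.mllib.feature import HashingTF, IDF

# hypothetical corpus: one project essay per element
essays = sc.parallelize(["The brown fox", "The lazy dog", "A brown dog"])

# tokenization (naive: lowercase and split on whitespace)
tokens = essays.map(lambda doc: doc.lower().split())

# term-frequency vectors via the hashing trick
hashingTF = HashingTF()
tf = hashingTF.transform(tokens)
tf.cache()                       # IDF needs two passes over the data

# inverse document frequency, then tf-idf vectors
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
tfidf.take(1)                    # sparse vectors, one per document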
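For the word-vector model in the Mikolov reference above, MLlib also ships a Word2Vec implementation. The sketch below is a hedged illustration — the toy sentences and parameter choices are placeholders, not from the deck — showing how word vectors and nearest neighbours can be queried once a model is fit.

from pyspark.mllib.feature import Word2Vec

# hypothetical tokenized corpus: an RDD of lists of words
sentences = sc.parallelize([
    "students need books for the classroom".split(),
    "our classroom needs new books and supplies".split(),
])

# minCount lowered only so the toy corpus has a vocabulary at all
model = Word2Vec().setVectorSize(50).setMinCount(1).fit(sentences)

model.transform("books")         # the learned vector for a word
model.findSynonyms("books", 2)   # nearest words in the vector space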
Data pipeline recap ("We are Here" marker on the pipeline diagram): Acquisition, Parse, Storage, Transform/Explore, Vectorization, Train Model, Expose, Presentation.

Predict Context
Source: https://districtdatalabs.silvrback.com/modern-methods-for-sentiment-analysis

doc2vec

Notebook
https://github.com/Jay-Oh-eN/data-scientists-guide-apache-spark/blob/master/word2vec_search.ipynb

Search and Results

Machine Learning Pipeline
Raw Text → (Tokenization) → Words → (Vectorization: word2vec/doc2vec) → Contextualized Documents; Query → Search Results.

Machine Learning Pipeline (continued across several slides)

Questions? Thank You!
Jonathan Dinu, VP of Academic Excellence, Galvanize (@clearspandex)

Appendix: Installing Spark

Installation: Requirements
• Spark binary (version 1.4.1)
• Java JDK 6/7
• Scientific Python (and Jupyter notebook)
• py4j
• (Optional) IRKernel (for Jupyter)

NOTE: Please do not install Spark with
• Homebrew on OSX
• Cygwin on Windows

Installation: Spark
• Find your OS here: http://spark.apache.org/downloads.html
• Select “Pre-built for Hadoop 2.4” or earlier under “Choose a package type”
• Download the tar package for spark-1.4.1-bin-hadoop1.tgz (if you are not sure, pick the latest version)
• Make sure you are downloading the binary version, not the source version.

Installation: Configuration
• Unzip the file and place it at your home directory (/Users/jonathandinu/)
• Set PATH: include the following lines in your ~/.bash_profile (or ~/.bashrc):

export SPARK_HOME=/full/path/to/your/unzipped/spark/folder
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

Installation: Java JDK
• http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
• Find the download for your OS
• Follow the install instructions/wizard
• Make sure you get the JDK instead of the JRE

Installation: Scientific Python
• http://continuum.io/downloads
• Find the download for your OS (make sure it is Python 2.7)
• Follow the install instructions/wizard
• To make sure it installed correctly: ipython notebook

And finally: pip install py4j

Installation: Test It All Out
jonathan$ ipython

Installation: IRKernel (Jupyter kernel for R)
• Make sure R is installed: https://cran.r-project.org/bin/
• Install the kernel via R (get into an R shell):

install.packages(c('rzmq','repr','IRkernel','IRdisplay'),
                 repos = c('http://irkernel.github.io/', getOption('repos')))
IRkernel::installspec()

And in the notebook:

# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/Users/jonathandinu/spark")

# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
Installation: IRKernel (Jupyter kernel for R)
And in the notebook:

library(SparkR)

https://github.com/apache/spark/tree/master/R

Note: If for any reason you cannot get Spark installed on your OS following these instructions, Cloudera and Hortonworks provide Linux VMs with Spark installed.
• http://www.cloudera.com/content/cloudera/en/downloads/quickstart_vms/cdh-5-4-x.html
• http://hortonworks.com/products/hortonworks-sandbox/#install

Review
• Command-line Spark shell: ./bin/pyspark
• Spark module: import pyspark as ps
• Jupyter Notebook interface: ipython notebook
• Also R support in the notebook (or RStudio)!

Spark Deployment: Configuring a cluster

Spark Deployment
Local Mode:
• Single-threaded: SparkContext('local')
• Multi-threaded: SparkContext('local[4]')
• Pseudo-distributed cluster
Cluster Mode:
• Standalone
• Mesos
• YARN
• Amazon EC2

Spark Deployment: Local Mode
• Single-threaded: sequential execution allows easier debugging of program logic
• Multi-threaded: concurrent execution leverages parallelism and allows debugging of coordination
• Pseudo-distributed cluster: distributed execution allows debugging of communication and I/O

Standalone
• Packaged with Spark core
• Great if all you need is a dedicated Spark cluster
• Doesn’t support integration with other applications on a cluster
• The Standalone cluster manager also has a high-availability mode that can leverage Apache ZooKeeper to enable standby master nodes.

Mesos
• General-purpose cluster and global resource manager (Spark, Hadoop, MPI, Cassandra, etc.)
• Two-level scheduler: enables pluggable scheduler algorithms
• Multiple applications can co-locate (like an operating system for a cluster)

YARN
• Created to scale Hadoop, optimized for Hadoop (stateless batch jobs with long runtimes)
• Monolithic scheduler: manages cluster resources as well as schedules jobs
• Not well suited for long-running, real-time, or stateful/interactive services (like database queries)

EC2
• Launch scripts bundled with Spark
• Elastic and ephemeral cluster
• Sets up: Spark, HDFS, Hadoop MR

Spark Deployment: Cluster Mode
• Standalone: encapsulated cluster manager isolates complexity
• Mesos: global resource manager facilitates multi-tenant and heterogeneous workloads
• YARN: integrates with existing Hadoop clusters and applications
• EC2: elastic scalability and ease of setup

Pseudo-distributed local cluster
A Standalone Scheduler master (with a web UI for monitoring/logging) plus two worker nodes, each running an executor with a cache and tasks, all on your laptop.

Pseudo-distributed local cluster

Master:
${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.master.Master \
  -h 127.0.0.1 \
  -p 7077 \
  --webui-port 8080

Workers (x 2):
${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.worker.Worker \
  -c 1 \
  -m 1G \
  spark://127.0.0.1:7077

One Master + two Workers: run each process in a separate terminal window.
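To check that applications can actually connect to this pseudo-distributed cluster, one option (a suggested check, not shown in the deck) is to point a SparkContext at the standalone master started above and run a small job:

# from a regular python/ipython session with SPARK_HOME and PYTHONPATH set
import pyspark as ps

sc = ps.SparkContext("spark://127.0.0.1:7077", "smoke-test")

# a tiny job that exercises both workers
sc.parallelize(range(1000), 4).map(lambda x: x * x).sum()   # => 332833500

sc.stop()   # release the executors so the shell examples can reuse the cluster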
EC2: Setup
1. Create an AWS account: https://aws.amazon.com/account/
2. Get Access keys
3. Include the following lines in your ~/.bash_profile (or ~/.bashrc):

export AWS_ACCESS_KEY_ID=xxxxxxx
export AWS_SECRET_ACCESS_KEY=xxxxxx

4. Download an EC2 keypair

EC2: Launch
Launch the EC2 cluster with the script in $SPARK_HOME/ec2:

./spark-ec2 -k keyname -i ~/.ssh/keyname.pem \
  --copy-aws-credentials --instance-type=m1.large \
  -m m1.large -s 19 launch spark

EC2: Scripts

Login to the Master:
./spark-ec2 -k keyname -i ~/.ssh/keyname.pem login spark

Stop cluster:
./spark-ec2 stop spark

Terminate cluster:
./spark-ec2 destroy spark

Restart cluster (after stopped):
./spark-ec2 -k keyname -i ~/.ssh/keyname.pem start spark

Setup IPython/Jupyter
Login to the Master:
./spark-ec2 -k keyname -i ~/.ssh/keyname.pem login spark

Install the needed packages (on the master):

# Install all the necessary packages on the Master
yum install -y tmux
yum install -y pssh
yum install -y python27 python27-devel
yum install -y freetype-devel libpng-devel
wget https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py -O - | python27
easy_install-2.7 pip
easy_install py4j
pip2.7 install ipython==2.0.0
pip2.7 install pyzmq==14.6.0
pip2.7 install jinja2==2.7.3
pip2.7 install tornado==4.2
pip2.7 install numpy
pip2.7 install matplotlib
pip2.7 install nltk

Install the needed packages (on the workers):

# Install all the necessary packages on the Workers
pssh -h /root/spark-ec2/slaves yum install -y python27 python27-devel
pssh -h /root/spark-ec2/slaves "wget https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py -O - | python27"
pssh -h /root/spark-ec2/slaves easy_install-2.7 pip
pssh -t 10000 -h /root/spark-ec2/slaves pip2.7 install numpy
pssh -h /root/spark-ec2/slaves pip2.7 install nltk

Allow inbound requests to enable the IPython/Jupyter notebook (WARNING: this will create a security risk, however).
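Before wiring up the notebook, it can be worth confirming from the master that the Python setup on the workers actually functions end to end. This is a suggested check rather than a step from the deck; it assumes a PySpark shell on the master pointed at the cluster URL that spark-ec2 writes out.

# in a PySpark shell on the master, started against the cluster, e.g.:
#   /root/spark/bin/pyspark --master $(cat /root/spark-ec2/cluster-url)

nums = sc.parallelize(xrange(10000), 40)   # spread work across the workers
nums.map(lambda x: x % 7).countByValue()   # runs Python tasks on the executors

# spot-check that a worker-side package (installed via pssh above) imports cleanly
sc.parallelize(range(4), 4).map(lambda _: __import__("numpy").__version__).collect()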
IPython/Jupyter Profile
Login to the Master:
./spark-ec2 -k keyname -i ~/.ssh/keyname.pem login spark

Set the notebook password:

ipython profile create default
python -c "from IPython.lib import passwd; print passwd()" \
  > /root/.ipython/profile_default/nbpasswd.txt
cat /root/.ipython/profile_default/nbpasswd.txt
# sha1:128de302ca73:6b9a8bd5bhjde33d48cd65ad9cafb0770c13c9df

Configure IPython/Jupyter Settings
/root/.ipython/profile_default/ipython_notebook_config.py:

# Configuration file for ipython-notebook.
c = get_config()

# Notebook config
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
# It is a good idea to put it on a known, fixed port
c.NotebookApp.port = 8888

PWDFILE = "/root/.ipython/profile_default/nbpasswd.txt"
c.NotebookApp.password = open(PWDFILE).read().strip()

Configure IPython/Jupyter Settings
/root/.ipython/profile_default/startup/pyspark.py:

# Configure the necessary Spark environment
import os
os.environ['SPARK_HOME'] = '/root/spark/'

# And Python path
import sys
sys.path.insert(0, '/root/spark/python')

# Detect the PySpark URL
CLUSTER_URL = open('/root/spark-ec2/cluster-url').read().strip()

Configure IPython/Jupyter Settings
Add the following to /root/spark/conf/spark-env.sh:

export PYSPARK_PYTHON=python2.7

Sync across workers:

~/spark-ec2/copy-dir /root/spark/conf

Make sure the master’s env is correct:

source /root/spark/conf/spark-env.sh

IPython/Jupyter initialization (on the master)
Start a remote window manager (screen or tmux): screen
Start the notebook server: ipython notebook
Detach from the session: Ctrl-a d

IPython/Jupyter login
On your laptop: http://[YOUR MASTER IP/DNS HERE]:8888
Test that it all works.

EC2: Data (see the short read sketch after the review below)
• /root/ephemeral-hdfs — HDFS (ephemeral)
• /root/persistent-hdfs — HDFS (persistent)
• s3n://bucket_name — Amazon S3

Review
• Local mode and cluster mode each have their benefits
• Spark can be run on a variety of cluster managers
• Amazon EC2 enables elastic scaling and ease of development
• By leveraging IPython/Jupyter you can get the performance of a cluster with the ease of interactive development
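As a closing note on the EC2: Data slide above, the same sc.textFile() call reads from any of those storage locations. The bucket name and file paths below are placeholders, and the exact persistent-HDFS URI depends on the cluster’s configuration; this is a hedged sketch, not part of the original deck.

# ephemeral HDFS on the cluster (lost when the cluster is destroyed)
projects = sc.textFile("hdfs:///data/projects.csv")

# persistent HDFS runs as a separate namenode on the master;
# the exact URI/port depends on the cluster's configuration
# persistent = sc.textFile("hdfs://<master>:<port>/data/projects.csv")

# Amazon S3 via the s3n:// scheme (uses the AWS credentials
# copied to the cluster by --copy-aws-credentials at launch)
essays = sc.textFile("s3n://bucket_name/donorschoose/essays.csv")

essays.count()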