Processing Big Data with Spark DataFrames


Introduction

    DataFrames give Spark the ability to process large-scale structured data. They are easier to use than the original RDD transformations, and computation can be up to roughly twice as fast. This small API carries Spark's ambition to unify the big data landscape: a DataFrame acts like a channel that connects all mainstream data sources and automatically converts them into a format that can be processed in parallel. Through it, Spark can serve every player in the big data ecosystem, whether data scientists who favor R, business analysts who live in SQL, or engineers who care about efficiency and real-time performance.

Examples

    Provides an example of converting structured data into a DataFrame, registering it as a table, and querying it with SQL

    Provides an example of reading data from a relational database (RDBMS) into a DataFrame

    Provides an example of writing data back to a relational database

Code Sample

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.util.Properties

object SimpleDemo extends App {
  val sc = new SparkContext("local[*]", "test")
  val sqlc = new SQLContext(sc)
  val driverUrl = "jdbc:mysql://ip:3306/ding?user=root&password=root&zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8"
  val tableName = "tbaclusterresult"

  // Load structured data as a DataFrame, register it as a temporary table and query it with SQL
  val df = sqlc.read.json("G:/data/json.txt")
  df.registerTempTable("user")
  val res = sqlc.sql("select * from user")
  println(res.count() + "---------------------------")
  res.collect().foreach { row =>
    println(row.toString())
  }

  // Read data from MySQL over JDBC
  val jdbcDF = sqlc.read
    .options(Map(
      "url" -> driverUrl,
      //      "user" -> "root",
      //      "password" -> "root",
      "dbtable" -> tableName))
    .format("jdbc")
    .load()
  println(jdbcDF.count() + "---------------------------")
  jdbcDF.collect().foreach { row =>
    println(row.toString())
  }

  // Write a DataFrame to MySQL
  val schema = StructType(
    StructField("name", StringType) ::
      StructField("age", IntegerType) :: Nil)

  val data1 = sc.parallelize(List(("blog1", 301), ("iteblog", 29),
    ("com", 40), ("bt", 33), ("www", 23)))
    .map(item => Row.apply(item._1, item._2))
  val df1 = sqlc.createDataFrame(data1, schema)
  //  df1.write.jdbc(driverUrl, "sparktomysql", new Properties)
  df1.write.mode(SaveMode.Overwrite).jdbc(driverUrl, "testtable", new Properties)

  // The DataFrame class also offers an insertIntoJDBC method. The target table must
  // already exist, and the method only inserts data. Its signature is:
  // def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit

  // Insert data into MySQL with plain JDBC, opening one connection per partition
  val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
  data.foreachPartition(myFun)

  case class Blog(name: String, count: Int)

  def myFun(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into blog(name, count) values (?, ?)"
    try {
      conn = DriverManager.getConnection(driverUrl, "root", "root")
      // Prepare the statement once and reuse it for every record in the partition
      ps = conn.prepareStatement(sql)
      iterator.foreach { data =>
        ps.setString(1, data._1)
        ps.setInt(2, data._2)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
}
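
For reference, sqlc.read.json expects its input in JSON Lines form: one complete JSON object per line rather than a single pretty-printed document. The sketch below illustrates that layout, reusing the sc and sqlc defined in the sample above; the field names (id, name) are assumptions made for illustration and are not the actual contents of G:/data/json.txt.

  // Minimal sketch of the JSON Lines layout expected by sqlc.read.json.
  // The fields below are illustrative assumptions, not the real json.txt schema.
  val sampleJson = Seq(
    """{"id": 1, "name": "alice"}""",
    """{"id": 2, "name": "bob"}""")
  // DataFrameReader.json also accepts an RDD[String] of JSON objects
  val sampleDF = sqlc.read.json(sc.parallelize(sampleJson))
  sampleDF.printSchema() // the schema (id: long, name: string) is inferred from the data
  sampleDF.registerTempTable("sample")
  sqlc.sql("select name from sample where id = 1").show()

Note also that the JDBC examples embed the credentials in the connection URL; they can instead be supplied through the java.util.Properties object passed to write.jdbc (keys "user" and "password"), which keeps passwords out of the URL.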

Source: http://my.oschina.net/cloudcoder/blog/599859