基于Spark Graphx的大规模用户图计算和应用


基于Graphx的大规模用户图计算 淘宝技术部——数据挖掘与计算 明风 目录 Graphx简介和特性 图计算场景 整体模型,流程和算法 调优与改进 性能和技巧 总结 Graphx的发展 0.5 • 2012-10-24 • Bagel 0.8 • 2013-09-25 • Graphx-Branch 0.9 • 2014-03-03 • Graphx-Alpha 特性 • 离线 • 批量 • 同步 竞争对手 • GraphLab • Giraph 合作伴侣 • Neo4j • Titan Graphx架构 Pregel GraphLab Graph ConnectedCo mponents SVDPlusPlus PageRank TriangleCount VertexRDD EdgeRDD RDD[EdgeTriplet] RoutingTable ReplicatedVertexView GraphImpl StronglyConnectedCo mponents GraphOps MessageToPartition PartitionStrategy EdgePartition VertexPartition Edge EdgeTriplet 算法 模型 实现 关键点 3个核心的RDD Vertices Edges Triplets 3个特性 不变性——Immutable 分布性——Distributed 容错性——Fault-Tolerant 边分割和点分割 • VertexPartition & EdgePartition • Routing Table Part. 2 Part. 1 Vertex Table (RDD) B C A D F E A D Property Graph Edge Table (RDD) A B A C C D B C A E A F E F E D B C D E A F Routing Table (RDD) B C D E A F 1 2 1 2 1 2 1 2 2D Vertex Cut Heuristic 点分割实现 Graphx主要的接口 基本信息接口 • numEdges • numVertices • degrees(in/out) 转换操作 • mapVertice s • mapEdges • mapTriplet s 结构操作 • reverse • subgraph • mask • groupEdges 联边聚合接口 • mapReduceTriplets • collectNeighbors 缓存操作 • cache • unpersistVertices 重要接口 mapReduceTriplets attr11 dstAttr1 val sumWeight: VertexRDD[Double] = g.mapReduceTriplets( ) map (srcId1, weight11) (srcId1, weight12) (srcId1, weight13) (srcId2, weight21) (srcId2, weight22) (srcId2, weight23) reduce VertexRDD[Double] (srcId1,sumW1) …… (srcId2,sumW2) …… attr13 dstAttr3 srcAttr1 attr12 dstAttr2 MAP triplet => { Array((triplet.srcAttr.id, triplet.attr.weight)).iterator }, REDUCE (a, b) => a + b 用户图计算的场景 基于度分布的中枢节点发现 基于最大连通图的社区发现 基于三角形计数的关系衡量 基于随机游走的用户属性传播 …… 用户能量模型 Supervised Random Walks: Predicting and Recommending Links in Social Network Edge{f1,f2……weight} Vertex{ tScore,dScore, weights[], pScores[] } 模型设计 1. 点能量传播 2. 训练集求AUC 3. AUC求偏导 4. 更新边的权重 正能量,负能量都会向周围的点传播,而能量从点i到点j传播 的比例是:边(j-i)在点i的所有边的权重和的占比 Aij = (1+e-w(xij-q ))-1 j (1+e-w×(xij-q ))-1å 使用新的权重调节因子,对图的所有边的权重,再做一次更新 S(x;b) = 1 1+e-bX 标签集能量传播之后,在训练集上计算AUC AUC(w,q)= iÎPT jÎNTåå S(P(i)-P(j)) | PT || NT | 对AUC求偏导,得到每个维度的权重和偏移量 ¶AUC(w,q) ¶w = iÎPT jÎNT ¶S(dij ) dij ¶P(i) ¶w - ¶P(j) ¶w æ èç ö ø÷åå PTNT 模型训练 init Graph tr cw br fr cs pw ps sw ss Weight Adjustor partialDerivativeAUC PT NT PTT NTT derivative_AUC_cw derivative_AUC_cs derivative_AUC_pw derivative_AUC_ps derivative_AUC_sw derivative_AUC_ss AUC 训练次数:3+ 每次训练Pregel次数:2+6 每次平均迭代:15 360 基于Pregel的随机游走 g message newMessage newVerts new G g i0 mapReduceTriplets message innerJoin outerJoinVertices mapReduceTriplets diffVerts diffPer < threshold 内存释放:unpersistVertices 3个函数 • 点消息处理 vprog{ v.score = msg + (1 - ALPHA) * v.weight } • 发送消息 sendMsg{ (destId, ALPHA * edgeTriplet.srcAttr.score * edgeTriplet.attr.weight) } • 消息合并 mergeMsg{ v1+v2 } Graphx的性能参考 0 50 100 2000w 2亿 20亿 秒 单次Pregel迭代性能 Edges 运行环境 200 Worker 5 Core/Per Worker 30G/Per Worker Graphx使用技巧 循序渐进 使用Sample,逐步增大数据量 从边生成点,再去关联点属性 val edgeRdd = loadEdges(edgefile).sample(false, 0.01, 1) val verticesRdd= extractVerticesFromEdge(edgeRdd) Graph(verticesRdd, edgeRdd) Graphx使用技巧 释放内存 保留旧图的引用,并尽快释放 prevG = g g = g.outerJoinVertices(newVerts) {……}.cache() (……do some action to g……) prevG.unpersistVertices(blocking=false) Graphx使用技巧 异常定位 • 加入带时间点的count,作为调试锚点 g = g.outerJoinVertices(newVerts) {……}.cache() if (debug_mode){ logWithTime(g.vertices.count()) } Graphx的配置建议 spark.default.parallelism num-workers * worker-cores master-memory spark.storage.memoryFraction=0.5 加入我们 • 我们需要 • 复杂网络算法师 • Spark攻城师 联系方式 • 微博: @明风Andy 谢谢 Q&A
还剩21页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 6 金币 [ 分享pdf获得金币 ] 0 人已下载

下载pdf