Spark GraphX Pregel API

Updated 2018-11-26 16:36

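A graph is an inherently recursive data structure. The properties of a vertex depend on the properties of its neighbors, and those in turn depend on the properties of their own neighbors. As a consequence, many important graph algorithms iteratively recompute the properties of each vertex until a fixed-point condition is reached. A range of graph-parallel abstractions has been proposed to express these iterative algorithms. GraphX exposes a Pregel-like operator, which is a fusion of the widely used Pregel and GraphLab abstractions.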
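The following is a minimal, Spark-free sketch of "recompute every vertex until a fixed point is reached". It uses plain Scala collections instead of GraphX. In each round, every vertex takes the smallest label among itself and its neighbors, and the loop stops once no label changes. The result labels each vertex with the smallest vertex ID in its connected component. The names `FixedPoint` and `minLabels` are hypothetical. Unlike the Pregel operator, this naive version recomputes every vertex in every round, whether or not anything around it changed.

```scala
// Hypothetical illustration (not GraphX): synchronous fixed-point iteration.
object FixedPoint {
  type VertexId = Long

  def minLabels(vertices: Set[VertexId], edges: Seq[(VertexId, VertexId)]): Map[VertexId, VertexId] = {
    // Undirected adjacency: each edge is visible from both endpoints.
    val neighbors: Map[VertexId, Seq[VertexId]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

    // Start with every vertex labelled by its own ID.
    var labels: Map[VertexId, VertexId] = vertices.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      // Every vertex is recomputed from the previous round's labels.
      val next = labels.map { case (v, label) =>
        v -> neighbors.getOrElse(v, Seq.empty).map(labels).foldLeft(label)((a, b) => math.min(a, b))
      }
      changed = next != labels
      labels = next
    }
    labels
  }
}
```

For example, with edges 1-2, 2-3 and 4-5, vertices 1, 2 and 3 all end up with label 1, and vertices 4 and 5 end up with label 4.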

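At a high level, the Pregel operator in GraphX is a bulk-synchronous parallel messaging abstraction constrained to the topology of the graph. The Pregel operator executes in a series of super steps. In each super step, vertices receive the sum (that is, the merged result) of their inbound messages from the previous super step, compute a new value for the vertex property, and then send messages to neighboring vertices in the next super step. Unlike Pregel, and more like GraphLab, messages are computed in parallel as a function of the edge triplet, so the message computation has access to both the source and the destination vertex attributes. Vertices that do not receive a message are skipped within a super step. The Pregel operator terminates iteration and returns the final graph when there are no messages remaining.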
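The three phases of one super step can be sketched in plain Scala: receive, compute a new value, and send. This is a simplified in-memory model, not the GraphX implementation. `Superstep`, `step` and the `Triplet` case class are hypothetical stand-ins.

Two further simplifications are assumptions, modelled on `EdgeDirection.Out`. First, `inbox` already contains one merged message per vertex. Second, only the out-edges of vertices that received a message produce new messages.

```scala
// Hypothetical, Spark-free model of a single bulk-synchronous super step.
object Superstep {
  type VertexId = Long

  // Simplified stand-in for GraphX's EdgeTriplet: both endpoint values plus the edge value.
  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD, dstId: VertexId, dstAttr: VD, attr: ED)

  def step[VD, ED, A](
      values: Map[VertexId, VD],
      edges: Seq[(VertexId, VertexId, ED)],
      inbox: Map[VertexId, A],
      vprog: (VertexId, VD, A) => VD,
      sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): (Map[VertexId, VD], Map[VertexId, A]) = {
    // 1. Receive: only vertices with a message run vprog; the others keep their value.
    val updated = values.map { case (id, v) =>
      id -> inbox.get(id).fold(v)(msg => vprog(id, v, msg))
    }
    // 2. Send: messages are a function of the whole triplet (source and destination values).
    //    Only out-edges of vertices that were active in this step are considered (assumption).
    val sent = edges
      .filter { case (src, _, _) => inbox.contains(src) }
      .flatMap { case (src, dst, e) => sendMsg(Triplet(src, updated(src), dst, updated(dst), e)) }
    // 3. Merge: messages addressed to the same vertex are combined with mergeMsg.
    val merged = sent.groupBy(_._1).map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
    (updated, merged)
  }
}
```

Suppose only vertex 1 has a message. Then vertices 2 and 3 are skipped in the receive phase, and only edges leaving vertex 1 can produce messages for the next step.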

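Note that, unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices, and message construction is done in parallel by a user-defined messaging function. These constraints allow additional optimization within GraphX.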
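The "neighbors only" rule means that a message produced for an edge may only be addressed to that edge's source or destination. The hypothetical wrapper below makes the rule explicit in plain Scala. It is an illustration of the constraint, not how GraphX itself enforces it. `NeighborOnly`, `neighborOnly` and `Triplet` are illustrative names.

```scala
// Hypothetical illustration of the GraphX messaging constraint (not the GraphX API).
object NeighborOnly {
  type VertexId = Long

  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD, dstId: VertexId, dstAttr: VD, attr: ED)

  // Wraps a user sendMsg so that any message addressed to a vertex other than
  // the triplet's two endpoints fails (lazily, when the iterator is consumed).
  def neighborOnly[VD, ED, A](sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)])
      : Triplet[VD, ED] => Iterator[(VertexId, A)] =
    t => sendMsg(t).map { case msg @ (target, _) =>
      require(target == t.srcId || target == t.dstId,
        s"vertex $target is not an endpoint of edge ${t.srcId} -> ${t.dstId}")
      msg
    }
}
```

A message from vertex 1 to vertex 2 along edge 1 -> 2 passes through unchanged. A message addressed to an unrelated vertex (say 99) is rejected.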

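The following is the type signature of the Pregel operator, together with a sketch of its implementation. The real GraphOps.pregel additionally requires an implicit ClassTag[A] for the message type and returns a Graph[VD, ED]. Note that some calls to graph.cache have been removed from the sketch: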

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    // Compute the messages (mapReduceTriplets is the legacy API; newer Spark
    // versions provide aggregateMessages instead)
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages: -----------------------------------------------------------------------
      // Run the vertex program on all vertices that receive messages
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      // Merge the new vertex values back into the graph
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      // Send Messages: ------------------------------------------------------------------------------
      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
      // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
      // on edges in the activeDir of vertices in newVerts
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}
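The sketch above depends on Spark internals and cannot run on its own. The following is a hedged, in-memory analogue in plain Scala that follows the same control flow:

1. Apply the initial message to every vertex.
2. Compute messages on all edges.
3. Repeatedly run `vprog` only on vertices that received messages (the `innerJoin` step).
4. Merge the new values back (the `outerJoinVertices` step).
5. Let only those updated vertices send along their out-edges.

`LocalPregel`, `LocalGraph` and `Triplet` are hypothetical names, and only the out-edge active direction is modelled.

```scala
// Hypothetical, Spark-free analogue of the pregel sketch above.
object LocalPregel {
  type VertexId = Long

  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD, dstId: VertexId, dstAttr: VD, attr: ED)
  final case class LocalGraph[VD, ED](vertices: Map[VertexId, VD], edges: Seq[(VertexId, VertexId, ED)])

  def pregel[VD, ED, A](graph: LocalGraph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): LocalGraph[VD, ED] = {

    // Stand-in for mapReduceTriplets: run sendMsg on edges whose source is active,
    // then combine messages per target vertex with mergeMsg.
    def messagesFrom(values: Map[VertexId, VD], active: VertexId => Boolean): Map[VertexId, A] =
      graph.edges.iterator
        .filter { case (src, _, _) => active(src) }
        .flatMap { case (src, dst, e) => sendMsg(Triplet(src, values(src), dst, values(dst), e)) }
        .toSeq
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

    // Receive the initial message at each vertex.
    var vs = graph.vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
    // In the first round every vertex may send.
    var messages = messagesFrom(vs, _ => true)
    var i = 0
    while (messages.nonEmpty && i < maxIterations) {
      // Run the vertex program only on vertices that received a message.
      val newVerts = messages.map { case (id, msg) => id -> vprog(id, vs(id), msg) }
      // Merge the new vertex values back into the graph.
      vs = vs ++ newVerts
      // Only vertices updated in this round send along their out-edges.
      messages = messagesFrom(vs, newVerts.contains)
      i += 1
    }
    LocalGraph(vs, graph.edges)
  }
}
```

Consider a shortest-path run on a chain 1 -> 2 -> 3 -> 4 with unit weights. It converges to distances 0, 1, 2, 3. With `maxIterations = 1`, only vertex 2 is updated before the loop stops.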

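Note that pregel takes two argument lists (i.e., graph.pregel(list1)(list2)). The first argument list contains the configuration parameters: the initial message, the maximum number of iterations, and the edge direction in which to send messages (out-edges by default in the sketch above). The second argument list contains the user-defined functions for receiving messages (the vertex program vprog), computing messages (sendMsg), and combining messages (mergeMsg).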
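One practical effect of this curried shape is easy to show in plain Scala. Scala (Scala 2 in particular) infers type parameters one argument list at a time. Once the first list fixes the message type `A` from the initial value, the lambdas in the second list can omit their parameter types. Defaulted configuration parameters can also be skipped or set by name.

The hypothetical `CurriedDemo.iterate` below only mimics that shape. It is not part of GraphX, and the source does not state why GraphX chose this design.

```scala
// Hypothetical method with the same "configuration first, functions second" shape as pregel.
object CurriedDemo {
  def iterate[A](initial: A, maxIterations: Int = Int.MaxValue)(
      step: A => A,
      done: A => Boolean): A = {
    var current = initial
    var i = 0
    // Stop when the user condition holds or the iteration budget is used up.
    while (!done(current) && i < maxIterations) {
      current = step(current)
      i += 1
    }
    current
  }
}
```

`CurriedDemo.iterate(1)(x => x * 2, _ >= 100)` infers `A = Int` from the first list and doubles until the value reaches at least 100. Setting `maxIterations = 3` by name stops it after three steps.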

We can use the Pregel operator to express computations such as single source shortest path, as in the following example:

import org.apache.spark.graphx._
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
// (sc is the SparkContext, e.g. the one provided by spark-shell)
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a,b) => math.min(a,b) // Merge Message
  )
println(sssp.vertices.collect.mkString("\n"))
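The sendMsg function above is essentially the classic Bellman-Ford edge relaxation: improve the destination when dist(src) + w < dist(dst). A small sequential version can serve as a cross-check for the Pregel result on small graphs. The name `BellmanFord.shortestPaths` is hypothetical, and the sketch assumes there are no negative cycles.

```scala
// Hypothetical sequential cross-check using the same relaxation rule as sendMsg above.
object BellmanFord {
  type VertexId = Long

  def shortestPaths(vertices: Set[VertexId], edges: Seq[(VertexId, VertexId, Double)],
                    source: VertexId): Map[VertexId, Double] = {
    var dist = vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
    // At most |V| - 1 rounds are needed; stop early once a round changes nothing.
    var round = 0
    var changed = true
    while (changed && round < vertices.size - 1) {
      changed = false
      for ((src, dst, w) <- edges) {
        val candidate = dist(src) + w
        if (candidate < dist(dst)) {
          dist = dist.updated(dst, candidate)
          changed = true
        }
      }
      round += 1
    }
    dist
  }
}
```

Unreachable vertices keep a distance of infinity, matching how the Pregel version leaves vertices that never receive a message at `Double.PositiveInfinity`.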