Julia 并行計算

2018-08-12 21:26 更新

并行計算

Julia 提供了一個基于消息傳遞的多處理器環(huán)境,能夠同時在多處理器上使用獨(dú)立的內(nèi)存空間運(yùn)行程序。

Julia 的消息傳遞與 MPI [1] 等環(huán)境不同。Julia 中的通信是“單邊”的,即程序員只需要管理雙處理器運(yùn)算中的一個處理器即可。

Julia 中的并行編程基于兩個原語:remote referencesremote calls 。remote reference 對象,用于從任意的處理器,查閱指定處理器上存儲的對象。 remote call 請求,用于一個處理器對另一個(也有可能是同一個)處理器調(diào)用某個函數(shù)處理某些參數(shù)。 remote call 返回 remote reference 對象。 remote call 是立即返回的;調(diào)用它的處理器繼續(xù)執(zhí)行下一步操作,而 remote call 繼續(xù)在某處執(zhí)行。可以對 remote reference 調(diào)用 wait ,以等待 remote call 執(zhí)行完畢,然后通過 fetch 獲取結(jié)果的完整值。使用 put 可將值存儲到 remote reference 。

通過 julia -p n 啟動,可以在本地機(jī)器上提供 n 個處理器。一般 n 等于機(jī)器上 CPU 內(nèi)核個數(shù):

$ ./julia -p 2

julia> r = remotecall(2, rand, 2, 2)
RemoteRef(2,1,5)

julia> fetch(r)
2x2 Float64 Array:
 0.60401   0.501111
 0.174572  0.157411

julia> s = @spawnat 2 1 .+ fetch(r)
RemoteRef(2,1,7)

julia> fetch(s)
2x2 Float64 Array:
 1.60401  1.50111
 1.17457  1.15741

remote_call 的第一個參數(shù)是要進(jìn)行這個運(yùn)算的處理器索引值。Julia 中大部分并行編程不查詢特定的處理器或可用處理器的個數(shù),但可認(rèn)為 remote_call 是個為精細(xì)控制所提供的低級接口。第二個參數(shù)是要調(diào)用的函數(shù),剩下的參數(shù)是該函數(shù)的參數(shù)。此例中,我們先讓處理器 2 構(gòu)造一個 2x2 的隨機(jī)矩陣,然后我們在結(jié)果上加 1 。兩個計算的結(jié)果保存在兩個 remote reference 中,即 rs 。 @spawnat 宏在由第一個參數(shù)指明的處理器上,計算第二個參數(shù)中的表達(dá)式。

remote_call_fetch 函數(shù)可以立即獲取要在遠(yuǎn)端計算的值。它等價于 fetch(remote_call(...)) ,但比之更高效:

julia> remotecall_fetch(2, getindex, r, 1, 1)
0.10824216411304866

getindex(r,1,1) :ref:等價于 <man-array-indexing> r[1,1] ,因此,這個調(diào)用獲取 remote reference 對象 r 的第一個元素。

remote_call 語法不太方便。 @spawn 宏簡化了這件事兒,它對表達(dá)式而非函數(shù)進(jìn)行操作,并自動選取在哪兒進(jìn)行計算:

julia> r = @spawn rand(2,2)
RemoteRef(1,1,0)

julia> s = @spawn 1 .+ fetch(r)
RemoteRef(1,1,1)

julia> fetch(s)
1.10824216411304866 1.13798233877923116
1.12376292706355074 1.18750497916607167

注意,此處用 1 .+ fetch(r) 而不是 1 .+ r 。這是因為我們不知道代碼在何處運(yùn)行,而 fetch 會將需要的 r 移到做加法的處理器上。此例中, @spawn 很聰明,它知道在有 r 對象的處理器上進(jìn)行計算,因而 fetch 將不做任何操作。

@spawn 不是內(nèi)置函數(shù),而是 Julia 定義的 :ref:宏 <man-macros>

所有執(zhí)行程序代碼的處理器上,都必須能獲得程序代碼。例如,輸入:

julia> function rand2(dims...)
         return 2*rand(dims...)
       end

julia> rand2(2,2)
2x2 Float64 Array:
 0.153756  0.368514
 1.15119   0.918912

julia> @spawn rand2(2,2)
RemoteRef(1,1,1)

julia> @spawn rand2(2,2)
RemoteRef(2,1,2)

julia> exception on 2: in anonymous: rand2 not defined 

進(jìn)程 1 知道 rand2 函數(shù),但進(jìn)程 2 不知道。 require 函數(shù)自動在當(dāng)前所有可用的處理器上載入源文件,使所有的處理器都能運(yùn)行代碼:

julia> require("myfile")

在集群中,文件(及遞歸載入的任何文件)的內(nèi)容會被發(fā)送到整個網(wǎng)絡(luò)??梢允褂?@everywhere 宏在所有處理器上執(zhí)行命令:

julia> @everywhere id = myid()

julia> remotecall_fetch(2, ()->id)
2

@everywhere include("defs.jl")

文件也可以在多個進(jìn)程啟動時預(yù)加載,并且一個驅(qū)動腳本可以用于驅(qū)動計算:

    julia -p <n> -L file1.jl -L file2.jl driver.jl

每個進(jìn)程都有一個關(guān)聯(lián)的標(biāo)識符。這個過程提供的 Julia 提示總是有一個 id 值為 1 ,就如上面例子中 julia 進(jìn)程會運(yùn)行驅(qū)動腳本一樣。這個被默認(rèn)用作平行操作的進(jìn)程被稱為 workers。當(dāng)只有一個進(jìn)程的時候,進(jìn)程 1 就被當(dāng)做一個 worker。否則,worker 就是指除了進(jìn)程 1 之外的所有進(jìn)程。

Julia 內(nèi)置有對于兩種集群的支持:

  • 如上文所示,一個本地集群指定使用 —p 選項。
  • 一個集群生成機(jī)器使用 --machinefile 選項。它使用一個無密碼的 ssh 登來在指定的機(jī)器上啟動 julia 工作進(jìn)程(以相同的路徑作為當(dāng)前主機(jī))。

函數(shù) addprocs,rmprocs,workers,當(dāng)然還有其他的在一個集群中可用的以可編程的方式進(jìn)行添加,刪除和查詢的函數(shù)。

其他類型的集群可以通過編寫自己的自定義 ClusterManager。請參閱 ClusterManagers 部分。

數(shù)據(jù)移動

并行計算中,消息傳遞和數(shù)據(jù)移動是最大的開銷。減少這兩者的數(shù)量,對性能至關(guān)重要。

fetch 是顯式的數(shù)據(jù)移動操作,它直接要求將對象移動到當(dāng)前機(jī)器。 @spawn (及相關(guān)宏)也進(jìn)行數(shù)據(jù)移動,但不是顯式的,因而被稱為隱式數(shù)據(jù)移動操作。對比如下兩種構(gòu)造隨機(jī)矩陣并計算其平方的方法: :

    # method 1
    A = rand(1000,1000)
    Bref = @spawn A^2
    ...
    fetch(Bref)

    # method 2
    Bref = @spawn rand(1000,1000)^2
    ...
    fetch(Bref)

方法 1 中,本地構(gòu)造了一個隨機(jī)矩陣,然后將其傳遞給做平方計算的處理器。方法 2 中,在同一處理器構(gòu)造隨機(jī)矩陣并進(jìn)行平方計算。因此,方法 2 比方法 1 移動的數(shù)據(jù)少得多。

并行映射和循環(huán)

大部分并行計算不需要移動數(shù)據(jù)。最常見的是蒙特卡羅仿真。下例使用 @spawn 在兩個處理器上仿真投硬幣。先在 count_heads.jl 中寫如下函數(shù):

    function count_heads(n)
        c::Int = 0
        for i=1:n
            c += randbool()
        end
        c
    end

在兩臺機(jī)器上做仿真,最后將結(jié)果加起來:

    require("count_heads")

    a = @spawn count_heads(100000000)
    b = @spawn count_heads(100000000)
    fetch(a)+fetch(b)

在多處理器上獨(dú)立地進(jìn)行迭代運(yùn)算,然后用一些函數(shù)把它們的結(jié)果綜合起來。綜合的過程稱為 約簡 。

上例中,我們顯式調(diào)用了兩個 @spawn 語句,它將并行計算限制在兩個處理器上。要在任意個數(shù)的處理器上運(yùn)行,應(yīng)使用 并行 for 循環(huán) ,它在 Julia 中應(yīng)寫為:

    nheads = @parallel (+) for i=1:200000000
      int(randbool())
    end

這個構(gòu)造實(shí)現(xiàn)了給多處理器分配迭代的模式,并且使用特定約簡來綜合結(jié)果(此例中為 (+) )。

注意,盡管并行 for 循環(huán)看起來和一組 for 循環(huán)差不多,但它們的行為有很大區(qū)別。第一,循環(huán)不是按順序進(jìn)行的。第二,寫進(jìn)變量或數(shù)組的值不是全局可見的,因為迭代運(yùn)行在不同的處理器上。并行循環(huán)內(nèi)使用的所有變量都會被復(fù)制、廣播到每個處理器。

下列代碼并不會按照預(yù)想運(yùn)行:

    a = zeros(100000)
    @parallel for i=1:100000
      a[i] = i
    end

如果不需要,可以省略約簡運(yùn)算符。但此代碼不會初始化 a 的所有元素,因為每個處理器上都只有獨(dú)立的一份兒。應(yīng)避免類似的并行 for 循環(huán)。但是我們可以使用分布式數(shù)組來規(guī)避這種情形,后面我們會講。

如果“外部”變量是只讀的,就可以在并行循環(huán)中使用它:

    a = randn(1000)
    @parallel (+) for i=1:100000
      f(a[randi(end)])
    end

有時我們不需要約簡,僅希望將函數(shù)應(yīng)用到某個范圍的整數(shù)(或某個集合的元素)上。這時可以使用 并行映射 pmap 函數(shù)。下例中并行計算幾個大隨機(jī)矩陣的奇異值:

    M = {rand(1000,1000) for i=1:10}
    pmap(svd, M)

被調(diào)用的函數(shù)需處理大量工作時使用 pmap ,反之,則使用 @parallel for 。

與遠(yuǎn)程引用同步

調(diào)度

Julia 的平行編程平臺使用任務(wù)(也成為協(xié)程) ,其可在多個計算中切換。每當(dāng)代碼執(zhí)行一個通信操作,例如 fetch 或者 wait,當(dāng)前任務(wù)便暫停同時調(diào)度器會選擇另一個任務(wù)運(yùn)行。在事件等待完成后,任務(wù)會重新啟動。

對于很多問題,沒必要直接考慮任務(wù)。然而,由于提供了動態(tài)調(diào)度,可以同時等待多個事件。在動態(tài)調(diào)度中,一個程序決定計算什么和在哪計算,這是基于其他工作何時完成的。這是被不可預(yù)知的或不可平衡的工作荷載所需要的,只有當(dāng)他們結(jié)束當(dāng)前任務(wù)我們才能分配更多的工作進(jìn)程。

作為一個例子,考慮計算不同大小的矩陣的奇異值:

    M = {rand(800,800), rand(600,600), rand(800,800), rand(600,600)}
    pmap(svd, M)

如果一個進(jìn)程要處理 800 x 800 矩陣和另一個 600 x 600 矩陣,我們不會得到很多的可伸縮性。解決方案是讓本地的任務(wù)在他們完成當(dāng)前的任務(wù)時去“喂”每個進(jìn)程中的工作。pmap 的實(shí)現(xiàn)過程中可以看到這個:

    function pmap(f, lst)
        np = nprocs()  # determine the number of processes available
        n = length(lst)
        results = cell(n)
        i = 1
        # function to produce the next work item from the queue.
        # in this case it's just an index.
        nextidx() = (idx=i; i+=1; idx)
        @sync begin
            for p=1:np
                if p != myid() || np == 1 
                    @async begin
                        while true
                            idx = nextidx()
                            if idx > n
                                break
                            end
                            results[idx] = remotecall_fetch(p, f, lst[idx])
                        end
                    end
                end
            end
        end
        results
    end

只有在本地運(yùn)行任務(wù)的過程中,@async 才與 @spawn 類似。我們使用它來為每個流程創(chuàng)建一個“供給”的任務(wù)。每個任務(wù)選擇下一個需要被計算的指數(shù),然后等待它的進(jìn)程完成,接著一直重復(fù)到用完指數(shù)。注意,“供給”任務(wù)只有當(dāng)主要任務(wù)到達(dá) @sync 塊結(jié)束時才開始執(zhí)行,此時它放棄控制并等待所有的本地任務(wù)在從函數(shù)返回之前完成。供給任務(wù)可以通過 nextidx() 共享狀態(tài),因為它們都在相同的進(jìn)程上運(yùn)行。這個過程不需要鎖定,因為線程是實(shí)時進(jìn)行調(diào)度的而不是一成不變。這意味著內(nèi)容的切換只發(fā)生在定義好的時候:在這種情況下,當(dāng) remotecall_fetch 會被調(diào)用。

分布式數(shù)組

并行計算綜合使用多個機(jī)器上的內(nèi)存資源,因而可以使用在一個機(jī)器上不能實(shí)現(xiàn)的大數(shù)組。這時,可使用分布式數(shù)組,每個處理器僅對它所擁有的那部分?jǐn)?shù)組進(jìn)行操作。

分布式數(shù)組(或 全局對象 )邏輯上是個單數(shù)組,但它分為很多塊兒,每個處理器上保存一塊兒。但對整個數(shù)組的運(yùn)算與在本地數(shù)組的運(yùn)算是一樣的,并行計算是隱藏的。

分布式數(shù)組是用 DArray 類型來實(shí)現(xiàn)的。 DArray 的元素類型和維度與 Array 一樣。 DArray 的數(shù)據(jù)的分布,是這樣實(shí)現(xiàn)的:它把索引空間在每個維度都分成一些小塊。

一些常用分布式數(shù)組可以使用 d 開頭的函數(shù)來構(gòu)造:

    dzeros(100,100,10)
    dones(100,100,10)
    drand(100,100,10)
    drandn(100,100,10)
    dfill(x, 100,100,10)

最后一個例子中,數(shù)組的元素由值 x 來初始化。這些函數(shù)自動選取某個分布。如果要指明使用哪個進(jìn)程,如何分布數(shù)據(jù),應(yīng)這樣寫:

    dzeros((100,100), [1:4], [1,4])

第二個參數(shù)指定了數(shù)組應(yīng)該在處理器 1 到 4 中創(chuàng)建。劃分含有很多進(jìn)程的數(shù)據(jù)時,人們經(jīng)??吹叫阅苁找孢f減。把 DArrays 放在一個進(jìn)程的子集中,該進(jìn)程允許多個 DArray 同時計算,并且每個進(jìn)程擁有更高比例的通信工作。

第三個參數(shù)指定了一個分布;數(shù)組第 n 個元素指定了應(yīng)該分成多少個塊。在本例中,第一個維度不會分割,而第二個維度將分為四塊。因此每個局部塊的大小為 (100,25)。注意,分布式數(shù)組必須與進(jìn)程數(shù)量相符。

distribute(a::Array) 可用來將本地數(shù)組轉(zhuǎn)換為分布式數(shù)組。

localpart(a::DArray) 可用來獲取 DArray 本地存儲的部分。

localindexes(a::DArray) 返回本地進(jìn)程所存儲的維度索引值范圍多元組。

convert(Array, a::DArray) 將所有數(shù)據(jù)綜合到本地進(jìn)程上。

使用索引值范圍來索引 DArray (方括號)時,會創(chuàng)建 SubArray 對象,但不復(fù)制數(shù)據(jù)。

構(gòu)造分布式數(shù)組

DArray 的構(gòu)造函數(shù)是 darray ,它的聲明如下:

    DArray(init, dims[, procs, dist])

init 函數(shù)的參數(shù),是索引值范圍多元組。這個函數(shù)在本地聲名一塊分布式數(shù)組,并用指定索引值來進(jìn)行初始化。 dims 是整個分布式數(shù)組的維度。 procs 是可選的,指明一個存有要使用的進(jìn)程 ID 的向量 。 dist 是一個整數(shù)向量,指明分布式數(shù)組在每個維度應(yīng)該被分成幾塊。

最后倆參數(shù)是可選的,忽略的時候使用默認(rèn)值。

下例演示如果將本地數(shù)組 fill 的構(gòu)造函數(shù)更改為分布式數(shù)組的構(gòu)造函數(shù):

    dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)

此例中 init 函數(shù)僅對它構(gòu)造的本地塊的維度調(diào)用 fill 。

分布式數(shù)組運(yùn)算

在這個時候,分布式數(shù)組沒有太多的功能。主要功能是通過數(shù)組索引來允許進(jìn)行通信,這對許多問題來說都很方便。作為一個例子,考慮實(shí)現(xiàn)“生活”細(xì)胞自動機(jī),每個單元網(wǎng)格中的細(xì)胞根據(jù)其鄰近的細(xì)胞進(jìn)行更新。每個進(jìn)程需要其本地塊中直接相鄰的細(xì)胞才能計算一個迭代的結(jié)果。下面的代碼可以實(shí)現(xiàn)這個功能:

    function life_step(d::DArray)
        DArray(size(d),procs(d)) do I
            top   = mod(first(I[1])-2,size(d,1))+1
            bot   = mod( last(I[1])  ,size(d,1))+1
            left  = mod(first(I[2])-2,size(d,2))+1
            right = mod( last(I[2])  ,size(d,2))+1

            old = Array(Bool, length(I[1])+2, length(I[2])+2)
            old[1      , 1      ] = d[top , left]   # left side
            old[2:end-1, 1      ] = d[I[1], left]
            old[end    , 1      ] = d[bot , left]
            old[1      , 2:end-1] = d[top , I[2]]
            old[2:end-1, 2:end-1] = d[I[1], I[2]]   # middle
            old[end    , 2:end-1] = d[bot , I[2]]
            old[1      , end    ] = d[top , right]  # right side
            old[2:end-1, end    ] = d[I[1], right]
            old[end    , end    ] = d[bot , right]

            life_rule(old)
        end
    end

可以看到,我們使用一系列的索引表達(dá)式來獲取一個本地數(shù)組中的數(shù)組 old。注意,do 塊語法方便 init 函數(shù)傳遞給 DArray 構(gòu)造函數(shù)。接下來,連續(xù)函數(shù) life_rule 被調(diào)用以提供數(shù)據(jù)的更新規(guī)則,產(chǎn)生所需的 DArray 塊。 life_ruleDArray-specific 沒有關(guān)系,但為了完整性,我們在此仍將它列出:

    function life_rule(old)
        m, n = size(old)
        new = similar(old, m-2, n-2)
        for j = 2:n-1
            for i = 2:m-1
                nc = +(old[i-1,j-1], old[i-1,j], old[i-1,j+1],
                       old[i  ,j-1],             old[i  ,j+1],
                       old[i+1,j-1], old[i+1,j], old[i+1,j+1])
                new[i-1,j-1] = (nc == 3 || nc == 2 && old[i,j])
            end
        end
        new
    end

共享數(shù)組 (用于試驗, 僅在 unix 上)

共享陣列使用在許多進(jìn)程中共享內(nèi)存來映射相同數(shù)組的系統(tǒng)。雖然與 DArray 有一些相似之處,但是 SharedArray 的行為是完全不同的。在一個 DArray 中,每個進(jìn)程只能本地訪問一塊數(shù)據(jù),并且兩個進(jìn)程共享同一塊;相比之下,在 SharedArray 中,每個“參與”的進(jìn)程能夠訪問整個數(shù)組。當(dāng)你想要在同一臺機(jī)器上大量數(shù)據(jù)共同訪問兩個或兩個以上的進(jìn)程時, SharedArray 是一個不錯的選擇。

SharedArray 索引(分配和訪問值)與常規(guī)數(shù)組一樣工作,并且是非常高效的,因為其底層內(nèi)存可用于本地進(jìn)程。因此,大多數(shù)算法自然地在 SharedArrays 上運(yùn)行,即使在單進(jìn)程模式中。當(dāng)某個算法必須在一個 Array 輸入的情況下,可以從 SharedArray 檢索底層數(shù)組通過調(diào)用 sdata(S) 取回。對于其他 AbstractArray 類型, sdata 返回對象本身,所以在任何數(shù)組類型下使用 sdata 都是很安全的。

共享數(shù)字構(gòu)造函數(shù)數(shù)的形式:

  SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])

創(chuàng)建一個被 pids 進(jìn)程指定的,bitstype 為 T 并且大小為 dims 的共享數(shù)組。與分布式陣列不同,共享數(shù)組只能用于這些參與人員指定的以 pid 命名的參數(shù)(如果在同一個主機(jī)上,創(chuàng)建過程也同樣如此)。

如果一個簽名為 initfn(S::SharedArray)init 函數(shù)被指定,它會被所有參與人員調(diào)用。你可以控制它,每個工人可以在數(shù)組的不同部分運(yùn)行 init 函數(shù),因此進(jìn)行并行的初始化。

這里有一個簡單的例子:

  julia> addprocs(3)
  3-element Array{Any,1}:
   2
   3
   4

  julia> S = SharedArray(Int, (3,4), init = S -> S[localindexes(S)] = myid())
  3x4 SharedArray{Int64,2}:
   2  2  3  4
   2  3  3  4
   2  3  4  4

  julia> S[3,2] = 7
  7

  julia> S
  3x4 SharedArray{Int64,2}:
   2  2  3  4
   2  3  3  4
   2  7  4  4

localindexes 提供不相交的一維索引的范圍,它有時方便進(jìn)程之間的任務(wù)交流。當(dāng)然,你可以按你希望的方式來劃分工作:

  julia> S = SharedArray(Int, (3,4), init = S -> S[myid()-1:nworkers():length(S)] = myid())
  3x4 SharedArray{Int64,2}:
   2  2  2  2
   3  3  3  3
   4  4  4  4

因為所有進(jìn)程都可以訪問底層數(shù)據(jù),你必須小心不要設(shè)置沖突。例如:

  @sync begin
      for p in workers()
          @async begin
              remotecall_wait(p, fill!, S, p)
          end
      end
  end

這有可能導(dǎo)致未定義的行為:因為每個進(jìn)程有他自己的 pid 來充滿整個數(shù)組,無論最后執(zhí)行的是哪一個進(jìn)程(任何特定元素 S)都將保留他的 pid。

ClusterManagers

Julia 工作進(jìn)程也可以在任意機(jī)器中產(chǎn)生,讓 Julia 的自然并行功能非常透明地在集群環(huán)境中運(yùn)行。ClusterManager 接口提供了一種方法來指定啟動和管理工作進(jìn)程的手段。例如, ssh 集群也使用 ClusterManager 來實(shí)現(xiàn):

    immutable SSHManager <: ClusterManager
        launch::Function
        manage::Function
        machines::AbstractVector

        SSHManager(; machines=[]) = new(launch_ssh_workers, manage_ssh_workers, machines)
    end

    function launch_ssh_workers(cman::SSHManager, np::Integer, config::Dict)
        ...
    end

    function manage_ssh_workers(id::Integer, config::Dict, op::Symbol)
        ...
    end

launch_ssh_workers 負(fù)責(zé)實(shí)例化新的 Julia 進(jìn)程并且 manage_ssh_workers 提供了一種方法來管理這些進(jìn)程,例如發(fā)送中斷信號。在運(yùn)行時可以使用 addprocs 添加新進(jìn)程:

    addprocs(5, cman=LocalManager())

來指定添加一批進(jìn)程并且 ClusterManager 用于啟動這些進(jìn)程。

腳注

[1]:在這邊文中, MPI 是指 MPI-1 標(biāo)準(zhǔn)。從 MPI-2 開始,MPI 標(biāo)準(zhǔn)委員會引入了一系列新的通信機(jī)制,統(tǒng)稱為遠(yuǎn)程內(nèi)存訪問 (RMA) 。添加 RMA MPI 標(biāo)準(zhǔn)的動機(jī)是改善單方面的溝通模式。最新的 MPI 標(biāo)準(zhǔn)的更多信息,參見 http://www.mpi-forum.org/docs

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號