Julia 提供了一個基于消息傳遞的多處理器環(huán)境,能夠同時在多處理器上使用獨(dú)立的內(nèi)存空間運(yùn)行程序。
Julia 的消息傳遞與 MPI [1] 等環(huán)境不同。Julia 中的通信是“單邊”的,即程序員只需要管理雙處理器運(yùn)算中的一個處理器即可。
Julia 中的并行編程基于兩個原語:remote references 和 remote calls 。remote reference 對象,用于從任意的處理器,查閱指定處理器上存儲的對象。 remote call 請求,用于一個處理器對另一個(也有可能是同一個)處理器調(diào)用某個函數(shù)處理某些參數(shù)。
remote call 返回 remote reference 對象。 remote call 是立即返回的;調(diào)用它的處理器繼續(xù)執(zhí)行下一步操作,而 remote call 繼續(xù)在某處執(zhí)行。可以對 remote
reference 調(diào)用 wait
,以等待 remote call 執(zhí)行完畢,然后通過 fetch
獲取結(jié)果的完整值。使用 put
可將值存儲到 remote reference 。
通過 julia -p n
啟動,可以在本地機(jī)器上提供 n
個處理器。一般 n
等于機(jī)器上 CPU 內(nèi)核個數(shù):
$ ./julia -p 2
julia> r = remotecall(2, rand, 2, 2)
RemoteRef(2,1,5)
julia> fetch(r)
2x2 Float64 Array:
0.60401 0.501111
0.174572 0.157411
julia> s = @spawnat 2 1 .+ fetch(r)
RemoteRef(2,1,7)
julia> fetch(s)
2x2 Float64 Array:
1.60401 1.50111
1.17457 1.15741
remote_call
的第一個參數(shù)是要進(jìn)行這個運(yùn)算的處理器索引值。Julia 中大部分并行編程不查詢特定的處理器或可用處理器的個數(shù),但可認(rèn)為 remote_call
是個為精細(xì)控制所提供的低級接口。第二個參數(shù)是要調(diào)用的函數(shù),剩下的參數(shù)是該函數(shù)的參數(shù)。此例中,我們先讓處理器 2 構(gòu)造一個 2x2 的隨機(jī)矩陣,然后我們在結(jié)果上加 1 。兩個計算的結(jié)果保存在兩個 remote reference 中,即 r
和 s
。 @spawnat
宏在由第一個參數(shù)指明的處理器上,計算第二個參數(shù)中的表達(dá)式。
remote_call_fetch
函數(shù)可以立即獲取要在遠(yuǎn)端計算的值。它等價于 fetch(remote_call(...))
,但比之更高效:
julia> remotecall_fetch(2, getindex, r, 1, 1)
0.10824216411304866
getindex(r,1,1)
:ref:等價于 <man-array-indexing>
r[1,1]
,因此,這個調(diào)用獲取 remote reference 對象 r
的第一個元素。
remote_call
語法不太方便。 @spawn
宏簡化了這件事兒,它對表達(dá)式而非函數(shù)進(jìn)行操作,并自動選取在哪兒進(jìn)行計算:
julia> r = @spawn rand(2,2)
RemoteRef(1,1,0)
julia> s = @spawn 1 .+ fetch(r)
RemoteRef(1,1,1)
julia> fetch(s)
1.10824216411304866 1.13798233877923116
1.12376292706355074 1.18750497916607167
注意,此處用 1 .+ fetch(r)
而不是 1 .+ r
。這是因為我們不知道代碼在何處運(yùn)行,而 fetch
會將需要的 r
移到做加法的處理器上。此例中, @spawn
很聰明,它知道在有 r
對象的處理器上進(jìn)行計算,因而 fetch
將不做任何操作。
( @spawn
不是內(nèi)置函數(shù),而是 Julia 定義的 :ref:宏 <man-macros>
)
所有執(zhí)行程序代碼的處理器上,都必須能獲得程序代碼。例如,輸入:
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2x2 Float64 Array:
0.153756 0.368514
1.15119 0.918912
julia> @spawn rand2(2,2)
RemoteRef(1,1,1)
julia> @spawn rand2(2,2)
RemoteRef(2,1,2)
julia> exception on 2: in anonymous: rand2 not defined
進(jìn)程 1 知道 rand2
函數(shù),但進(jìn)程 2 不知道。 require
函數(shù)自動在當(dāng)前所有可用的處理器上載入源文件,使所有的處理器都能運(yùn)行代碼:
julia> require("myfile")
在集群中,文件(及遞歸載入的任何文件)的內(nèi)容會被發(fā)送到整個網(wǎng)絡(luò)??梢允褂?@everywhere
宏在所有處理器上執(zhí)行命令:
julia> @everywhere id = myid()
julia> remotecall_fetch(2, ()->id)
2
@everywhere include("defs.jl")
文件也可以在多個進(jìn)程啟動時預(yù)加載,并且一個驅(qū)動腳本可以用于驅(qū)動計算:
julia -p <n> -L file1.jl -L file2.jl driver.jl
每個進(jìn)程都有一個關(guān)聯(lián)的標(biāo)識符。這個過程提供的 Julia 提示總是有一個 id 值為 1 ,就如上面例子中 julia 進(jìn)程會運(yùn)行驅(qū)動腳本一樣。這個被默認(rèn)用作平行操作的進(jìn)程被稱為 workers
。當(dāng)只有一個進(jìn)程的時候,進(jìn)程 1 就被當(dāng)做一個 worker。否則,worker 就是指除了進(jìn)程 1 之外的所有進(jìn)程。
Julia 內(nèi)置有對于兩種集群的支持:
—p
選項。--machinefile
選項。它使用一個無密碼的 ssh
登來在指定的機(jī)器上啟動 julia 工作進(jìn)程(以相同的路徑作為當(dāng)前主機(jī))。函數(shù) addprocs
,rmprocs
,workers
,當(dāng)然還有其他的在一個集群中可用的以可編程的方式進(jìn)行添加,刪除和查詢的函數(shù)。
其他類型的集群可以通過編寫自己的自定義 ClusterManager。請參閱 ClusterManagers 部分。
并行計算中,消息傳遞和數(shù)據(jù)移動是最大的開銷。減少這兩者的數(shù)量,對性能至關(guān)重要。
fetch
是顯式的數(shù)據(jù)移動操作,它直接要求將對象移動到當(dāng)前機(jī)器。 @spawn
(及相關(guān)宏)也進(jìn)行數(shù)據(jù)移動,但不是顯式的,因而被稱為隱式數(shù)據(jù)移動操作。對比如下兩種構(gòu)造隨機(jī)矩陣并計算其平方的方法: :
# method 1
A = rand(1000,1000)
Bref = @spawn A^2
...
fetch(Bref)
# method 2
Bref = @spawn rand(1000,1000)^2
...
fetch(Bref)
方法 1 中,本地構(gòu)造了一個隨機(jī)矩陣,然后將其傳遞給做平方計算的處理器。方法 2 中,在同一處理器構(gòu)造隨機(jī)矩陣并進(jìn)行平方計算。因此,方法 2 比方法 1 移動的數(shù)據(jù)少得多。
大部分并行計算不需要移動數(shù)據(jù)。最常見的是蒙特卡羅仿真。下例使用 @spawn
在兩個處理器上仿真投硬幣。先在 count_heads.jl
中寫如下函數(shù):
function count_heads(n)
c::Int = 0
for i=1:n
c += randbool()
end
c
end
在兩臺機(jī)器上做仿真,最后將結(jié)果加起來:
require("count_heads")
a = @spawn count_heads(100000000)
b = @spawn count_heads(100000000)
fetch(a)+fetch(b)
在多處理器上獨(dú)立地進(jìn)行迭代運(yùn)算,然后用一些函數(shù)把它們的結(jié)果綜合起來。綜合的過程稱為 約簡 。
上例中,我們顯式調(diào)用了兩個 @spawn
語句,它將并行計算限制在兩個處理器上。要在任意個數(shù)的處理器上運(yùn)行,應(yīng)使用 并行 for 循環(huán) ,它在 Julia 中應(yīng)寫為:
nheads = @parallel (+) for i=1:200000000
int(randbool())
end
這個構(gòu)造實(shí)現(xiàn)了給多處理器分配迭代的模式,并且使用特定約簡來綜合結(jié)果(此例中為 (+)
)。
注意,盡管并行 for 循環(huán)看起來和一組 for 循環(huán)差不多,但它們的行為有很大區(qū)別。第一,循環(huán)不是按順序進(jìn)行的。第二,寫進(jìn)變量或數(shù)組的值不是全局可見的,因為迭代運(yùn)行在不同的處理器上。并行循環(huán)內(nèi)使用的所有變量都會被復(fù)制、廣播到每個處理器。
下列代碼并不會按照預(yù)想運(yùn)行:
a = zeros(100000)
@parallel for i=1:100000
a[i] = i
end
如果不需要,可以省略約簡運(yùn)算符。但此代碼不會初始化 a
的所有元素,因為每個處理器上都只有獨(dú)立的一份兒。應(yīng)避免類似的并行 for 循環(huán)。但是我們可以使用分布式數(shù)組來規(guī)避這種情形,后面我們會講。
如果“外部”變量是只讀的,就可以在并行循環(huán)中使用它:
a = randn(1000)
@parallel (+) for i=1:100000
f(a[randi(end)])
end
有時我們不需要約簡,僅希望將函數(shù)應(yīng)用到某個范圍的整數(shù)(或某個集合的元素)上。這時可以使用 并行映射 pmap
函數(shù)。下例中并行計算幾個大隨機(jī)矩陣的奇異值:
M = {rand(1000,1000) for i=1:10}
pmap(svd, M)
被調(diào)用的函數(shù)需處理大量工作時使用 pmap
,反之,則使用 @parallel for
。
Julia 的平行編程平臺使用任務(wù)(也成為協(xié)程) ,其可在多個計算中切換。每當(dāng)代碼執(zhí)行一個通信操作,例如 fetch
或者 wait
,當(dāng)前任務(wù)便暫停同時調(diào)度器會選擇另一個任務(wù)運(yùn)行。在事件等待完成后,任務(wù)會重新啟動。
對于很多問題,沒必要直接考慮任務(wù)。然而,由于提供了動態(tài)調(diào)度,可以同時等待多個事件。在動態(tài)調(diào)度中,一個程序決定計算什么和在哪計算,這是基于其他工作何時完成的。這是被不可預(yù)知的或不可平衡的工作荷載所需要的,只有當(dāng)他們結(jié)束當(dāng)前任務(wù)我們才能分配更多的工作進(jìn)程。
作為一個例子,考慮計算不同大小的矩陣的奇異值:
M = {rand(800,800), rand(600,600), rand(800,800), rand(600,600)}
pmap(svd, M)
如果一個進(jìn)程要處理 800 x 800 矩陣和另一個 600 x 600 矩陣,我們不會得到很多的可伸縮性。解決方案是讓本地的任務(wù)在他們完成當(dāng)前的任務(wù)時去“喂”每個進(jìn)程中的工作。pmap
的實(shí)現(xiàn)過程中可以看到這個:
function pmap(f, lst)
np = nprocs() # determine the number of processes available
n = length(lst)
results = cell(n)
i = 1
# function to produce the next work item from the queue.
# in this case it's just an index.
nextidx() = (idx=i; i+=1; idx)
@sync begin
for p=1:np
if p != myid() || np == 1
@async begin
while true
idx = nextidx()
if idx > n
break
end
results[idx] = remotecall_fetch(p, f, lst[idx])
end
end
end
end
end
results
end
只有在本地運(yùn)行任務(wù)的過程中,@async
才與 @spawn
類似。我們使用它來為每個流程創(chuàng)建一個“供給”的任務(wù)。每個任務(wù)選擇下一個需要被計算的指數(shù),然后等待它的進(jìn)程完成,接著一直重復(fù)到用完指數(shù)。注意,“供給”任務(wù)只有當(dāng)主要任務(wù)到達(dá) @sync
塊結(jié)束時才開始執(zhí)行,此時它放棄控制并等待所有的本地任務(wù)在從函數(shù)返回之前完成。供給任務(wù)可以通過 nextidx()
共享狀態(tài),因為它們都在相同的進(jìn)程上運(yùn)行。這個過程不需要鎖定,因為線程是實(shí)時進(jìn)行調(diào)度的而不是一成不變。這意味著內(nèi)容的切換只發(fā)生在定義好的時候:在這種情況下,當(dāng) remotecall_fetch
會被調(diào)用。
并行計算綜合使用多個機(jī)器上的內(nèi)存資源,因而可以使用在一個機(jī)器上不能實(shí)現(xiàn)的大數(shù)組。這時,可使用分布式數(shù)組,每個處理器僅對它所擁有的那部分?jǐn)?shù)組進(jìn)行操作。
分布式數(shù)組(或 全局對象 )邏輯上是個單數(shù)組,但它分為很多塊兒,每個處理器上保存一塊兒。但對整個數(shù)組的運(yùn)算與在本地數(shù)組的運(yùn)算是一樣的,并行計算是隱藏的。
分布式數(shù)組是用 DArray
類型來實(shí)現(xiàn)的。 DArray
的元素類型和維度與 Array
一樣。 DArray
的數(shù)據(jù)的分布,是這樣實(shí)現(xiàn)的:它把索引空間在每個維度都分成一些小塊。
一些常用分布式數(shù)組可以使用 d
開頭的函數(shù)來構(gòu)造:
dzeros(100,100,10)
dones(100,100,10)
drand(100,100,10)
drandn(100,100,10)
dfill(x, 100,100,10)
最后一個例子中,數(shù)組的元素由值 x
來初始化。這些函數(shù)自動選取某個分布。如果要指明使用哪個進(jìn)程,如何分布數(shù)據(jù),應(yīng)這樣寫:
dzeros((100,100), [1:4], [1,4])
第二個參數(shù)指定了數(shù)組應(yīng)該在處理器 1 到 4 中創(chuàng)建。劃分含有很多進(jìn)程的數(shù)據(jù)時,人們經(jīng)??吹叫阅苁找孢f減。把 DArrays
放在一個進(jìn)程的子集中,該進(jìn)程允許多個 DArray
同時計算,并且每個進(jìn)程擁有更高比例的通信工作。
第三個參數(shù)指定了一個分布;數(shù)組第 n 個元素指定了應(yīng)該分成多少個塊。在本例中,第一個維度不會分割,而第二個維度將分為四塊。因此每個局部塊的大小為 (100,25)
。注意,分布式數(shù)組必須與進(jìn)程數(shù)量相符。
distribute(a::Array)
可用來將本地數(shù)組轉(zhuǎn)換為分布式數(shù)組。
localpart(a::DArray)
可用來獲取 DArray
本地存儲的部分。
localindexes(a::DArray)
返回本地進(jìn)程所存儲的維度索引值范圍多元組。
convert(Array, a::DArray)
將所有數(shù)據(jù)綜合到本地進(jìn)程上。
使用索引值范圍來索引 DArray
(方括號)時,會創(chuàng)建 SubArray
對象,但不復(fù)制數(shù)據(jù)。
DArray
的構(gòu)造函數(shù)是 darray
,它的聲明如下:
DArray(init, dims[, procs, dist])
init
函數(shù)的參數(shù),是索引值范圍多元組。這個函數(shù)在本地聲名一塊分布式數(shù)組,并用指定索引值來進(jìn)行初始化。 dims
是整個分布式數(shù)組的維度。 procs
是可選的,指明一個存有要使用的進(jìn)程 ID 的向量 。 dist
是一個整數(shù)向量,指明分布式數(shù)組在每個維度應(yīng)該被分成幾塊。
最后倆參數(shù)是可選的,忽略的時候使用默認(rèn)值。
下例演示如果將本地數(shù)組 fill
的構(gòu)造函數(shù)更改為分布式數(shù)組的構(gòu)造函數(shù):
dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
此例中 init
函數(shù)僅對它構(gòu)造的本地塊的維度調(diào)用 fill
。
在這個時候,分布式數(shù)組沒有太多的功能。主要功能是通過數(shù)組索引來允許進(jìn)行通信,這對許多問題來說都很方便。作為一個例子,考慮實(shí)現(xiàn)“生活”細(xì)胞自動機(jī),每個單元網(wǎng)格中的細(xì)胞根據(jù)其鄰近的細(xì)胞進(jìn)行更新。每個進(jìn)程需要其本地塊中直接相鄰的細(xì)胞才能計算一個迭代的結(jié)果。下面的代碼可以實(shí)現(xiàn)這個功能:
function life_step(d::DArray)
DArray(size(d),procs(d)) do I
top = mod(first(I[1])-2,size(d,1))+1
bot = mod( last(I[1]) ,size(d,1))+1
left = mod(first(I[2])-2,size(d,2))+1
right = mod( last(I[2]) ,size(d,2))+1
old = Array(Bool, length(I[1])+2, length(I[2])+2)
old[1 , 1 ] = d[top , left] # left side
old[2:end-1, 1 ] = d[I[1], left]
old[end , 1 ] = d[bot , left]
old[1 , 2:end-1] = d[top , I[2]]
old[2:end-1, 2:end-1] = d[I[1], I[2]] # middle
old[end , 2:end-1] = d[bot , I[2]]
old[1 , end ] = d[top , right] # right side
old[2:end-1, end ] = d[I[1], right]
old[end , end ] = d[bot , right]
life_rule(old)
end
end
可以看到,我們使用一系列的索引表達(dá)式來獲取一個本地數(shù)組中的數(shù)組 old
。注意,do
塊語法方便 init
函數(shù)傳遞給 DArray
構(gòu)造函數(shù)。接下來,連續(xù)函數(shù) life_rule
被調(diào)用以提供數(shù)據(jù)的更新規(guī)則,產(chǎn)生所需的 DArray
塊。 life_rule
與 DArray-specific
沒有關(guān)系,但為了完整性,我們在此仍將它列出:
function life_rule(old)
m, n = size(old)
new = similar(old, m-2, n-2)
for j = 2:n-1
for i = 2:m-1
nc = +(old[i-1,j-1], old[i-1,j], old[i-1,j+1],
old[i ,j-1], old[i ,j+1],
old[i+1,j-1], old[i+1,j], old[i+1,j+1])
new[i-1,j-1] = (nc == 3 || nc == 2 && old[i,j])
end
end
new
end
共享陣列使用在許多進(jìn)程中共享內(nèi)存來映射相同數(shù)組的系統(tǒng)。雖然與 DArray
有一些相似之處,但是 SharedArray
的行為是完全不同的。在一個 DArray
中,每個進(jìn)程只能本地訪問一塊數(shù)據(jù),并且兩個進(jìn)程共享同一塊;相比之下,在 SharedArray
中,每個“參與”的進(jìn)程能夠訪問整個數(shù)組。當(dāng)你想要在同一臺機(jī)器上大量數(shù)據(jù)共同訪問兩個或兩個以上的進(jìn)程時, SharedArray
是一個不錯的選擇。
SharedArray
索引(分配和訪問值)與常規(guī)數(shù)組一樣工作,并且是非常高效的,因為其底層內(nèi)存可用于本地進(jìn)程。因此,大多數(shù)算法自然地在 SharedArrays
上運(yùn)行,即使在單進(jìn)程模式中。當(dāng)某個算法必須在一個 Array
輸入的情況下,可以從 SharedArray
檢索底層數(shù)組通過調(diào)用 sdata(S)
取回。對于其他 AbstractArray
類型, sdata
返回對象本身,所以在任何數(shù)組類型下使用 sdata
都是很安全的。
共享數(shù)字構(gòu)造函數(shù)數(shù)的形式:
SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])
創(chuàng)建一個被 pids
進(jìn)程指定的,bitstype 為 T
并且大小為 dims
的共享數(shù)組。與分布式陣列不同,共享數(shù)組只能用于這些參與人員指定的以 pid
命名的參數(shù)(如果在同一個主機(jī)上,創(chuàng)建過程也同樣如此)。
如果一個簽名為 initfn(S::SharedArray)
的 init
函數(shù)被指定,它會被所有參與人員調(diào)用。你可以控制它,每個工人可以在數(shù)組的不同部分運(yùn)行 init
函數(shù),因此進(jìn)行并行的初始化。
這里有一個簡單的例子:
julia> addprocs(3)
3-element Array{Any,1}:
2
3
4
julia> S = SharedArray(Int, (3,4), init = S -> S[localindexes(S)] = myid())
3x4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3x4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
localindexes
提供不相交的一維索引的范圍,它有時方便進(jìn)程之間的任務(wù)交流。當(dāng)然,你可以按你希望的方式來劃分工作:
julia> S = SharedArray(Int, (3,4), init = S -> S[myid()-1:nworkers():length(S)] = myid())
3x4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
因為所有進(jìn)程都可以訪問底層數(shù)據(jù),你必須小心不要設(shè)置沖突。例如:
@sync begin
for p in workers()
@async begin
remotecall_wait(p, fill!, S, p)
end
end
end
這有可能導(dǎo)致未定義的行為:因為每個進(jìn)程有他自己的 pid
來充滿整個數(shù)組,無論最后執(zhí)行的是哪一個進(jìn)程(任何特定元素 S
)都將保留他的 pid
。
Julia 工作進(jìn)程也可以在任意機(jī)器中產(chǎn)生,讓 Julia 的自然并行功能非常透明地在集群環(huán)境中運(yùn)行。ClusterManager
接口提供了一種方法來指定啟動和管理工作進(jìn)程的手段。例如, ssh
集群也使用 ClusterManager
來實(shí)現(xiàn):
immutable SSHManager <: ClusterManager
launch::Function
manage::Function
machines::AbstractVector
SSHManager(; machines=[]) = new(launch_ssh_workers, manage_ssh_workers, machines)
end
function launch_ssh_workers(cman::SSHManager, np::Integer, config::Dict)
...
end
function manage_ssh_workers(id::Integer, config::Dict, op::Symbol)
...
end
launch_ssh_workers
負(fù)責(zé)實(shí)例化新的 Julia 進(jìn)程并且 manage_ssh_workers
提供了一種方法來管理這些進(jìn)程,例如發(fā)送中斷信號。在運(yùn)行時可以使用 addprocs
添加新進(jìn)程:
addprocs(5, cman=LocalManager())
來指定添加一批進(jìn)程并且 ClusterManager
用于啟動這些進(jìn)程。
腳注
[1]:在這邊文中, MPI 是指 MPI-1 標(biāo)準(zhǔn)。從 MPI-2 開始,MPI 標(biāo)準(zhǔn)委員會引入了一系列新的通信機(jī)制,統(tǒng)稱為遠(yuǎn)程內(nèi)存訪問 (RMA) 。添加 RMA MPI 標(biāo)準(zhǔn)的動機(jī)是改善單方面的溝通模式。最新的 MPI 標(biāo)準(zhǔn)的更多信息,參見 http://www.mpi-forum.org/docs。
更多建議: