diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index e26e6f5..306fabe 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -16,7 +16,7 @@ import Primes: factor # DArray exports export DArray, SubDArray, SubOrDArray, @DArray -export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval +export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval,daccumulate! # non-array distributed data export ddata, gather diff --git a/src/darray.jl b/src/darray.jl index c4b12b1..2d582c5 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -852,3 +852,139 @@ function Random.rand!(A::DArray, ::Type{T}) where T end end + +function daccumulateindep!(op,darray1::DArray{T},darray2::DArray{T}, procsarray,dimis,dimstuble) where{T} + + noprocs=length(procsarray) + accarray=Array{T,length(dimstuble)}(undef,(dimstuble...)) + www=Array{UnitRange{Int32},2}(undef,(noprocs,length(dimstuble))) + #= + in this function we assume that the procs in procs array are only partioned along the axis we will acumulate, so we wil + 1-make every process accumulate its own copy in parallel and emit its last array array along the accumulation axis + 2-gather erery last array of every process and accumulate it, to know whate is the reslut of the acumulation before each process + 3-knowing the last reslut before each processwe , it is easy to compute the final answer in ervery element, we can even parallelize + the operation on eeach element on the same process + =# + + map([i for i in 1:noprocs]) do i + + for (ind,v) in enumerate(dimstuble) + if ind==1 + www[i,ind]=i:i + else + www[i,ind]=1:v + end + end + + end + + + asyncmap([i for i in 1:length(procsarray)]) do pindex + accarray[www[pindex,:]...]=remotecall_fetch(procsarray[pindex]) do + DistributedArrays.makelocal(darray1, (localindices(darray2)...)) + src=localpart(darray2) + dest=localpart(darray1) + #cumsum!(dest,src,dims=dimis) + accumulate!(op,dest,src,dims=dimis) + selector=Array{UnitRange{Int32},1}(undef,length(size(dest))) + for i in [i for i in 1:length(size(dest))] + if i==dimis + selector[i]=size(dest)[i]:size(dest)[i] + else + selector[i]=1:size(dest)[i] + end + end + + return dropdims(dest[selector...],dims=dimis) + + end + + + + + end + #cumsum!(accarray,accarray,dims=1) + accumulate!(op,accarray,accarray,dims=1) + + selec=[Colon() for i in 1:length(dimstuble)-1] + asyncmap([i for i in 2:length(procsarray)]) do pindx + remotecall_fetch(procsarray[pindx],accarray[pindx-1,selec...]) do x + + + dest=localpart(darray1) + + #=asyncmap([i for i in 1:axes(myarray,dimis).stop]) do i + myselec=[selec[1:dimis-1]...,i,selec[dimis:end]...] + myarray[myselec...]+=x + end=# + if typeof(x)<:Array + newshape=size(x) + newshape=(newshape[1:dimis-1]...,1,newshape[dimis:end]...) + x=reshape(x,newshape) + end + broadcast!(op,dest,dest,x) + + + end + end + + + + + end + + +function daccumulate!(op,darray1::DArray,darray2::DArray,ind) + axes(darray1) == axes(darray2) || throw(DimensionMismatch("shape of ar1 must match ar2")) + #= + we will split the array into independent sets, becasue of the workers were partioned by any simentions other than + the one we will accumalte along, we can process thoes pations independently, + so we will iterate over thoes sets by dfs + =# + myprocs=procs(darray2) # the procs we will work on + dimlma=size(myprocs) # array of numper of partions in each dimension + nodim=length(dimlma) # numper of dimnsions + dimia=Array{UnitRange{Int32},1}(undef,length(dimlma)) # track which index on every dimesion we work on + dimia[ind]=1:dimlma[ind] # we will accumulate throught the indsth dimension + independesets=[] + function dfs(d) + + if d==nodim+1 + wprocs=myprocs[dimia...] + wprocs=reshape(wprocs,(length(wprocs))) #the array of procss we will work on + nwork=length(wprocs) + # println(wprocs) + #println(typeof(wprocs)) + + sample=darray2.indices[dimia...][1] + #println(dimia) + t1= ind==1 ? () : sample[1:ind-1] + t2= ind==nodim ? () : sample[ind+1:nodim] + accdim=map([1:nwork,t1...,t2...]) do r + return r.stop-r.start+1 + end + #println(accdim) + + #@async daccumulateindep!(op,darray1,darray2,wprocs,ind,accdim) + p=[] + push!(p,wprocs) + push!(p,accdim) + push!(independesets,p) + elseif d==ind + dfs(d+1) + + + else for i in 1:dimlma[d] + dimia[d]=i:i + dfs(d+1) + end + end + + + + end + dfs(1) + asyncmap( independesets) do x + daccumulateindep!(op,darray1,darray2,x[1],ind,x[2]) + end + end diff --git a/test/darray.jl b/test/darray.jl index 4a8d3df..4edd761 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -2,6 +2,8 @@ using Test, LinearAlgebra, SpecialFunctions using Statistics: mean using SparseArrays: nnz using Random +import Primes: factor + @everywhere using SparseArrays: sprandn @testset "test distribute and other constructors" begin @@ -1048,6 +1050,47 @@ end close(d) end +function testaccum(desdim,procslist) + u=length(procslist) + factorrs=factor(Vector,u) + ndims=length(desdim) + partions=fill(1,ndims) + + + function dfs(ind,start) + + if ind==length(factorrs)+1 + for i in 1:ndims + oa=fill(0,desdim) + cumsum!(oa,fill(1,desdim),dims=i) + c=(partions...,) + da=dfill(0,(desdim...,),procslist,c) + + daccumulate!(+,da,dfill(1,desdim,procslist,c),i) + + oda=convert(Array,da) + @test oda==oa + #println("pass") + close(da) + end + return + end + for i in start:length(partions) + partions[i]*=factorrs[ind] + dfs(ind+1,ind!=length(factorrs)&&factorrs[ind+1]==factorrs[ind] ? i : 1) + partions[i]/=factorrs[ind] + end + end + dfs(1,1) + +end +@testset "test daccumulat!" begin + testaccum((100,),workers()) + testaccum((10,10),workers()) + testaccum((10,10,10),workers()) + testaccum((10,10,10,10),workers()) +end + check_leaks() d_closeall()