Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add accumulate! function for arbitrary number of dimensions #213

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/DistributedArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import Primes: factor

# DArray exports
export DArray, SubDArray, SubOrDArray, @DArray
export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval
export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval,daccumulate!

# non-array distributed data
export ddata, gather
Expand Down
136 changes: 136 additions & 0 deletions src/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -852,3 +852,139 @@ function Random.rand!(A::DArray, ::Type{T}) where T
end
end


function daccumulateindep!(op,darray1::DArray{T},darray2::DArray{T}, procsarray,dimis,dimstuble) where{T}

noprocs=length(procsarray)
accarray=Array{T,length(dimstuble)}(undef,(dimstuble...))
www=Array{UnitRange{Int32},2}(undef,(noprocs,length(dimstuble)))
#=
in this function we assume that the procs in procs array are only partioned along the axis we will acumulate, so we wil
1-make every process accumulate its own copy in parallel and emit its last array array along the accumulation axis
2-gather erery last array of every process and accumulate it, to know whate is the reslut of the acumulation before each process
3-knowing the last reslut before each processwe , it is easy to compute the final answer in ervery element, we can even parallelize
the operation on eeach element on the same process
=#

map([i for i in 1:noprocs]) do i

for (ind,v) in enumerate(dimstuble)
if ind==1
www[i,ind]=i:i
else
www[i,ind]=1:v
end
end

end


asyncmap([i for i in 1:length(procsarray)]) do pindex
accarray[www[pindex,:]...]=remotecall_fetch(procsarray[pindex]) do
DistributedArrays.makelocal(darray1, (localindices(darray2)...))
src=localpart(darray2)
dest=localpart(darray1)
#cumsum!(dest,src,dims=dimis)
accumulate!(op,dest,src,dims=dimis)
selector=Array{UnitRange{Int32},1}(undef,length(size(dest)))
for i in [i for i in 1:length(size(dest))]
if i==dimis
selector[i]=size(dest)[i]:size(dest)[i]
else
selector[i]=1:size(dest)[i]
end
end

return dropdims(dest[selector...],dims=dimis)

end




end
#cumsum!(accarray,accarray,dims=1)
accumulate!(op,accarray,accarray,dims=1)

selec=[Colon() for i in 1:length(dimstuble)-1]
asyncmap([i for i in 2:length(procsarray)]) do pindx
remotecall_fetch(procsarray[pindx],accarray[pindx-1,selec...]) do x


dest=localpart(darray1)

#=asyncmap([i for i in 1:axes(myarray,dimis).stop]) do i
myselec=[selec[1:dimis-1]...,i,selec[dimis:end]...]
myarray[myselec...]+=x
end=#
if typeof(x)<:Array
newshape=size(x)
newshape=(newshape[1:dimis-1]...,1,newshape[dimis:end]...)
x=reshape(x,newshape)
end
broadcast!(op,dest,dest,x)


end
end




end


function daccumulate!(op,darray1::DArray,darray2::DArray,ind)
axes(darray1) == axes(darray2) || throw(DimensionMismatch("shape of ar1 must match ar2"))
#=
we will split the array into independent sets, becasue of the workers were partioned by any simentions other than
the one we will accumalte along, we can process thoes pations independently,
so we will iterate over thoes sets by dfs
=#
myprocs=procs(darray2) # the procs we will work on
dimlma=size(myprocs) # array of numper of partions in each dimension
nodim=length(dimlma) # numper of dimnsions
dimia=Array{UnitRange{Int32},1}(undef,length(dimlma)) # track which index on every dimesion we work on
dimia[ind]=1:dimlma[ind] # we will accumulate throught the indsth dimension
independesets=[]
function dfs(d)

if d==nodim+1
wprocs=myprocs[dimia...]
wprocs=reshape(wprocs,(length(wprocs))) #the array of procss we will work on
nwork=length(wprocs)
# println(wprocs)
#println(typeof(wprocs))

sample=darray2.indices[dimia...][1]
#println(dimia)
t1= ind==1 ? () : sample[1:ind-1]
t2= ind==nodim ? () : sample[ind+1:nodim]
accdim=map([1:nwork,t1...,t2...]) do r
return r.stop-r.start+1
end
#println(accdim)

#@async daccumulateindep!(op,darray1,darray2,wprocs,ind,accdim)
p=[]
push!(p,wprocs)
push!(p,accdim)
push!(independesets,p)
elseif d==ind
dfs(d+1)


else for i in 1:dimlma[d]
dimia[d]=i:i
dfs(d+1)
end
end



end
dfs(1)
asyncmap( independesets) do x
daccumulateindep!(op,darray1,darray2,x[1],ind,x[2])
end
end
43 changes: 43 additions & 0 deletions test/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ using Test, LinearAlgebra, SpecialFunctions
using Statistics: mean
using SparseArrays: nnz
using Random
import Primes: factor

@everywhere using SparseArrays: sprandn

@testset "test distribute and other constructors" begin
Expand Down Expand Up @@ -1048,6 +1050,47 @@ end
close(d)
end

function testaccum(desdim,procslist)
u=length(procslist)
factorrs=factor(Vector,u)
ndims=length(desdim)
partions=fill(1,ndims)


function dfs(ind,start)

if ind==length(factorrs)+1
for i in 1:ndims
oa=fill(0,desdim)
cumsum!(oa,fill(1,desdim),dims=i)
c=(partions...,)
da=dfill(0,(desdim...,),procslist,c)

daccumulate!(+,da,dfill(1,desdim,procslist,c),i)

oda=convert(Array,da)
@test oda==oa
#println("pass")
close(da)
end
return
end
for i in start:length(partions)
partions[i]*=factorrs[ind]
dfs(ind+1,ind!=length(factorrs)&&factorrs[ind+1]==factorrs[ind] ? i : 1)
partions[i]/=factorrs[ind]
end
end
dfs(1,1)

end
@testset "test daccumulat!" begin
testaccum((100,),workers())
testaccum((10,10),workers())
testaccum((10,10,10),workers())
testaccum((10,10,10,10),workers())
end

check_leaks()

d_closeall()
Expand Down