Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.4k views
in Technique[技术] by (71.8m points)

朱莉娅(Julia)中带有突变操作的for循环多线程

我试图运行下面提到的代码for loop使用@threads了多线程,但是结果总是不匹配描绘输出数量的错误。

using Combinatorics, Base.Threads
import Base.Threads.@threads
func(x, y)
result = Float64[]
a = Float64[]
@threads for c in combinations(1:n, 2)
        a, b = c
        result = func(a, b)
        push!(result, result)
        push!(a, a)     
     end

在处理过程中观察到的错误:

DimensionMismatch("column :result has length 60 and column :a has length 50")

请提出一种方法,以确保错过循环中没有的I / O。


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

在您的示例中不起作用的一件事是,所有线程都可能同时(通过 push!)对相同的向量(result和a)进行了变异,从而允许发生竞争条件。

解决这个问题的一种方法是拥有一个向量集合(每个线程一个)。每个线程仅修改自己的向量(由标识threadid())。

使用这种技术,您的示例的简化版本可能如下所示:

# The function we want to apply to each element
julia> f(x) = 2x+1
f (generic function with 1 method)

# Two collections of vectors (one vector for each thread)
# that will hold the results for each thread
julia> results = [Float64[] for _ in 1:Threads.nthreads()];
julia> as      = [Float64[] for _ in 1:Threads.nthreads()]
8-element Vector{Vector{Float64}}:
 []
 []
 []
 []
 []
 []
 []
 []

julia> Threads.@threads for a in 1:10
           result = f(a)

           # Each thread only ever mutates its own result vector: 
           #    results[Threads.threadid()]
           push!(results[Threads.threadid()], result)
           push!(as[Threads.threadid()],      a)
       end

请注意,您将获得结果的集合,该结果由产生结果的线程的ID索引。

# Now you get a collection of results, indexed by the id of the thread which produced them
julia> results
8-element Vector{Vector{Float64}}:
 [3.0, 5.0]  # These results have been produced by thread #1
 [7.0, 9.0]
 [11.0]
 [13.0]
 [15.0]
 [17.0]
 [19.0]
 [21.0]
julia> as
8-element Vector{Vector{Float64}}:
 [1.0, 2.0]
 [3.0, 4.0]
 [5.0]
 [6.0]
 [7.0]
 [8.0]
 [9.0]
 [10.0]

最后,因此,您需要以某种方式连接或展平所有结果向量,以便将所有线程特定的结果合并为一个。一种方法是连接所有结果(这将分配一个新的大向量来保存所有结果):

julia> reduce(vcat, results)
10-element Vector{Float64}:
  3.0
  5.0
  7.0
  9.0
 11.0
 13.0
 15.0
 17.0
 19.0
 21.0

julia> reduce(vcat, as)
10-element Vector{Float64}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

另一种方法是直接迭代嵌套的结果,将它们动态地展平(以免分配两倍的内存来以平面方式存储它们):

julia> using Base.Iterators: flatten

julia> for r in flatten(results)
           println(r)
       end
3.0
5.0
7.0
9.0
11.0
13.0
15.0
17.0
19.0
21.0

julia> for (a, r) in zip(flatten(as), flatten(results))
           println("$a -> $r")
       end
1.0 -> 3.0
2.0 -> 5.0
3.0 -> 7.0
4.0 -> 9.0
5.0 -> 11.0
6.0 -> 13.0
7.0 -> 15.0
8.0 -> 17.0
9.0 -> 19.0
10.0 -> 21.0

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...