在Golang循环中的并行性

huangapple go评论80阅读模式
英文:

parallelism in Golang loop

问题

我有一个项目,需要在CPU的多个核心上运行它以获得更快的速度。我在Fortran中使用了omplib,但对于Golang的并行性不太熟悉。我尝试了goroutines,但出现了问题,导致结果错误。这是我的代码:

package main

import (
	"bufio"
	"fmt"
	"log"
	"math"
	"math/rand"
	"os"
	"time"
)

const (
	n_particles int     = 2048
	n_steps     int     = 1000000
	dt          float64 = 1.0
	v0          float64 = 0.50
	radius      float64 = 1.0
	f_intensity float64 = 1.8
	scale       float64 = 32.0
	alpha       float64 = 1.0 / 36.0
)

var (
	x      [n_particles + 1]float64
	y      [n_particles + 1]float64
	angles [n_particles + 1]float64
	vx     [n_particles + 1]float64
	vy     [n_particles + 1]float64
	order  [n_steps + 1]float64
)

// main initialises n_particles+1 particles with random positions and headings
// and then runs the time-step loop: advance positions with periodic wrapping,
// bin the particles into unit cells, and compute a neighbour-weighted mean
// heading for every particle.
//
// NOTE(review): vstart, start, m_angles, bandomizer and r_angles are never
// read in this listing, and Go rejects unused locals, so the code does not
// build as shown; the listing appears to be an abridged excerpt.
func main() {
	// Private random source seeded from the wall clock.
	vstart := time.Now()
	rsource := rand.NewSource(time.Now().UnixNano())
	randomizer := rand.New(rsource)

	// Initial state: position in [0, scale) on both axes, heading in
	// [0, 2*pi), velocity of magnitude v0 along the heading.
	for i := 0; i <= n_particles; i++ {
		x[i] = (randomizer.Float64()) * scale
		y[i] = (randomizer.Float64()) * scale
		angles[i] = (randomizer.Float64()) * math.Pi * 2
		sin, cos := math.Sincos(angles[i])
		vx[i] = v0 * cos
		vy[i] = v0 * sin
	}
	// Time-step loop.
	for i := 0; i <= n_steps; i++ {
		start := time.Now()

		// Advance every particle by one step and wrap it back into the box
		// (periodic boundary) with a single add/subtract of scale.
		for j := 0; j <= n_particles; j++ {
			x[j] = x[j] + (vx[j] * dt)
			//x[j] = math.Mod(x[j], scale)
			if x[j] < 0.0 {
				x[j] = x[j] + scale
			}
			if x[j] >= scale {
				x[j] = x[j] - scale
			}
			y[j] = y[j] + (vy[j] * dt)
			//y[j] = math.Mod(x[j], scale)
			if y[j] < 0.0 {
				y[j] = y[j] + scale
			}
			if y[j] >= scale {
				y[j] = y[j] - scale
			}

		}
		// intpos identifies a unit cell by the floor of a position.
		type intpos struct {
			x, y int64
		}
		// adjacencyIndex maps each cell to the indices of the particles in
		// it; it is rebuilt from scratch on every step.
		adjacencyIndex := make(map[intpos][]int)
		// Bin every particle into its cell.
		for j := 0; j <= n_particles; j++ {
			// . . .
			ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))                 // cell containing particle j
			adjacencyIndex[intpos{ix, iy}] = append(adjacencyIndex[intpos{ix, iy}], j) // add particle j to that cell
		}
		// Array assignment copies, so m_angles is a snapshot of angles.
		m_angles := angles

		// Particle loop - this is the loop the author wants to run in parallel.
		// In the visible code each iteration only reads x, y, angles and
		// adjacencyIndex and writes its own locals.

		for j := 0; j <= n_particles; j++ {

			// Running sums of the neighbours' cosines (x) and sines (y).
			sumanglesx := 0.0
			sumanglesy := 0.0
			ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
			// fxi = math.Floor(x[j])
			// fyi = math.Floor(y[j])

			// Scan the 3x3 block of cells centred on particle j's cell.
			// NOTE(review): ix+dx / iy+dy are not wrapped, so particles near
			// an edge do not see neighbours across the periodic boundary -
			// confirm whether that is intended.
			for dx := -1; dx <= 1; dx++ {
				for dy := -1; dy <= 1; dy++ {
					adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]

					for _, k := range adjacentParticles {

						// dist is the squared distance; comparing it with
						// radius (not radius*radius) is only equivalent
						// because radius is 1.0.
						dist := ((x[k] - x[j]) * (x[k] - x[j])) + ((y[k] - y[j]) * (y[k] - y[j]))

						if dist < radius {

							// Sincos returns (sin, cos): sy is the sine, sx the cosine.
							sy, sx := math.Sincos(angles[k])

							// Neighbours with index <= j (including j itself)
							// count fully; higher indices are scaled by alpha.
							if k <= j {
								sumanglesx = sumanglesx + sx
								sumanglesy = sumanglesy + sy
							} else {
								sx = alpha * sx
								sy = alpha * sy
								sumanglesx = sumanglesx + sx
								sumanglesy = sumanglesy + sy
							}
						}
					}
				}
			}
			// A new clock-seeded generator is built for every particle on
			// every step; it is not used in the visible code.
			bsource := rand.NewSource(time.Now().UnixNano())
			bandomizer := rand.New(bsource)
			// The two self-assignments below are no-ops.
			sumanglesy = sumanglesy
			sumanglesx = sumanglesx
			// Direction of the weighted sum; the result is not stored anywhere
			// in the visible code.
			r_angles := math.Atan2(sumanglesy, sumanglesx)
		}
	}
}

现在我想让以下循环并行运行:

////particle loop - I WANT FOLLOWING LOOP PARALLEL

// For every particle j, sum the heading vectors of the particles found in the
// 3x3 block of cells around it and take the direction of that sum. This
// fragment relies on x, y, angles, adjacencyIndex, intpos, radius, alpha and
// n_particles from the enclosing scope.
for j := 0; j <= n_particles; j++ {

	// Running sums of the neighbours' cosines (x) and sines (y).
	sumanglesx := 0.0
	sumanglesy := 0.0
	ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
	// fxi = math.Floor(x[j])
	// fyi = math.Floor(y[j])

	for dx := -1; dx <= 1; dx++ {
		for dy := -1; dy <= 1; dy++ {
			adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]

			for _, k := range adjacentParticles {

				// dist is the squared distance between particles k and j.
				dist := ((x[k] - x[j]) * (x[k] - x[j])) + ((y[k] - y[j]) * (y[k] - y[j]))

				if dist < radius {

					// Sincos returns (sin, cos): sy is the sine, sx the cosine.
					sy, sx := math.Sincos(angles[k])

					// Indices <= j count fully; higher indices are scaled by alpha.
					if k <= j {
						sumanglesx = sumanglesx + sx
						sumanglesy = sumanglesy + sy
					} else {
						sx = alpha * sx
						sy = alpha * sy
						sumanglesx = sumanglesx + sx
						sumanglesy = sumanglesy + sy
					}
				}
			}
		}
	}
	// A new clock-seeded generator per particle; unused in this fragment.
	bsource := rand.NewSource(time.Now().UnixNano())
	bandomizer := rand.New(bsource)
	// The two self-assignments below are no-ops.
	sumanglesy = sumanglesy
	sumanglesx = sumanglesx
	// Direction of the weighted sum; not stored anywhere in this fragment.
	r_angles := math.Atan2(sumanglesy, sumanglesx)
}

请注意,我已经在代码中用注释标记了要并行运行的循环部分。

英文:

I have a project that I need to run on multiple CPU cores to make it faster. I have used omplib in Fortran, but I am not familiar with parallelism in Go. I tried goroutines, but that went wrong: it made a mess and I got incorrect results. This is my code:

package main

import (
	"bufio"
	"fmt"
	"log"
	"math"
	"math/rand"
	"os"

	"time"
)

const (
	n_particles int     = 2048
	n_steps     int     = 1000000
	dt          float64 = 1.0
	v0          float64 = 0.50
	radius      float64 = 1.0
	f_intensity float64 = 1.8
	scale       float64 = 32.0
	alpha       float64 = 1.0 / 36.0
)

var (
	x      [n_particles + 1]float64
	y      [n_particles + 1]float64
	angles [n_particles + 1]float64
	vx     [n_particles + 1]float64
	vy     [n_particles + 1]float64
	order  [n_steps + 1]float64
)

func main() {
	/////randomizer
	vstart := time.Now()
	rsource := rand.NewSource(time.Now().UnixNano())
	randomizer := rand.New(rsource)

	for i := 0; i &lt;= n_particles; i++ {
		x[i] = (randomizer.Float64()) * scale
		y[i] = (randomizer.Float64()) * scale
		angles[i] = (randomizer.Float64()) * math.Pi * 2
		sin, cos := math.Sincos(angles[i])
		vx[i] = v0 * cos
		vy[i] = v0 * sin
	}
	//////main loop
	for i := 0; i &lt;= n_steps; i++ {
		start := time.Now()

		for j := 0; j &lt;= n_particles; j++ {
			x[j] = x[j] + (vx[j] * dt)
			//x[j] = math.Mod(x[j], scale)
			if x[j] &lt; 0.0 {
				x[j] = x[j] + scale
			}
			if x[j] &gt;= scale {
				x[j] = x[j] - scale
			}
			y[j] = y[j] + (vy[j] * dt)
			//y[j] = math.Mod(x[j], scale)
			if y[j] &lt; 0.0 {
				y[j] = y[j] + scale
			}
			if y[j] &gt;= scale {
				y[j] = y[j] - scale
			}

		}
		type intpos struct {
			x, y int64
		}
		adjacencyIndex := make(map[intpos][]int)
		////getting each boxes particles
		for j := 0; j &lt;= n_particles; j++ {
			// . . .
			ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))                 // getting particle box
			adjacencyIndex[intpos{ix, iy}] = append(adjacencyIndex[intpos{ix, iy}], j) // adding particles to boxes
		}
		/////////
		m_angles := angles
		

Now I want the following loop to run in parallel:

////particle loop - I WANT FOLLOWING LOOP PARALLEL
// For every particle j, sum the heading vectors of the particles found in the
// 3x3 block of cells around it and take the direction of that sum. This
// fragment relies on x, y, angles, adjacencyIndex, intpos, radius, alpha and
// n_particles from the enclosing scope.
for j := 0; j <= n_particles; j++ {
	// Running sums of the neighbours' cosines (x) and sines (y).
	sumanglesx := 0.0
	sumanglesy := 0.0
	ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
	// fxi = math.Floor(x[j])
	// fyi = math.Floor(y[j])
	for dx := -1; dx <= 1; dx++ {
		for dy := -1; dy <= 1; dy++ {
			adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]
			for _, k := range adjacentParticles {
				// dist is the squared distance between particles k and j.
				dist := ((x[k] - x[j]) * (x[k] - x[j])) + ((y[k] - y[j]) * (y[k] - y[j]))
				if dist < radius {
					// Sincos returns (sin, cos): sy is the sine, sx the cosine.
					sy, sx := math.Sincos(angles[k])
					// Indices <= j count fully; higher indices are scaled by alpha.
					if k <= j {
						sumanglesx = sumanglesx + sx
						sumanglesy = sumanglesy + sy
					} else {
						sx = alpha * sx
						sy = alpha * sy
						sumanglesx = sumanglesx + sx
						sumanglesy = sumanglesy + sy
					}
				}
			}
		}
	}
	// A new clock-seeded generator per particle; unused in this fragment.
	bsource := rand.NewSource(time.Now().UnixNano())
	bandomizer := rand.New(bsource)
	// The two self-assignments below are no-ops.
	sumanglesy = sumanglesy
	sumanglesx = sumanglesx
	// Direction of the weighted sum; not stored anywhere in this fragment.
	r_angles := math.Atan2(sumanglesy, sumanglesx)
}
}
}

I have marked the one loop that should run in parallel.

答案1

得分: 1

这里有两种尝试的方法:https://play.golang.org/p/O1uB2zzJEC5

我更喜欢使用通道的方法,因为对我来说更符合Go的习惯用法,并且可以更容易地处理panic、错误条件等。

英文:

Here are two approaches to try out: https://play.golang.org/p/O1uB2zzJEC5

package main

import (
	"fmt"
	"sync"
)

// main runs the two demonstrations one after the other: first the
// sync.WaitGroup variant, then the channel variant.
func main() {
	waitGroupApproach()
	channelApproach()
}

// waitGroupApproach starts six goroutines, each of which prints its index and
// writes index*2 into its own slot of a shared slice. A sync.WaitGroup blocks
// the caller until all of them have finished, after which the slice is
// printed.
func waitGroupApproach() {
	fmt.Println("waitGroupApproach")
	var waitgroup sync.WaitGroup

	resultTable := make([]int, 6)

	for j := 0; j <= 5; j++ {
		waitgroup.Add(1)

		// Pass j as an argument so that every goroutine gets its own copy.
		// Before Go 1.22 a closure that captured j directly shared a single
		// variable with the loop and would likely observe its final value,
		// because the goroutines tend to run after the loop has finished.
		go func(index int) {
			// Deferred so Done runs on every exit path of the goroutine.
			defer waitgroup.Done()

			fmt.Println(index) // try putting `j` here instead of `index`
			// Every goroutine writes a different element, so no lock is needed.
			resultTable[index] = index * 2
		}(j)
	}

	fmt.Println("waiting")
	waitgroup.Wait()
	// process results further
	fmt.Println("finished")
	fmt.Println(resultTable)
}

// channelApproach starts six goroutines that each send one result over an
// unbuffered channel. The caller receives exactly six values, prints each
// one, stores its x field by index, and finally prints the collected slice.
func channelApproach() {
	fmt.Println("\nchannelApproach")

	// intpos carries one goroutine's result together with the index it
	// belongs to.
	type intpos struct {
		x, y, index int
	}

	results := make(chan intpos)

	// Start the workers; j is passed as an argument so each goroutine has its
	// own copy.
	for j := 0; j <= 5; j++ {
		go func(index int) {
			// do processing
			results <- intpos{index * 2, index * 3, index}
		}(j)
	}
	fmt.Println("Waiting..")

	// Collect results: receive once for every goroutine started above.
	resultTable := make([]int, 6)
	for j := 0; j <= 5; j++ {
		r := <-results
		// Watch out for the order: results may arrive in a different order
		// than the goroutines were started, which is why each result also
		// carries its index.
		fmt.Println(r.index, r.x, r.y)
		resultTable[r.index] = r.x
	}
	fmt.Println("Finished..")
	fmt.Println(resultTable)
}

I prefer the channel approach because it feels more idiomatic in Go to me, and it makes it easier to handle panics, error conditions, etc.

huangapple
  • 本文由 发表于 2021年10月10日 19:51:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/69514900.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定