英文:
parallelism in Golang loop
问题
我有一个项目,需要在CPU的多个核心上运行它以获得更快的速度。我在Fortran中使用了omplib,但对于Golang的并行性不太熟悉。我尝试了goroutines,但出现了问题,导致结果错误。这是我的代码:
package main
import (
	"bufio"
	"fmt"
	"log"
	"math"
	"math/rand"
	"os"
	"time"
)
// Simulation parameters. main refers to these names directly.
const (
	// n_particles is the highest particle index: the state arrays below hold
	// n_particles + 1 entries and the loops in main run 0..n_particles inclusive.
	n_particles int     = 2048
	// n_steps is the highest index of the time-step loop (0..n_steps inclusive).
	n_steps     int     = 1000000
	// dt multiplies the velocity when positions are advanced in each step.
	dt          float64 = 1.0
	// v0 is the speed: velocities are initialised as (v0*cos(angle), v0*sin(angle)).
	v0          float64 = 0.50
	// radius is compared against the SQUARED distance between two particles;
	// with radius == 1.0 that is the same as comparing the distance itself.
	radius      float64 = 1.0
	// f_intensity is not referenced anywhere in the code shown here.
	f_intensity float64 = 1.8
	// scale is the side length of the square box: positions start in
	// [0, scale) and are shifted by +/-scale when they leave that range.
	scale       float64 = 32.0
	// alpha is the weight applied to a neighbour k whose index is greater than
	// the index j of the particle being updated (k <= j gets weight 1).
	alpha       float64 = 1.0 / 36.0
)
// Per-particle state, indexed 0..n_particles, plus one per-step series.
var (
	x      [n_particles + 1]float64 // x position
	y      [n_particles + 1]float64 // y position
	angles [n_particles + 1]float64 // heading in radians, initialised in [0, 2*pi)
	vx     [n_particles + 1]float64 // x velocity, set once from angles during initialisation
	vy     [n_particles + 1]float64 // y velocity, set once from angles during initialisation
	order  [n_steps + 1]float64 // one slot per step; neither read nor written in the code shown
)
// main sets up n_particles+1 particles with random positions and headings and
// then runs the time-step loop: move every particle inside the box, bucket the
// particles into unit grid cells, and for each particle sum the heading vectors
// of the neighbours found in the surrounding 3x3 cells.
//
// NOTE(review): as listed, several locals (vstart, start, m_angles, bandomizer,
// r_angles) are declared but never used and the imports bufio, fmt, log and os
// are never referenced, so the Go compiler rejects this file as it stands; it
// reads like an abridged excerpt of a larger program.
func main() {
	// Seed a private generator from the wall clock.
	vstart := time.Now() // not read again in this listing
	rsource := rand.NewSource(time.Now().UnixNano())
	randomizer := rand.New(rsource)
	// Random position inside the box and random heading for every particle;
	// the velocity has fixed magnitude v0 along that heading.
	for i := 0; i <= n_particles; i++ {
		x[i] = (randomizer.Float64()) * scale
		y[i] = (randomizer.Float64()) * scale
		angles[i] = (randomizer.Float64()) * math.Pi * 2
		sin, cos := math.Sincos(angles[i])
		vx[i] = v0 * cos
		vy[i] = v0 * sin
	}
	// Time-step loop.
	for i := 0; i <= n_steps; i++ {
		start := time.Now() // not read again in this listing
		// Advance each particle by one step, then shift it by +/-scale if it
		// has left the box on either side.
		for j := 0; j <= n_particles; j++ {
			x[j] = x[j] + (vx[j] * dt)
			//x[j] = math.Mod(x[j], scale)
			if x[j] < 0.0 {
				x[j] = x[j] + scale
			}
			if x[j] >= scale {
				x[j] = x[j] - scale
			}
			y[j] = y[j] + (vy[j] * dt)
			//y[j] = math.Mod(x[j], scale)
			if y[j] < 0.0 {
				y[j] = y[j] + scale
			}
			if y[j] >= scale {
				y[j] = y[j] - scale
			}
		}
		// intpos holds the integer coordinates of a unit grid cell.
		type intpos struct {
			x, y int64
		}
		// adjacencyIndex maps a cell to the indices of the particles inside it.
		// It is rebuilt from scratch on every step.
		adjacencyIndex := make(map[intpos][]int)
		// Bucket every particle into the cell given by the floor of its position.
		for j := 0; j <= n_particles; j++ {
			// . . .
			ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))                 // getting particle box
			adjacencyIndex[intpos{ix, iy}] = append(adjacencyIndex[intpos{ix, iy}], j) // adding particles to boxes
		}
		// angles is an array, so this assignment copies every element; the copy
		// is not used again in this listing.
		m_angles := angles
		////particle loop - I WANT FOLLOWING LOOP PARALLEL
		// In the code shown, each iteration only reads x, y, angles and
		// adjacencyIndex and assigns variables declared inside the loop body.
		for j := 0; j <= n_particles; j++ {
			sumanglesx := 0.0
			sumanglesy := 0.0
			ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
			// fxi = math.Floor(x[j])
			// fyi = math.Floor(y[j])
			// Visit the particle's own cell and its eight neighbours.
			// NOTE(review): ix+dx and iy+dy are not wrapped, so for a particle
			// in an edge cell the lookup uses keys such as -1 that no particle
			// was bucketed under; a missing key yields a nil slice, i.e.
			// neighbours across the box boundary are not visited.
			for dx := -1; dx <= 1; dx++ {
				for dy := -1; dy <= 1; dy++ {
					adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]
					for _, k := range adjacentParticles {
						// dist is the squared distance between particles k and j.
						dist := ((x[k] - x[j]) * (x[k] - x[j])) + ((y[k] - y[j]) * (y[k] - y[j]))
						if dist < radius {
							// Sincos returns (sin, cos): sy is the sine, sx the cosine.
							sy, sx := math.Sincos(angles[k])
							// Neighbours with k <= j (this includes j itself)
							// count fully; those with k > j are scaled by alpha.
							if k <= j {
								sumanglesx = sumanglesx + sx
								sumanglesy = sumanglesy + sy
							} else {
								sx = alpha * sx
								sy = alpha * sy
								sumanglesx = sumanglesx + sx
								sumanglesy = sumanglesy + sy
							}
						}
					}
				}
			}
			// A new generator is created and seeded from the clock for every
			// particle; it is not used again in this listing.
			bsource := rand.NewSource(time.Now().UnixNano())
			bandomizer := rand.New(bsource)
			// The two self-assignments below have no effect.
			sumanglesy = sumanglesy
			sumanglesx = sumanglesx
			// Direction of the summed vector; the value is not stored anywhere
			// in this listing.
			r_angles := math.Atan2(sumanglesy, sumanglesx)
		}
	}
}
现在我想让以下循环并行运行:
////particle loop - I WANT FOLLOWING LOOP PARALLEL
// Excerpt of the per-particle loop. For each particle j it sums the heading
// vectors (cos, sin of angles[k]) of the particles k found in the 3x3 block of
// grid cells around j, then takes the direction of that sum.
// In the code shown, an iteration only reads x, y, angles and adjacencyIndex
// and assigns variables declared inside the loop body.
// NOTE(review): that makes the iterations look independent of each other;
// confirm against the full program, which presumably stores r_angles somewhere.
for j := 0; j <= n_particles; j++ {
	sumanglesx := 0.0
	sumanglesy := 0.0
	ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
	// fxi = math.Floor(x[j])
	// fyi = math.Floor(y[j])
	// Own cell plus the eight surrounding cells; the cell coordinates are not
	// wrapped at the box boundary.
	for dx := -1; dx <= 1; dx++ {
		for dy := -1; dy <= 1; dy++ {
			adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]
			for _, k := range adjacentParticles {
				// dist is the squared distance between particles k and j.
				dist := ((x[k] - x[j]) * (x[k] - x[j])) + ((y[k] - y[j]) * (y[k] - y[j]))
				if dist < radius {
					// Sincos returns (sin, cos): sy is the sine, sx the cosine.
					sy, sx := math.Sincos(angles[k])
					// k <= j (including j itself) counts fully; k > j is
					// scaled by alpha.
					if k <= j {
						sumanglesx = sumanglesx + sx
						sumanglesy = sumanglesy + sy
					} else {
						sx = alpha * sx
						sy = alpha * sy
						sumanglesx = sumanglesx + sx
						sumanglesy = sumanglesy + sy
					}
				}
			}
		}
	}
	// A generator seeded from the clock is created per particle; it is not
	// used within this excerpt.
	bsource := rand.NewSource(time.Now().UnixNano())
	bandomizer := rand.New(bsource)
	// The two self-assignments below have no effect.
	sumanglesy = sumanglesy
	sumanglesx = sumanglesx
	// Direction of the summed vector; not stored anywhere within this excerpt.
	r_angles := math.Atan2(sumanglesy, sumanglesx)
}
请注意,我已经在代码中用注释标记了要并行运行的循环部分。
英文:
I have a project that I need to run on multiple CPU cores to make it faster. I have used OpenMP (omp_lib) in Fortran, but I am not familiar with parallelism in Go. I tried goroutines, but it went wrong: the code became a mess and I got incorrect results. This is my code:
package main
import (
	"bufio"
	"fmt"
	"log"
	"math"
	"math/rand"
	"os"
	"time"
)
// Simulation parameters. main refers to these names directly.
const (
	// n_particles is the highest particle index: the state arrays below hold
	// n_particles + 1 entries and the loops in main run 0..n_particles inclusive.
	n_particles int     = 2048
	// n_steps is the highest index of the time-step loop (0..n_steps inclusive).
	n_steps     int     = 1000000
	// dt multiplies the velocity when positions are advanced in each step.
	dt          float64 = 1.0
	// v0 is the speed: velocities are initialised as (v0*cos(angle), v0*sin(angle)).
	v0          float64 = 0.50
	// radius is compared against the SQUARED distance between two particles;
	// with radius == 1.0 that is the same as comparing the distance itself.
	radius      float64 = 1.0
	// f_intensity is not referenced anywhere in the code shown here.
	f_intensity float64 = 1.8
	// scale is the side length of the square box: positions start in
	// [0, scale) and are shifted by +/-scale when they leave that range.
	scale       float64 = 32.0
	// alpha is the weight applied to a neighbour k whose index is greater than
	// the index j of the particle being updated (k <= j gets weight 1).
	alpha       float64 = 1.0 / 36.0
)
// Per-particle state, indexed 0..n_particles, plus one per-step series.
var (
	x      [n_particles + 1]float64 // x position
	y      [n_particles + 1]float64 // y position
	angles [n_particles + 1]float64 // heading in radians, initialised in [0, 2*pi)
	vx     [n_particles + 1]float64 // x velocity, set once from angles during initialisation
	vy     [n_particles + 1]float64 // y velocity, set once from angles during initialisation
	order  [n_steps + 1]float64 // one slot per step; neither read nor written in the code shown
)
// main sets up n_particles+1 particles with random positions and headings and
// then runs the time-step loop: move every particle inside the box and bucket
// the particles into unit grid cells. The listing is interrupted here; the
// per-particle loop that follows in the post is the remainder of this function.
//
// NOTE(review): vstart, start and m_angles are declared but not used in the
// part shown, and the imports bufio, fmt, log and os are never referenced, so
// the Go compiler would reject the program as posted.
func main() {
	// Seed a private generator from the wall clock.
	vstart := time.Now() // not read again in this listing
	rsource := rand.NewSource(time.Now().UnixNano())
	randomizer := rand.New(rsource)
	// Random position inside the box and random heading for every particle;
	// the velocity has fixed magnitude v0 along that heading.
	for i := 0; i <= n_particles; i++ {
		x[i] = (randomizer.Float64()) * scale
		y[i] = (randomizer.Float64()) * scale
		angles[i] = (randomizer.Float64()) * math.Pi * 2
		sin, cos := math.Sincos(angles[i])
		vx[i] = v0 * cos
		vy[i] = v0 * sin
	}
	// Time-step loop.
	for i := 0; i <= n_steps; i++ {
		start := time.Now() // not read again in this listing
		// Advance each particle by one step, then shift it by +/-scale if it
		// has left the box on either side.
		for j := 0; j <= n_particles; j++ {
			x[j] = x[j] + (vx[j] * dt)
			//x[j] = math.Mod(x[j], scale)
			if x[j] < 0.0 {
				x[j] = x[j] + scale
			}
			if x[j] >= scale {
				x[j] = x[j] - scale
			}
			y[j] = y[j] + (vy[j] * dt)
			//y[j] = math.Mod(x[j], scale)
			if y[j] < 0.0 {
				y[j] = y[j] + scale
			}
			if y[j] >= scale {
				y[j] = y[j] - scale
			}
		}
		// intpos holds the integer coordinates of a unit grid cell.
		type intpos struct {
			x, y int64
		}
		// adjacencyIndex maps a cell to the indices of the particles inside it.
		// It is rebuilt from scratch on every step.
		adjacencyIndex := make(map[intpos][]int)
		// Bucket every particle into the cell given by the floor of its position.
		for j := 0; j <= n_particles; j++ {
			// . . .
			ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))                 // getting particle box
			adjacencyIndex[intpos{ix, iy}] = append(adjacencyIndex[intpos{ix, iy}], j) // adding particles to boxes
		}
		// angles is an array, so this assignment copies every element; the copy
		// is not used again in this listing.
		m_angles := angles
		
Now I want the following loop to run in parallel:
////particle loop - I WANT FOLLOWING LOOP PARALLEL
// Excerpt of the per-particle loop (indentation was lost in the post). For
// each particle j it sums the heading vectors (cos, sin of angles[k]) of the
// particles k found in the 3x3 block of grid cells around j, then takes the
// direction of that sum.
// In the code shown, an iteration only reads x, y, angles and adjacencyIndex
// and assigns variables declared inside the loop body.
// NOTE(review): that makes the iterations look independent of each other;
// confirm against the full program, which presumably stores r_angles somewhere.
for j := 0; j <= n_particles; j++ {
sumanglesx := 0.0
sumanglesy := 0.0
ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
// fxi = math.Floor(x[j])
// fyi = math.Floor(y[j])
// Own cell plus the eight surrounding cells; the cell coordinates are not
// wrapped at the box boundary.
for dx := -1; dx <= 1; dx++ {
for dy := -1; dy <= 1; dy++ {
adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]
for _, k := range adjacentParticles {
// dist is the squared distance between particles k and j.
dist := ((x[k] - x[j]) * (x[k] - x[j])) + ((y[k] - y[j]) * (y[k] - y[j]))
if dist < radius {
// Sincos returns (sin, cos): sy is the sine, sx the cosine.
sy, sx := math.Sincos(angles[k])
// k <= j (including j itself) counts fully; k > j is scaled by alpha.
if k <= j {
sumanglesx = sumanglesx + sx
sumanglesy = sumanglesy + sy
} else {
sx = alpha * sx
sy = alpha * sy
sumanglesx = sumanglesx + sx
sumanglesy = sumanglesy + sy
}
}
}
}
}
// A generator seeded from the clock is created per particle; it is not used
// within this excerpt.
bsource := rand.NewSource(time.Now().UnixNano())
bandomizer := rand.New(bsource)
// The two self-assignments below have no effect.
sumanglesy = sumanglesy
sumanglesx = sumanglesx
// Direction of the summed vector; not stored anywhere within this excerpt.
r_angles := math.Atan2(sumanglesy, sumanglesx)
}
// The two braces below close the enclosing blocks of the listing this excerpt
// continues.
}
}
I have marked the one loop that should run in parallel.
答案1
得分: 1
这里有两种尝试的方法:https://play.golang.org/p/O1uB2zzJEC5
我更喜欢使用通道的方法,因为对我来说更符合Go的习惯用法,并且可以更容易地处理panic、错误条件等。
英文:
Here are two approaches to try out: https://play.golang.org/p/O1uB2zzJEC5
package main
import (
	"fmt"
	"sync"
)
// main runs the two demonstrations one after the other: first the
// sync.WaitGroup variant, then the channel-based one.
func main() {
	demos := []func(){
		waitGroupApproach,
		channelApproach,
	}
	for _, run := range demos {
		run()
	}
}
// waitGroupApproach demonstrates fanning a loop out over goroutines and
// joining them again with a sync.WaitGroup.
//
// Every goroutine writes only to its own slot of a pre-sized slice, so no
// mutex is required, and wg.Wait() does not return until every goroutine has
// called Done, which makes all of those writes visible to the code after it.
// The per-index lines are printed from inside the goroutines and can therefore
// appear in any order, before or after "waiting".
func waitGroupApproach() {
	fmt.Println("waitGroupApproach")

	// tasks is the number of loop iterations and therefore the number of
	// goroutines and result slots; keeping it in one place prevents the loop
	// bound and the slice length from drifting apart.
	const tasks = 6

	var wg sync.WaitGroup
	results := make([]int, tasks)

	for j := 0; j < tasks; j++ {
		// Register the goroutine before starting it, never from inside it,
		// otherwise Wait could run before the counter was incremented.
		wg.Add(1)

		// Hand j over as an argument so each goroutine works on its own copy.
		// Before Go 1.22 a closure that referenced j directly shared the one
		// loop variable and would most likely observe its final value (6 here),
		// because the goroutines tend to run after the loop has finished.
		go func(index int) {
			// Deferred so the counter is always released, including on an
			// early return added to this body later.
			defer wg.Done()

			fmt.Println(index) // try using `j` instead of `index` here
			results[index] = index * 2
		}(j)
	}

	fmt.Println("waiting")
	wg.Wait()
	// process results further
	fmt.Println("finished")
	fmt.Println(results)
}
// channelApproach demonstrates the same fan-out, but collects the results
// over a channel instead of writing into shared memory.
//
// Each goroutine sends exactly one value on an unbuffered channel and the
// collecting loop receives exactly as many values as goroutines were started,
// so every sender is released and the function returns only after all of them
// have delivered their result.
func channelApproach() {
	fmt.Println("\nchannelApproach")

	// workers is both the number of goroutines started and the number of
	// results to receive.
	const workers = 6

	// outcome is what one goroutine reports back: two computed values plus
	// the slot the result belongs to.
	type outcome struct {
		double, triple, slot int
	}
	out := make(chan outcome)

	// Start the goroutines; each does its processing and sends one outcome.
	worker := func(slot int) {
		out <- outcome{double: slot * 2, triple: slot * 3, slot: slot}
	}
	for slot := 0; slot < workers; slot++ {
		go worker(slot)
	}
	fmt.Println("Waiting..")

	// Collect exactly one outcome per goroutine. Arrival order need not match
	// start order, which is why every outcome carries its own slot.
	table := make([]int, workers)
	for received := 0; received < workers; received++ {
		res := <-out
		fmt.Println(res.slot, res.double, res.triple)
		table[res.slot] = res.double
	}
	fmt.Println("Finished..")
	fmt.Println(table)
}
I prefer the channel approach because it feels more idiomatic in Go to me and makes it easier to handle panics, error conditions, etc.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论