如何实现Go并发地管理正在使用的资源的映射或切片,以提高速度?

huangapple go评论76阅读模式
英文:

How to implement Go concurrent map or slice for managing in-use resources faster?

问题

假设你有一个表示资源的结构体,每次只能有一个用户访问该资源。可能看起来像这样:

type Resource struct{
    InUse bool // 或者使用 int32/int64 如果你想使用原子操作
    Resource string // 映射到资源的参数,比如 `/dev/chardeviceXXX`
}

这些资源是有限的,用户将随机并发地请求访问它们,因此你将它们封装在一个管理器中:

type ResourceManager struct{
    Resources []*Resource // 或者使用 map
}

我正在尝试找出管理器创建一个名为 func (m *ResourceManager) GetUnusedResource() (*Resource, error) 的函数的最佳、安全的方式,该函数应该:

  • 遍历所有资源,直到找到一个未被使用的资源
  • 将其标记为已使用,并将 *Resource 返回给调用上下文/Go程
  • 我希望使用锁来避免任何系统级别的锁定(如 flock),并且在 Go 中完成所有操作
  • 还需要一个函数来标记资源不再使用

目前,我在管理器中使用互斥锁来锁定访问,以便在遍历整个切片时进行操作。这是安全的,但我希望通过能够并发地搜索未使用的资源并处理两个 Go 程尝试将同一资源标记为已使用来加快速度。

更新

我特别想知道将 Resource 的 InUse 字段设为 int64,然后使用 atomic.CompareAndSwapInt64 是否能够在找到未使用的资源时立即锁定资源:

func (m *ResourceManager) GetUnusedResource() (*Resource, error) {
    for i := range Resources {
        if atomic.CompareAndSwapInt64(&Resources[i].InUse, 1) {
            return Resources[i], nil
        }
    }
    return nil, errors.New("all resources in use")
}

如果有任何更好的单元测试来测试这个函数,也请提供。

英文:

Image you have a struct that represents a resource that only one user can access at a time. Might look something like this:

type Resource struct{
    InUse bool//or int32/int64 is you want to use atomics
    Resource string //parameters that map to the resource, think `/dev/chardeviceXXX`
}

There are a finite number of these resources and users will request access to them randomly and concurrently so you package them in a manager

type ResourceManager struct{
    Resources []*Resource //or a map 
}

I am trying to figure out the optimal, safe way for the manager to create a function func (m *ResourceManager)GetUnusedResouce()(*Resouce,error) that will:

  • Iterate though all the resources until one that is not InUse is found
  • Mark it as InUse and return the *Resouce to the calling context/goroutine
  • I'd lock to avoid any system level locking (flock) and do this all in Go
  • There also needs to be a function to mark the Resouce are no longer in Use

Right now I use a mutex in the manager to lock access as I iterate through the entire slice. It is safe, but I am hoping to speed this up by being able to search for an used resource concurrently and handle two goroutines trying to mark the same resource as InUse.

Update

I am specifically wondering if making the Resource InUse field an int64 and then using atomic.CompareAndSwapInt64 would allow the Resource manager to lock right when it found an unused resource:

func (m *ResourceManager)GetUnusedResouce()(*Resouce,error){
    for i := range Resources{
        if atomic.CompareAndSwapInt64(&Resouces[i].InUse,1){
            return Resouces[i],nil
        }
    }
    return nil, errors.New("all resouces in use")
}

Any unit tests to better test this would also be appreciated.

答案1

得分: 4

在问题中,GetUnusedResouce函数可能会对所有资源执行比较和交换操作。根据资源数量和应用程序访问模式的不同,通过互斥锁保护的少量操作可能更快。

使用单链表来实现快速的获取和放置操作。

type Resource struct {
    next     *Resource
    Resource string
}

type ResourceManager struct {
    free *Resource
    mu   sync.Mutex
}

// Get从管理器中获取一个空闲资源,如果管理器为空,则返回nil。
func (m *ResourceManager) Get() *Resource {
    m.mu.Lock()
    defer m.mu.Unlock()
    result := m.free
    if m.free != nil {
        m.free = m.free.next
    }
    return result
}

// Put将资源返回到池中。
func (m *ResourceManager) Put(r *Resource) {
    m.mu.Lock()
    defer m.mu.Unlock()
    r.next = m.free
    m.free = r
}

以下是在测试中使用的示例:

func TestResourceManager(t *testing.T) {

    // 将空闲资源添加到管理器中。
    var m ResourceManager
    m.Put(&Resource{Resource: "/dev/a"})
    m.Put(&Resource{Resource: "/dev/b"})

    // 测试是否可以从池中获取所有资源。

    ra := m.Get()
    rb := m.Get()
    if ra.Resource > rb.Resource {
        // 对ra、rb进行排序,使测试独立于顺序。
        ra, rb = rb, ra
    }
    if ra == nil || ra.Resource != "/dev/a" {
        t.Errorf("ra is %v, want /dev/a", ra)
    }
    if rb == nil || rb.Resource != "/dev/b" {
        t.Errorf("rb is %v, want /dev/b", rb)
    }

    // 检查空池。

    r := m.Get()
    if r != nil {
        t.Errorf("r is %v, want nil", r)
    }

    // 返回一个资源并再次尝试。

    m.Put(ra)
    ra = m.Get()
    if ra == nil || ra.Resource != "/dev/a" {
        t.Errorf("ra is %v, want /dev/a", ra)
    }
    r = m.Get()
    if r != nil {
        t.Errorf("r is %v, want nil", r)
    }

}

[ playground 上运行测试](https://play.golang.org/p/BAuCVrPW-mv)。

如果对资源数量有一个已知的合理上限可以使用通道这种方法利用了运行时高度优化的通道实现

```go
type Resource struct {
    Resource string
}

type ResourceManager struct {
    free chan *Resource
}

// Get从管理器中获取一个空闲资源,如果管理器为空,则返回nil。
func (m *ResourceManager) Get() *Resource {
    select {
    case r := <-m.free:
        return r
    default:
        return nil
    }
}

// Put将资源返回到池中。
func (m *ResourceManager) Put(r *Resource) {
    m.free <- r
}

// NewResourceManager返回一个最多可以容纳n个空闲资源的管理器。
func NewResourceManager(n int) *ResourceManager {
    return &ResourceManager{free: make(chan *Resource, n)}
}

使用上面的TestResourceManager函数测试此实现,但将var m ResourceManager替换为m := NewResourceManager(4)

在 Go playground 上运行测试

英文:

The GetUnusedResouce function in the question can potentially execute compare and swap operations for all resources. Depending on the number of resources and the application access pattern, it can be quicker to execute a small number of operations protected by a mutex.

Use a singly linked list to implement fast get and put operations.

type Resource struct {
next     *Resource
Resource string
}
type ResourceManager struct {
free *Resource
mu   sync.Mutex
}
// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
m.mu.Lock()
defer m.mu.Unlock()
result := m.free
if m.free != nil {
m.free = m.free.next
}
return result
}
// Put returns a resource to the pool.
func (m *ResourceManager) Put(r *Resource) {
m.mu.Lock()
defer m.mu.Unlock()
r.next = m.free
m.free = r
}

Here's an example use in a test:

func TestResourceManager(t *testing.T) {
// Add free resources to a manager.
var m ResourceManager
m.Put(&amp;Resource{Resource: &quot;/dev/a&quot;})
m.Put(&amp;Resource{Resource: &quot;/dev/b&quot;})
// Test that we can get all resources from the pool.
ra := m.Get()
rb := m.Get()
if ra.Resource &gt; rb.Resource {
// Sort ra, rb to make test independent of order.
ra, rb = rb, ra
}
if ra == nil || ra.Resource != &quot;/dev/a&quot; {
t.Errorf(&quot;ra is %v, want /dev/a&quot;, ra)
}
if rb == nil || rb.Resource != &quot;/dev/b&quot; {
t.Errorf(&quot;rb is %v, want /dev/b&quot;, rb)
}
// Check for empty pool.
r := m.Get()
if r != nil {
t.Errorf(&quot;r is %v, want nil&quot;, r)
}
// Return one resource and try again.
m.Put(ra)
ra = m.Get()
if ra == nil || ra.Resource != &quot;/dev/a&quot; {
t.Errorf(&quot;ra is %v, want /dev/a&quot;, ra)
}
r = m.Get()
if r != nil {
t.Errorf(&quot;r is %v, want nil&quot;, r)
}
}

Run the test on the playground.

Use a channel if there's a known reasonable bound on the number of resources. This approach takes advantage of the runtime's highly optimized channel implementation.

type Resource struct {
Resource string
}
type ResourceManager struct {
free chan *Resource
}
// Get gets a free resource from the manager or returns
// nil when the manager is empty.
func (m *ResourceManager) Get() *Resource {
select {
case r := &lt;-m.free:
return r
default:
return nil
}
}
// Put returns a resource to the pool.
func (m *ResourceManager) Put(r *Resource) {
m.free &lt;- r
}
// NewResourceManager returns a manager that can hold up to
// n free resources.
func NewResourceManager(n int) *ResourceManager {
return &amp;ResourceManager{free: make(chan *Resource, n)}
}

Test this implementation using the TestResourceManager function above, but replace var m ResourceManager with m := NewResourceManager(4).

Run the test on the Go playground.

答案2

得分: 1

给定资源是否正在使用不是Resource本身的属性,而是ResourceManager的属性。

实际上,没有必要跟踪正在使用的资源(除非你有某种在问题中未提到的原因)。当资源被释放时,可以将正在使用的资源简单地放回池中。

以下是使用通道的可能实现。不需要单个互斥锁,也不需要任何原子CAS。

package main

import (
    fmt "fmt"
    "time"
)

type Resource struct {
    Data string
}

type ResourceManager struct {
    resources []*Resource
    closeCh   chan struct{}
    acquireCh chan *Resource
    releaseCh chan *Resource
}

func NewResourceManager() *ResourceManager {
    r := &ResourceManager{
        closeCh:   make(chan struct{}),
        acquireCh: make(chan *Resource),
        releaseCh: make(chan *Resource),
    }
    go r.run()
    return r
}

func (r *ResourceManager) run() {
    defer close(r.acquireCh)
    for {
        if len(r.resources) > 0 {
            select {
            case r.acquireCh <- r.resources[len(r.resources)-1]:
                r.resources = r.resources[:len(r.resources)-1]
            case res := <-r.releaseCh:
                r.resources = append(r.resources, res)
            case <-r.closeCh:
                return
            }
        } else {
            select {
            case res := <-r.releaseCh:
                r.resources = append(r.resources, res)
            case <-r.closeCh:
                return
            }
        }
    }
}

func (r *ResourceManager) AcquireResource() *Resource {
    return <-r.acquireCh
}

func (r *ResourceManager) ReleaseResource(res *Resource) {
    r.releaseCh <- res
}

func (r *ResourceManager) Close() {
    close(r.closeCh)
}

// small demo below ...

func test(id int, r *ResourceManager) {
    for {
        res := r.AcquireResource()
        fmt.Printf("test %d: %s\n", id, res.Data)
        time.Sleep(time.Millisecond)
        r.ReleaseResource(res)
    }
}

func main() {
    r := NewResourceManager()
    r.ReleaseResource(&Resource{"Resource A"}) // initial setup
    r.ReleaseResource(&Resource{"Resource B"}) // initial setup
    go test(1, r)
    go test(2, r)
    go test(3, r) // 3 consumers, but only 2 resources ...
    time.Sleep(time.Second)
    r.Close()
}
英文:

Whether or not a given resource is in-use is not a property of the Resource itself, but of the ResourceManager.

In fact, there is no need to keep track of in-use resources (unless you need to for some reason not mentioned in the question). An in-use resource can be simply put back into the pool when it is released.

Here's a possible implementation using channels. Not a single mutex, nor any atomic CAS needed.

package main
import (
fmt &quot;fmt&quot;
&quot;time&quot;
)
type Resource struct {
Data string
}
type ResourceManager struct {
resources []*Resource
closeCh   chan struct{}
acquireCh chan *Resource
releaseCh chan *Resource
}
func NewResourceManager() *ResourceManager {
r := &amp;ResourceManager{
closeCh:   make(chan struct{}),
acquireCh: make(chan *Resource),
releaseCh: make(chan *Resource),
}
go r.run()
return r
}
func (r *ResourceManager) run() {
defer close(r.acquireCh)
for {
if len(r.resources) &gt; 0 {
select {
case r.acquireCh &lt;- r.resources[len(r.resources)-1]:
r.resources = r.resources[:len(r.resources)-1]
case res := &lt;-r.releaseCh:
r.resources = append(r.resources, res)
case &lt;-r.closeCh:
return
}
} else {
select {
case res := &lt;-r.releaseCh:
r.resources = append(r.resources, res)
case &lt;-r.closeCh:
return
}
}
}
}
func (r *ResourceManager) AcquireResource() *Resource {
return &lt;-r.acquireCh
}
func (r *ResourceManager) ReleaseResource(res *Resource) {
r.releaseCh &lt;- res
}
func (r *ResourceManager) Close() {
close(r.closeCh)
}
// small demo below ...
func test(id int, r *ResourceManager) {
for {
res := r.AcquireResource()
fmt.Printf(&quot;test %d: %s\n&quot;, id, res.Data)
time.Sleep(time.Millisecond)
r.ReleaseResource(res)
}
}
func main() {
r := NewResourceManager()
r.ReleaseResource(&amp;Resource{&quot;Resource A&quot;}) // initial setup
r.ReleaseResource(&amp;Resource{&quot;Resource B&quot;}) // initial setup
go test(1, r)
go test(2, r)
go test(3, r) // 3 consumers, but only 2 resources ...
time.Sleep(time.Second)
r.Close()
}

huangapple
  • 本文由 发表于 2021年10月20日 23:01:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/69648448.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定