读写互斥与通道

huangapple go评论70阅读模式
英文:

Read-write exclusion with channels

问题

我想用Go语言编写一个小型的内存数据库。
读写请求将通过通道传递,并由数据库引擎处理,以确保访问正确执行。

第一个想法是模仿RWMutex的行为。只是它将使用更符合Go语言风格的方式。

这是一个小玩具(虽然有点长)的例子,展示了我想要做的事情。

package main

import (
    "log"
    "math/rand"
    "time"
)

var source *rand.Rand

type ReqType int

const (
    READ = iota
    WRITE
)

type DbRequest struct {
    Type  int              // 请求类型
    RespC chan *DbResponse // 请求响应的通道
    // 这里是内容
}

type DbResponse struct {
    // 这里是响应
}

type Db struct {
    // 这里是数据库
}

func randomWait() {
    time.Sleep(time.Duration(source.Intn(1000)) * time.Millisecond)
}

func (d *Db) readsHandler(in <-chan *DbRequest) {
    for r := range in {
        id := source.Intn(4000000)
        log.Println("读取 ", id, " 开始")
        randomWait()
        log.Println("读取 ", id, " 结束")
        r.RespC <- &DbResponse{}
    }
}

func (d *Db) writesHandler(r *DbRequest) *DbResponse {
    id := source.Intn(4000000)
    log.Println("写入 ", id, " 开始")
    randomWait()
    log.Println("写入 ", id, " 结束")
    return &DbResponse{}
}

func (d *Db) Start(nReaders int) chan *DbRequest {
    in := make(chan *DbRequest, 100)
    reads := make(chan *DbRequest, nReaders)

    // 启动读取器
    for k := 0; k < nReaders; k++ {
        go d.readsHandler(reads)
    }

    go func() {
        for r := range in {
            switch r.Type {
            case READ:
                reads <- r
            case WRITE:
                // 这里我们应该等待所有读取完成(如何实现?)

                r.RespC <- d.writesHandler(r)

                // 这里的writesHandler是阻塞的,
                // 这确保在写入完成之前不会向读取通道中添加额外的读取请求
            }
        }
    }()

    return in
}

func main() {
    seed := time.Now().Unix()
    source = rand.New(rand.NewSource(seed))

    blackhole := make(chan *DbResponse, 100)

    d := Db{}
    rc := d.Start(4)
    wc := time.After(3 * time.Second)

    go func() {
        for {
            <-blackhole
        }
    }()

    for {
        select {
        case <-wc:
            return
        default:
            if source.Intn(2) == 0 {
                rc <- &DbRequest{READ, blackhole}
            } else {
                rc <- &DbRequest{WRITE, blackhole}
            }
        }
    }
}

当然,这个例子展示了读写冲突。

我觉得我在尝试做一些有点邪恶的事情:使用旨在避免共享内存的构造来共享内存...
在这一点上,一个明显的解决方案是在两种类型的请求处理周围添加RWMutex锁,但也许有一种聪明的解决方案只使用goroutines和通道。

英文:

I would like to write a small in-memory database in Go.
Read and write requests would be passed through a channel and processed by the db engine which would ensure the accesses are done properly.

A first idea woud be to mimic the behaviour of RWMutex. Only it would use a more idiomatic go style.

Here is a little toy (although, rather long) example of what I would like to do.

package main

import (
    &quot;log&quot;
    &quot;math/rand&quot;
    &quot;time&quot;
)

var source *rand.Rand

type ReqType int

const (
    READ = iota
    WRITE
)

type DbRequest struct {
    Type  int              // request type
    RespC chan *DbResponse // channel for request response
    // content here
}

type DbResponse struct {
    // response here
}

type Db struct {
    // DB here
}

func randomWait() {
    time.Sleep(time.Duration(source.Intn(1000)) * time.Millisecond)
}

func (d *Db) readsHandler(in &lt;-chan *DbRequest) {
    for r := range in {
        id := source.Intn(4000000)
        log.Println(&quot;read &quot;, id, &quot; starts&quot;)
        randomWait()
        log.Println(&quot;read &quot;, id, &quot; ends&quot;)
        r.RespC &lt;- &amp;DbResponse{}
    }
}

func (d *Db) writesHandler(r *DbRequest) *DbResponse {
    id := source.Intn(4000000)
    log.Println(&quot;write &quot;, id, &quot; starts&quot;)
    randomWait()
    log.Println(&quot;write &quot;, id, &quot; ends&quot;)
    return &amp;DbResponse{}
}

func (d *Db) Start(nReaders int) chan *DbRequest {
    in := make(chan *DbRequest, 100)
    reads := make(chan *DbRequest, nReaders)

    // launch readers
    for k := 0; k &lt; nReaders; k++ {
        go d.readsHandler(reads)
    }

    go func() {
        for r := range in {
            switch r.Type {
            case READ:
                reads &lt;- r
            case WRITE:
                // here we should wait for all reads to
                // be over (how ??)

                r.RespC &lt;- d.writesHandler(r)

                // here writesHandler is blocking,
                // this ensures that no additional
                // read is added in the reads channel
                // before the write is finished
            }
        }
    }()

    return in
}

func main() {
    seed := time.Now().Unix()
    source = rand.New(rand.NewSource(seed))

    blackhole := make(chan *DbResponse, 100)

    d := Db{}
    rc := d.Start(4)
    wc := time.After(3 * time.Second)

    go func() {
        for {
            &lt;-blackhole
        }
    }()

    for {
        select {
        case &lt;-wc:
            return
        default:
            if source.Intn(2) == 0 {
                rc &lt;- &amp;DbRequest{READ, blackhole}
            } else {
                rc &lt;- &amp;DbRequest{WRITE, blackhole}
            }
        }
    }
}

Of course, this example shows read/write conflicts.

I feel like I'm trying to do something a bit evil: sharing memory using constructs designed to avoid it...
At this point, an obvious solution would be to add RWMutex locks around the two types of requests handling but maybe there is a clever solution using only goroutines and channels.

1: http://golang.org/pkg/sync/#RWMutex "sync/RWMutex reference"

答案1

得分: 7

为什么不直接使用RWMutex呢?它已经被优化得非常高效,并且概念上也很简单。只需要将其嵌入到你的Db对象中:

type Db struct {
    sync.RWMutex
    // 这里是DB
}

然后你可以这样调用它:

db := &Db{}
...
db.Lock()
// 进行读写操作
db.Unlock()
...
db.RLock()
// 进行读操作
db.RUnlock()

我不知道使用通道能否获得更好的性能。但是你可以通过无锁技术来获得更好的性能,不过我建议先让你的RWMutex版本运行起来。

另一个并发问题是fmt包向标准输出写入是不线程安全的,最终你会看到乱码输出。可以尝试使用log包代替。你可以将其设置为向标准输出写入,而且没有日志前缀,这样可以确保原子写入。

英文:

Why not just use RWMutex? It's been optimized to be very efficient and it's conceptually simple. Just embed one in your Db object

type Db struct {
    sync.RWMutex
    // DB here
}

and you can call it like

db := &amp;Db{}
...
db.Lock()
// do RW operations
db.Unlock()
...
db.RLock()
// do Read operations
db.RUnlock()

I don't know a way to get better performance using channels. You can however get better performance with lock-free techniques, but I recommend getting your RWMutex version running first.

Another concurrency issue is that fmt package writes to stdout are not thread safe and you will eventually see garbled output. Try the log package instead. You can set it to write
to stdout with no logging prefix and it will ensure atomic writes.

答案2

得分: 0

另一个可能的解决方案是通过通道传递数据库本身,然后只有在持有数据库时才更新它。这意味着您不需要对其进行锁定,因为只有持有者才能对其进行写入,并且内存模型保证对数据库的写入,如果我没记错的话。

英文:

Another possible solution, is to pass the database itself over a channel and then update it only when you hold the database. This means you don't need a lock on it since only the holder may write to it, and the memory model guarantees writes to the database, IIRC.

huangapple
  • 本文由 发表于 2012年12月27日 15:22:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/14050973.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定