
huangapple go评论108阅读模式

Read-write exclusion with channels





  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "time"
  6. )
  7. var source *rand.Rand
  8. type ReqType int
  9. const (
  10. READ = iota
  11. WRITE
  12. )
  13. type DbRequest struct {
  14. Type int // 请求类型
  15. RespC chan *DbResponse // 请求响应的通道
  16. // 这里是内容
  17. }
  18. type DbResponse struct {
  19. // 这里是响应
  20. }
  21. type Db struct {
  22. // 这里是数据库
  23. }
  24. func randomWait() {
  25. time.Sleep(time.Duration(source.Intn(1000)) * time.Millisecond)
  26. }
  27. func (d *Db) readsHandler(in <-chan *DbRequest) {
  28. for r := range in {
  29. id := source.Intn(4000000)
  30. log.Println("读取 ", id, " 开始")
  31. randomWait()
  32. log.Println("读取 ", id, " 结束")
  33. r.RespC <- &DbResponse{}
  34. }
  35. }
  36. func (d *Db) writesHandler(r *DbRequest) *DbResponse {
  37. id := source.Intn(4000000)
  38. log.Println("写入 ", id, " 开始")
  39. randomWait()
  40. log.Println("写入 ", id, " 结束")
  41. return &DbResponse{}
  42. }
  43. func (d *Db) Start(nReaders int) chan *DbRequest {
  44. in := make(chan *DbRequest, 100)
  45. reads := make(chan *DbRequest, nReaders)
  46. // 启动读取器
  47. for k := 0; k < nReaders; k++ {
  48. go d.readsHandler(reads)
  49. }
  50. go func() {
  51. for r := range in {
  52. switch r.Type {
  53. case READ:
  54. reads <- r
  55. case WRITE:
  56. // 这里我们应该等待所有读取完成(如何实现?)
  57. r.RespC <- d.writesHandler(r)
  58. // 这里的writesHandler是阻塞的,
  59. // 这确保在写入完成之前不会向读取通道中添加额外的读取请求
  60. }
  61. }
  62. }()
  63. return in
  64. }
  65. func main() {
  66. seed := time.Now().Unix()
  67. source = rand.New(rand.NewSource(seed))
  68. blackhole := make(chan *DbResponse, 100)
  69. d := Db{}
  70. rc := d.Start(4)
  71. wc := time.After(3 * time.Second)
  72. go func() {
  73. for {
  74. <-blackhole
  75. }
  76. }()
  77. for {
  78. select {
  79. case <-wc:
  80. return
  81. default:
  82. if source.Intn(2) == 0 {
  83. rc <- &DbRequest{READ, blackhole}
  84. } else {
  85. rc <- &DbRequest{WRITE, blackhole}
  86. }
  87. }
  88. }
  89. }




I would like to write a small in-memory database in Go.
Read and write requests would be passed through a channel and processed by the db engine which would ensure the accesses are done properly.

A first idea would be to mimic the behaviour of RWMutex [1], only using a more idiomatic Go style.

Here is a small toy example (albeit a rather long one) of what I would like to do.

  1. package main
  2. import (
  3. "log"
  4. "math/rand"
  5. "time"
  6. )
  7. var source *rand.Rand
  8. type ReqType int
  9. const (
  10. READ = iota
  11. WRITE
  12. )
  13. type DbRequest struct {
  14. Type int // request type
  15. RespC chan *DbResponse // channel for request response
  16. // content here
  17. }
  18. type DbResponse struct {
  19. // response here
  20. }
  21. type Db struct {
  22. // DB here
  23. }
  24. func randomWait() {
  25. time.Sleep(time.Duration(source.Intn(1000)) * time.Millisecond)
  26. }
  27. func (d *Db) readsHandler(in <-chan *DbRequest) {
  28. for r := range in {
  29. id := source.Intn(4000000)
  30. log.Println("read ", id, " starts")
  31. randomWait()
  32. log.Println("read ", id, " ends")
  33. r.RespC <- &DbResponse{}
  34. }
  35. }
  36. func (d *Db) writesHandler(r *DbRequest) *DbResponse {
  37. id := source.Intn(4000000)
  38. log.Println("write ", id, " starts")
  39. randomWait()
  40. log.Println("write ", id, " ends")
  41. return &DbResponse{}
  42. }
  43. func (d *Db) Start(nReaders int) chan *DbRequest {
  44. in := make(chan *DbRequest, 100)
  45. reads := make(chan *DbRequest, nReaders)
  46. // launch readers
  47. for k := 0; k < nReaders; k++ {
  48. go d.readsHandler(reads)
  49. }
  50. go func() {
  51. for r := range in {
  52. switch r.Type {
  53. case READ:
  54. reads <- r
  55. case WRITE:
  56. // here we should wait for all reads to
  57. // be over (how ??)
  58. r.RespC <- d.writesHandler(r)
  59. // here writesHandler is blocking,
  60. // this ensures that no additional
  61. // read is added in the reads channel
  62. // before the write is finished
  63. }
  64. }
  65. }()
  66. return in
  67. }
  68. func main() {
  69. seed := time.Now().Unix()
  70. source = rand.New(rand.NewSource(seed))
  71. blackhole := make(chan *DbResponse, 100)
  72. d := Db{}
  73. rc := d.Start(4)
  74. wc := time.After(3 * time.Second)
  75. go func() {
  76. for {
  77. <-blackhole
  78. }
  79. }()
  80. for {
  81. select {
  82. case <-wc:
  83. return
  84. default:
  85. if source.Intn(2) == 0 {
  86. rc <- &DbRequest{READ, blackhole}
  87. } else {
  88. rc <- &DbRequest{WRITE, blackhole}
  89. }
  90. }
  91. }
  92. }

Of course, this example shows read/write conflicts.

I feel like I'm trying to do something a bit evil: sharing memory using constructs designed to avoid it...
At this point, an obvious solution would be to add RWMutex locks around the handling of the two request types, but maybe there is a clever solution using only goroutines and channels.

[1]: http://golang.org/pkg/sync/#RWMutex "sync/RWMutex reference"


得分: 7


  1. type Db struct {
  2. sync.RWMutex
  3. // 这里是DB
  4. }


  1. db := &Db{}
  2. ...
  3. db.Lock()
  4. // 进行读写操作
  5. db.Unlock()
  6. ...
  7. db.RLock()
  8. // 进行读操作
  9. db.RUnlock()




Why not just use RWMutex? It's been optimized to be very efficient and it's conceptually simple. Just embed one in your Db object:

  1. type Db struct {
  2. sync.RWMutex
  3. // DB here
  4. }

and you can call it like

  1. db := &Db{}
  2. ...
  3. db.Lock()
  4. // do RW operations
  5. db.Unlock()
  6. ...
  7. db.RLock()
  8. // do Read operations
  9. db.RUnlock()

I don't know a way to get better performance using channels. You can however get better performance with lock-free techniques, but I recommend getting your RWMutex version running first.

Another concurrency issue is that the fmt package's writes to stdout are not thread-safe, so you will eventually see garbled output. Try the log package instead: you can set it to write to stdout with no logging prefix, and it will ensure atomic writes.


得分: 0



Another possible solution is to pass the database itself over a channel and update it only while you hold it. This means you don't need a lock on it, since only the current holder may write to it, and, IIRC, the Go memory model guarantees that writes made before sending the database on the channel are visible to the goroutine that receives it.

  • 本文由 发表于 2012年12月27日 15:22:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/14050973.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
