Go writer with buffer and timeout

Question

I would like to send messages to AWS SQS using Go. There are examples of doing this one message at a time, using sqs.SendMessageInput from the v2 SDK, but I would like to send in batches instead. I have created an interface to abstract the implementation details away from the rest of my code, along these lines:

type UserRepository interface {
   Save(context.Context, User) error
}

As you can see, the interface is not SQS specific and could easily be replaced by e.g. a Postgres implementation. I would really like to keep the interface as clean as possible.

Using SQS, there are some considerations that I can think of, and probably some that I have not thought of.
A batch needs to be sent:

  1. after every nth Save
  2. after a configurable timeout
  3. after context cancellation
  4. after the calling site is done with saving all User objects

Note that the same design should also work for writing to the console instead of sending to SQS, which is what my example does.

Is this design possible or will I always have to have a Close function in my interface?

The problem with the code is that the last 4 (nr_of_items % batch_size) will not be "saved."

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"time"
)

type LoggingBufferedUserRepository struct {
	buffer        []string
	bufferSize    int
	bufferTimeout time.Duration
	mutex         sync.Mutex
	closeChan     chan struct{}
}

func NewLoggingBufferedUserRepository(
	ctx context.Context, bufferSize int, bufferTimeout time.Duration,
) *LoggingBufferedUserRepository {
	client := &LoggingBufferedUserRepository{
		bufferSize:    bufferSize,
		bufferTimeout: bufferTimeout,
		closeChan:     make(chan struct{}),
	}

	go client.bufferMonitor(ctx)
	return client
}

func (c *LoggingBufferedUserRepository) SendMessage(ctx context.Context, input string) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.buffer = append(c.buffer, input)
	if len(c.buffer) >= c.bufferSize {
		go c.flush(ctx, c.buffer)
		c.buffer = []string{}
	}
	return
}

func (c *LoggingBufferedUserRepository) flush(ctx context.Context, buffer []string) {
	if len(buffer) == 0 {
		return
	}

	// This is the actual batch 'save':
	fmt.Printf("flushing buffer, size=%d, cid=%s, buffer=$%v\n", len(buffer), ctx.Value("cid"), buffer)
}

func (c *LoggingBufferedUserRepository) bufferMonitor(ctx context.Context) {
	timeout := time.NewTimer(c.bufferTimeout)
	for {
		select {
		case <-timeout.C:
			c.flush(ctx, c.buffer)
			c.buffer = []string{}
		}

		timeout.Reset(c.bufferTimeout)
	}
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer stop()
	g := NewLoggingBufferedUserRepository(ctx, 10, 1*time.Second)

	wg := &sync.WaitGroup{}
	for i := 0; i < 15; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Printf("sending message %d\n", i)
			g.SendMessage(ctx, fmt.Sprintf("a%d", i))
		}(i)
		time.Sleep(100 * time.Millisecond)
	}
	wg.Wait()
	fmt.Println("done")
}

Answer 1

Score: 1

There are multiple ways of doing this. In general, it is not advisable to flush such buffered implementations based on a timeout: a flush triggered by a timer has no caller to return an error to, so you have no control over what happens if it fails.

One way is to make an explicit batch operation:

type UserRepository interface {
   // Save single user
   Save(context.Context, User) error
   // Save batch
   SaveBatch(context.Context) UserBatch
}

where

type UserBatch interface {
   Save(context.Context, User) error
   Flush(context.Context) error
   Close(context.Context) error
}

Answer 2

Score: 1

> Can UserRepository interface support the batch operations without
> needing a close function?

Yes. One solution is to add a separate Flush function to your interface that the caller can use to explicitly flush any remaining buffered items. Your interface would then look like this:

type UserRepository interface {
    Save(context.Context, User) error
    Flush(context.Context) error
    SaveAll(context.Context) error
}

A skeleton implementation of this UserRepository could look something like this (playground):

// for SO testing, by JS

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type User struct {
	ID   int
	Name string
}
type UserRepository interface {
	Save(context.Context, User) error
	Flush(context.Context) error
	SaveAll(context.Context) error
}

type LoggingBufferedUserRepository struct {
	buffer        []string
	bufferSize    int
	bufferTimeout time.Duration
	mutex         sync.Mutex
	// we don't need close here now
}

// Save should add the item to the buffer and flush the buffer once it
// reaches the configured size. This stub only logs the call.
func (c *LoggingBufferedUserRepository) Save(ctx context.Context, user User) error {
	// implement save functionality
	fmt.Printf("Save called %v: %v\n", user.Name, user.ID)
	return nil
}

// Flush should immediately flush any buffered items.
// This stub only logs the call.
func (c *LoggingBufferedUserRepository) Flush(ctx context.Context) error {
	// implement Flush functionality
	fmt.Printf("Flush called\n")
	return nil
}

// SaveAll should wait for any pending items to be saved before returning
// and flush any remaining buffered items. This stub only logs the call.
func (c *LoggingBufferedUserRepository) SaveAll(ctx context.Context) error {
	// implement SaveAll functionality
	fmt.Printf("SaveAll called\n")
	return nil
}

// flush is the internal helper that the methods above would call;
// it is not yet wired up in this skeleton.
func (c *LoggingBufferedUserRepository) flush(ctx context.Context) error {
	if len(c.buffer) == 0 {
		return nil
	}

	// This is your actual batch 'save':
	fmt.Printf("flushing buffer, size=%d, cid=%s, buffer=$%v\n", len(c.buffer), ctx.Value("cid"), c.buffer)
	c.buffer = []string{}

	return nil
}

func main() {
	repo := &LoggingBufferedUserRepository{
		bufferSize:    5,
		bufferTimeout: 1 * time.Second,
	}

	users := []User{
		{ID: 1, Name: "Alice"},
		{ID: 2, Name: "Bob"},
		{ID: 3, Name: "Charlie"},
		{ID: 4, Name: "Dave"},
		{ID: 5, Name: "Eve"},
	}

	for _, user := range users {
		if err := repo.Save(context.Background(), user); err != nil {
			fmt.Printf("error saving user %v: %v\n", user, err)
		}
	}

	fmt.Printf("Waiting for flushing... (5 seconds)\n")
	time.Sleep(5 * time.Second)

	// Save remaining items
	if err := repo.SaveAll(context.Background()); err != nil {
		fmt.Printf("error saving remaining items: %v\n", err)
	}

	// Flush remaining items
	if err := repo.Flush(context.Background()); err != nil {
		fmt.Printf("error flushing remaining items: %v\n", err)
	}
}

The output of this should be:

Save called Alice: 1
Save called Bob: 2
Save called Charlie: 3
Save called Dave: 4
Save called Eve: 5
Waiting for flushing... (5 seconds)
SaveAll called
Flush called
Program exited.

huangapple
  • Posted on May 6, 2023, 22:02:51
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76189309.html