How to start & stop heartbeat per session using context.WithCancel?


Question

I'm currently implementing a Golang client for TypeDB and am struggling with its session-based heartbeat convention. Usually you implement a heartbeat per client, which is relatively easy: just run a goroutine in the background and send a heartbeat every few seconds.

TypeDB, however, chose to implement the heartbeat (they call it a pulse) on a per-session basis. This means that every time a new session is created, I have to start monitoring that session with a separate goroutine. Conversely, if the client closes a session, I have to stop the monitoring. What's particularly ugly is that I also have to check for stalled sessions every once in a while. There is a GitHub issue about switching to a per-client heartbeat, but there is no ETA, so I have to make the session heartbeat work to prevent server-side session termination.

My solution so far:

  1. Create a new session
  2. Open that session & check for errors
  3. If there is no error, add the session to a hashmap keyed by session ID

This seems to work for now. The code, just for context, is here:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session.go

For monitoring each session, I am mulling over two issues:

  1. Closing a channel across multiple goroutines is a bit tricky and may lead to race conditions.

  2. I would need some kind of error group to catch heartbeat failures, e.g. when the server shuts down or a network link fails.

With all that in mind, I believe context.WithCancel might be a safe & sane solution.

What I have come up with so far is this:

  1. Pass the global context as a parameter to the heartbeat function
  2. Create a new context using WithCancel for each session that calls heartbeat
  3. Run the heartbeat in a goroutine until either cancel gets called (by stopMonitoring) or an error occurs

What's not so clear to me is how to track all the cancel functions returned for the tracked sessions, so that I stop the right goroutine for the session being closed.

Thank you for any hint on how to solve this.

The code:


func (s SessionManager) startMonitorSession(sessionID []byte) {
	// How do I track each goRoutine per session

}

func (s SessionManager) stopMonitorSession(sessionID []byte) {
	// How do I call the correct cancel function to stop the GoRoutine matching the session?
}

func (s SessionManager) runHeartbeat(ctx context.Context, sessionID []byte) context.CancelFunc {

	// Create a new context, with its cancellation function from the original context
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		select {
		case <-ctx.Done():
			fmt.Println("Stopped monitoring session: ")
		default:
			err := s.sendPulseRequest(sessionID)
			// If this operation returns an error
			// cancel all operations using this local context created above
			if err != nil {
				cancel()
			}
			fmt.Println("done")
		}
	}()

	// return cancel function for call site to close at a later stage
	return cancel
}

func (s SessionManager) sendPulseRequest(sessionID []byte) error {
	mtd := "sendPulse: "

	req := requests.GetSessionPulseReq(sessionID)
	res, pulseErr := s.client.client.SessionPulse(s.client.ctx, req)
	if pulseErr != nil {
		dbgPrint(mtd, "Heartbeat error. Close session")
		return pulseErr
	}
	if !res.Alive {
		dbgPrint(mtd, "Server not alive anymore. Close session")
		closeErr := s.CloseSession(sessionID)
		if closeErr != nil {
			return closeErr
		}
	}

	// no error
	return nil
}

Update:

Thanks to the comment(s), I managed to solve the bulk of the issue by wrapping the session & CancelFunc in a dedicated struct called TypeDBSession.

That way, the stop function simply pulls the CancelFunc from the struct and calls it, which stops the monitoring goroutine. With some more tweaking, the tests seem to pass, although this is not concurrency safe for the time being.

That being said, this was a non-trivial issue to solve. Again, thanks for the comments!

If anyone is open to suggesting code improvements, especially with regard to making this concurrency safe, feel free to comment here or file a GH issue / PR.

SessionType:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_type.go

SessionMonitoring:

https://github.com/marvin-hansen/typedb-client-go/blob/main/src/client/v2/manager_session_monitor.go

Tests:

https://github.com/marvin-hansen/typedb-client-go/tree/main/test/client/session

Answer 1

Score: 1

My two cents:

  1. You may need to run the heartbeat repeatedly. Use a for loop with a time.Ticker around the select.
  2. Store a map from session ID to func() to track all the cancellable contexts. You will probably want to convert the ID to a string, since a []byte cannot be used as a map key.

huangapple
  • Posted on April 7, 2022 at 11:35:48
  • When reposting, please keep the link to this article: https://go.coder-hub.com/71776040.html