How to trigger an API call when the scheduled time arrives
Question

I'm building a simple platform that lets Twitter users post to their accounts on a schedule of their choosing. For example, someone might want a post to go out at exactly 12:00 am (midnight); the platform lets them write the post earlier, say at 4:00 pm, and then publishes it to their timeline automatically when the scheduled time arrives. I'm unsure how to implement this, because the API endpoint responsible for posting has to be called explicitly for the post to happen. I thought of using a cron job that runs every minute, checks all scheduled posts, and calls the endpoint for any that are due, but this feels cumbersome and error-prone. Is there a more programmatic way to implement this?

PS: I'm trying to do this in Go.

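Answer 1

Score: 2

You can use a producer/consumer model built on a message queue. It will make your solution more robust and easier to distribute. I always try to avoid sleep in any kind of distributed environment.

Your system will have three components:
Producer: schedules posts by writing messages to the message queue.
Message queue: you can use RabbitMQ (with the delayed message exchange plugin) or ActiveMQ (which has built-in scheduling). The queue takes care of the scheduling problem, because a message is delivered to the consumer only after its specified delay has elapsed.
Consumer: reads messages from the queue and performs the corresponding operation.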

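The high-level design (HLD) looks like this:
[Image: high-level design diagram]

For RabbitMQ you can use this package: https://pkg.go.dev/github.com/streadway/amqp
For ActiveMQ you can use this package: https://pkg.go.dev/github.com/go-stomp/stomp

Here is a simple example; I used ActiveMQ on AWS.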

package main

import (
	"crypto/tls"
	"flag"
	"fmt"
	"github.com/go-stomp/stomp/v3"
	"os"
	"time"
)

const defaultPort = ":61613"

var serverAddr = flag.String("server", "b-50ad5529-0347-4308-af59-f6265d68d290-1.mq.us-east-1.amazonaws.com:61614", "STOMP server endpoint")
var messageCount = flag.Int("count", 2, "Number of messages to send/receive")
var queueName = flag.String("queue", "/queue/client_test", "Destination queue")
var helpFlag = flag.Bool("help", false, "Print help text")
var stop = make(chan bool)

// Connection options: login credentials for the ActiveMQ broker (placeholders).
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
	stomp.ConnOpt.Login("activemquser", "activemqpassword"),
}

func main() {
	flag.Parse()
	if *helpFlag {
		fmt.Fprintf(os.Stderr, "Usage of %s\n", os.Args[0])
		flag.PrintDefaults()
		os.Exit(1)
	}

	subscribed := make(chan bool)
	go recvMessages(subscribed)

	// Wait until the receiver has subscribed.
	<-subscribed

	go sendMessages()

	<-stop
	<-stop

}

func sendMessages() {
	defer func() {
		stop <- true
	}()

	netConn, err := tls.Dial("tcp", *serverAddr, &tls.Config{})
	if err != nil {
		println("cannot connect to server", err.Error())
		os.Exit(1) // the demo cannot continue without a connection
	}

	conn, err := stomp.Connect(netConn, options...)
	if err != nil {
		println("cannot start STOMP session", err.Error())
		os.Exit(1)
	}
	defer conn.Disconnect()

	for i := 1; i <= *messageCount; i++ {
		text := fmt.Sprintf("Message #%d", i)
		fmt.Println("sending message", text, " ", time.Now())
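		// Schedule the message for delivery after a 15-second delay (value in milliseconds).
		err = conn.Send(*queueName, "text/plain",
			[]byte(text), stomp.SendOpt.Header("AMQ_SCHEDULED_DELAY", "15000"))
		if err != nil {
			println("failed to send to server", err.Error())
			return
		}
		// Pause 3 seconds before sending the next message.
		time.Sleep(3 * time.Second)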
	}
	println("sender finished")
}

func recvMessages(subscribed chan bool) {
	defer func() {
		stop <- true
	}()

	netConn, err := tls.Dial("tcp", *serverAddr, &tls.Config{})
	if err != nil {
		println("cannot connect to server", err.Error())
		os.Exit(1) // main is still waiting on subscribed, so exit instead of returning
	}

	conn, err := stomp.Connect(netConn, options...)
	if err != nil {
		println("cannot start STOMP session", err.Error())
		os.Exit(1)
	}
	defer conn.Disconnect()

	sub, err := conn.Subscribe(*queueName, stomp.AckAuto)
	if err != nil {
		println("cannot subscribe to", *queueName, err.Error())
		os.Exit(1)
	}
	close(subscribed)

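	for i := 1; i <= *messageCount; i++ {
		msg := <-sub.C
		// A nil message means the subscription channel was closed.
		if msg == nil {
			println("subscription closed")
			return
		}
		if msg.Err != nil {
			println("receive failed", msg.Err.Error())
			return
		}
		expectedText := fmt.Sprintf("Message #%d", i)
		actualText := string(msg.Body)
		fmt.Println("got message", actualText, " ", time.Now())
		if expectedText != actualText {
			println("Expected:", expectedText)
			println("Actual:", actualText)
		}
	}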
	println("receiver finished")

}

Posted by huangapple on August 1, 2022, 22:43:24
Permalink: https://go.coder-hub.com/73195465.html