英文:
How to trigger an Api call when it's the scheduled time
问题
我正在构建一个简单的平台,允许Twitter用户按照自己选择的时间表发布推文。例如,有人可能希望在午夜12:00准确地发布推文到他/她的Twitter账户,但是该平台允许他们在下午4:00之前创建推文,并在预定时间自动发布到他们的时间线上。我对如何实现这一点感到困惑,因为负责此操作的Api端点必须手动调用才能发生此事件。我考虑使用cron作业每分钟运行并检查所有预定的推文,如果到了创建推文的时间,则调用端点,但我觉得这样做很麻烦且容易出错。请问是否有更多的编程方式来实现这一点?
附注:我正在尝试使用golang实现这个功能...
英文:
I'm building a simple platform to allow Twitter users to post to their Twitter accounts on a time schedule of their choosing. So for example someone might want to make a post to his/her Twitter account at exactly 12:00 am midnight, but the platform allows them to create the post by 4:00 pm and automatically post to their timeline when it's the scheduled time. I'm confused as to how to implement this since the Api endpoint responsible for this action has to be called manually for this event to take place, I thought of using cron jobs to run every minute and check for all the scheduled posts, call the endpoint if it's time to create the post, but I feel this is cumbersome and error-prone. Are there any more programmatic ways of implementing this, please?
Ps: I'm trying to do this in golang...
答案1
得分: 2
你可以使用消息队列的发布/订阅模型。这将使你的解决方案更加健壮和分布式。在任何分布式环境中,我总是尽量避免使用sleep。
你的系统将有三个组件:
生产者:负责调度和将消息写入消息队列。
消息队列:你可以使用RabbitMQ(带有延迟消息交换)或ActiveMQ(内置调度功能)。消息队列将负责处理调度问题,因为消息只会在特定延迟时间后传递给消费者。
消费者:消费者将从消息队列中读取数据,并执行相应的操作。
你可以使用以下包来处理RabbitMQ:https://pkg.go.dev/github.com/streadway/amqp
以及使用以下包来处理ActiveMQ:https://pkg.go.dev/github.com/go-stomp/stomp
这是一个简单的示例,我使用了AWS ActiveMQ。
package main
import (
"crypto/tls"
"flag"
"fmt"
"github.com/go-stomp/stomp/v3"
"os"
"time"
)
const defaultPort = ":61613"
var serverAddr = flag.String("server", "b-50ad5529-0347-4308-af59-f6265d68d290-1.mq.us-east-1.amazonaws.com:61614", "STOMP server endpoint")
var messageCount = flag.Int("count", 2, "Number of messages to send/receive")
var queueName = flag.String("queue", "/queue/client_test", "Destination queue")
var helpFlag = flag.Bool("help", false, "Print help text")
var stop = make(chan bool)
// 这些是与RabbitMQ一起使用的默认选项
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login("activemquser", "activemqpassword"),
}
func main() {
flag.Parse()
if *helpFlag {
fmt.Fprintf(os.Stderr, "Usage of %s\n", os.Args[0])
flag.PrintDefaults()
os.Exit(1)
}
subscribed := make(chan bool)
go recvMessages(subscribed)
// 等待接收者订阅
<-subscribed
go sendMessages()
<-stop
<-stop
}
func sendMessages() {
defer func() {
stop <- true
}()
netConn, err := tls.Dial("tcp", *serverAddr, &tls.Config{})
if err != nil {
println("cannot connect to server", err.Error())
}
conn, err := stomp.Connect(netConn, options...)
if err != nil {
println("cannot connect to server", err.Error())
}
for i := 1; i <= *messageCount; i++ {
text := fmt.Sprintf("Message #%d", i)
fmt.Println("sending message", text, " ", time.Now())
// 延迟15秒发送消息
err = conn.Send(*queueName, "text/plain",
[]byte(text), stomp.SendOpt.Header("AMQ_SCHEDULED_DELAY", "15000"))
if err != nil {
println("failed to send to server", err)
return
}
// 每个消息之间间隔3秒
time.Sleep(3 * time.Second)
}
println("sender finished")
}
func recvMessages(subscribed chan bool) {
defer func() {
stop <- true
}()
netConn, err := tls.Dial("tcp", *serverAddr, &tls.Config{})
if err != nil {
println("cannot connect to server", err.Error())
}
conn, err := stomp.Connect(netConn, options...)
if err != nil {
println("cannot connect to server", err.Error())
}
sub, err := conn.Subscribe(*queueName, stomp.AckAuto)
if err != nil {
println("cannot subscribe to", *queueName, err.Error())
return
}
close(subscribed)
for i := 1; i <= *messageCount; i++ {
msg := <-sub.C
expectedText := fmt.Sprintf("Message #%d", i)
actualText := string(msg.Body)
fmt.Println("got message", actualText, " ", time.Now())
if expectedText != actualText {
println("Expected:", expectedText)
println("Actual:", actualText)
}
}
println("receiver finished")
}
英文:
You can use a Pub/Consumer model with a message queue. It will make your solution more robust and distributed. I always try not to use sleep in any kind of distributed environment.
Your system will have 3 components: <br>
Producer: This will take care of scheduling and writing messages to a message queue. <br>
Message Queue: You can use RabbitMQ(with delayMessageExchange) or ActiveMQ(it has in-built scheduling). Your message queue will take care of the scheduling problem. Because the message will be delivered only after a specific time delay to the consumer. <br>
Consumer: The consumer will read data from the message queue and perform operations as the functionality
You can use https://pkg.go.dev/github.com/streadway/amqp this package for RabbitMQ
and https://pkg.go.dev/github.com/go-stomp/stomp this package for ActiveMQ
Here is an simple example, I have used AWS ActiveMQ.
package main
import (
"crypto/tls"
"flag"
"fmt"
"github.com/go-stomp/stomp/v3"
"os"
"time"
)
const defaultPort = ":61613"
var serverAddr = flag.String("server", "b-50ad5529-0347-4308-af59-f6265d68d290-1.mq.us-east-1.amazonaws.com:61614", "STOMP server endpoint")
var messageCount = flag.Int("count", 2, "Number of messages to send/receive")
var queueName = flag.String("queue", "/queue/client_test", "Destination queue")
var helpFlag = flag.Bool("help", false, "Print help text")
var stop = make(chan bool)
// these are the default options that work with RabbitMQ
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
stomp.ConnOpt.Login("activemquser", "activemqpassword"),
}
func main() {
flag.Parse()
if *helpFlag {
fmt.Fprintf(os.Stderr, "Usage of %s\n", os.Args[0])
flag.PrintDefaults()
os.Exit(1)
}
subscribed := make(chan bool)
go recvMessages(subscribed)
// wait until we know the receiver has subscribed
<-subscribed
go sendMessages()
<-stop
<-stop
}
func sendMessages() {
defer func() {
stop <- true
}()
netConn, err := tls.Dial("tcp", *serverAddr, &tls.Config{})
if err != nil {
println("cannot connect to server", err.Error())
}
conn, err := stomp.Connect(netConn, options...)
if err != nil {
println("cannot connect to server", err.Error())
}
for i := 1; i <= *messageCount; i++ {
text := fmt.Sprintf("Message #%d", i)
fmt.Println("sending message ", text, " ", time.Now())
// scheduling a message with 15 seconds delay
err = conn.Send(*queueName, "text/plain",
[]byte(text), stomp.SendOpt.Header("AMQ_SCHEDULED_DELAY", "15000"))
if err != nil {
println("failed to send to server", err)
return
}
// schedule each message after 3 secs
time.Sleep(3 * time.Second)
}
println("sender finished")
}
func recvMessages(subscribed chan bool) {
defer func() {
stop <- true
}()
netConn, err := tls.Dial("tcp", *serverAddr, &tls.Config{})
if err != nil {
println("cannot connect to server", err.Error())
}
conn, err := stomp.Connect(netConn, options...)
if err != nil {
println("cannot connect to server", err.Error())
}
sub, err := conn.Subscribe(*queueName, stomp.AckAuto)
if err != nil {
println("cannot subscribe to", *queueName, err.Error())
return
}
close(subscribed)
for i := 1; i <= *messageCount; i++ {
msg := <-sub.C
expectedText := fmt.Sprintf("Message #%d", i)
actualText := string(msg.Body)
fmt.Println("got message", actualText, " ", time.Now())
if expectedText != actualText {
println("Expected:", expectedText)
println("Actual:", actualText)
}
}
println("receiver finished")
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论