英文:
Running a consumer and api on port golang
问题
我有一个Go API项目,其中我还运行一个工作程序(RabbitMQ)。我刚刚发现一个问题,我的工作程序和我的HTTP监听和服务不能一起工作。一旦我运行工作程序,API的端口就无法访问。
以下是我的代码示例:
app.go
func (a *App) StartWorker() {
connection, err := amqp091.Dial(os.Getenv("AMQP_URL"))
if err != nil {
panic(err)
}
defer connection.Close()
consumer, err := events.NewConsumer(connection, database.GetDatabase(a.Database))
if err != nil {
panic(err)
}
consumer.Listen(os.Args[1:])
}
func (a *App) Run(addr string) {
logs := log.New(os.Stdout, "my-service", log.LstdFlags)
server := &http.Server{
Addr: addr,
Handler: a.Router,
ErrorLog: logs,
IdleTimeout: 120 * time.Second, // max time for connections using TCP Keep-Alive
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
}
go func() {
if err := server.ListenAndServe(); err != nil {
logs.Fatal(err)
}
}()
// trap sigterm or interrupt and gracefully shutdown the server
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
signal.Notify(c, os.Kill)
sig := <-c
logs.Println("Received terminate, graceful shutdown", sig)
tc, _ := context.WithTimeout(context.Background(), 30*time.Second)
server.Shutdown(tc)
}
consumer.go
// NewConsumer返回一个新的Consumer
func NewConsumer(conn *amqp.Connection, db *mongo.Database) (Consumer, error) {
consumer := Consumer{
conn: conn,
db: db,
}
err := consumer.setup()
if err != nil {
return Consumer{}, err
}
return consumer, nil
}
// Listen将监听所有新的队列发布,并将它们打印到控制台。
func (consumer *Consumer) Listen(topics []string) error {
ch, err := consumer.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
if err != nil {
return err
}
msgs, err := ch.Consume("update.package.rating", "", true, false, false, false, nil)
if err != nil {
return err
}
forever := make(chan bool)
go func() {
for msg := range msgs {
switch msg.RoutingKey {
case "update.package.rating":
worker.RatePackage(packageRepo.NewPackagesRepository(consumer.db), msg.Body)
}
// 确认接收到的事件
log.Printf("Received a message: %s", msg.Body)
}
}()
log.Printf("[*] Waiting for message [Exchange, Queue][%s, %s]. To exit press CTRL+C", getExchangeName(), "update.package.rating")
<-forever
return nil
}
main.go
func main() {
start := app.App{}
start.StartApp()
start.StartWorker()
start.Run(":3006")
}
端口3006无法访问。
我正在使用gin-gonic来处理我的HTTP请求。
欢迎提供任何帮助。
英文:
I have a go api project where I also run a worker (RabbitMQ). I just discovered a problem that my worker and my http listen and serve do not work together. The moment I run the worker, the port of api is not reached.
Here is what my code looks like.
> app.go
func (a *App) StartWorker() {
connection, err := amqp091.Dial(os.Getenv("AMQP_URL"))
if err != nil {
panic(err)
}
defer connection.Close()
consumer, err := events.NewConsumer(connection, database.GetDatabase(a.Database))
if err != nil {
panic(err)
}
consumer.Listen(os.Args[1:])
}
func (a *App) Run(addr string) {
logs := log.New(os.Stdout, "my-service", log.LstdFlags)
server := &http.Server{
Addr: addr,
Handler: a.Router,
ErrorLog: logs,
IdleTimeout: 120 * time.Second, // max time for connections using TCP Keep-Alive
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
}
go func() {
if err := server.ListenAndServe(); err != nil {
logs.Fatal(err)
}
}()
// trap sigterm or interrupt and gracefully shutdown the server
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)
signal.Notify(c, os.Kill)
sig := <-c
logs.Println("Recieved terminate, graceful shutdown", sig)
tc, _ := context.WithTimeout(context.Background(), 30*time.Second)
server.Shutdown(tc)
}
here is my
> consumer.go
// NewConsumer returns a new Consumer
func NewConsumer(conn *amqp.Connection, db *mongo.Database) (Consumer, error) {
consumer := Consumer{
conn: conn,
db: db,
}
err := consumer.setup()
if err != nil {
return Consumer{}, err
}
return consumer, nil
}
// Listen will listen for all new Queue publications
// and print them to the console.
func (consumer *Consumer) Listen(topics []string) error {
ch, err := consumer.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
if err != nil {
return err
}
msgs, err := ch.Consume("update.package.rating", "", true, false, false, false, nil)
if err != nil {
return err
}
forever := make(chan bool)
go func() {
for msg := range msgs {
switch msg.RoutingKey {
case "update.package.rating":
worker.RatePackage(packageRepo.NewPackagesRepository(consumer.db), msg.Body)
}
// acknowledege received event
log.Printf("Received a message: %s", msg.Body)
}
}()
log.Printf("[*] Waiting for message [Exchange, Queue][%s, %s]. To exit press CTRL+C", getExchangeName(), "update.package.rating")
<-forever
return nil
}
> main.go
func main() {
start := app.App{}
start.StartApp()
start.StartWorker()
start.Run(":3006")
}
the port 3006 is not reached.
I am using gin-gonic to serve my http request.
Any help is welcomed.
答案1
得分: 0
我在使用gin框架时遇到了类似的问题。通过在go协程中运行我的消费者来解决了这个问题。我像下面这样调用了我的消费者:
go notificationCallback.ConsumeBankTransaction()
现在服务器和RabbitMQ消费者都能够无缝运行。我仍在监控性能,以确定它是否足够强大和可靠。
英文:
I had a similar problem while using gin framework.Solved the issue by running my consumer inside a go routine.I invoked my consumer like below.
go notificationCallback.ConsumeBankTransaction()
and both the server and the rabbitmq consumer run seamlessly.Still monitoring performance to see if it is robust and resilient enough.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论