在golang上运行一个消费者和API端口。

huangapple go评论81阅读模式
英文:

Running a consumer and api on port golang

问题

我有一个Go API项目,其中我还运行一个工作程序(RabbitMQ)。我刚刚发现一个问题,我的工作程序和我的HTTP监听和服务不能一起工作。一旦我运行工作程序,API的端口就无法访问。

以下是我的代码示例:

app.go

func (a *App) StartWorker() {

    connection, err := amqp091.Dial(os.Getenv("AMQP_URL"))
    if err != nil {
        panic(err)
    }
    defer connection.Close()

    consumer, err := events.NewConsumer(connection, database.GetDatabase(a.Database))
    if err != nil {
        panic(err)
    }
    consumer.Listen(os.Args[1:])

}

func (a *App) Run(addr string) {
    logs := log.New(os.Stdout, "my-service", log.LstdFlags)

    server := &http.Server{
        Addr:         addr,
        Handler:      a.Router,
        ErrorLog:     logs,
        IdleTimeout:  120 * time.Second, // max time for connections using TCP Keep-Alive
        ReadTimeout:  5 * time.Second,
        WriteTimeout: 10 * time.Second,
    }

    go func() {
        if err := server.ListenAndServe(); err != nil {
            logs.Fatal(err)
        }
    }()

    // trap sigterm or interrupt and gracefully shutdown the server
    c := make(chan os.Signal)
    signal.Notify(c, os.Interrupt)
    signal.Notify(c, os.Kill)

    sig := <-c
    logs.Println("Received terminate, graceful shutdown", sig)
    tc, _ := context.WithTimeout(context.Background(), 30*time.Second)
    server.Shutdown(tc)
}

consumer.go

// NewConsumer返回一个新的Consumer
func NewConsumer(conn *amqp.Connection, db *mongo.Database) (Consumer, error) {
    consumer := Consumer{
        conn: conn,
        db:   db,
    }
    err := consumer.setup()
    if err != nil {
        return Consumer{}, err
    }

    return consumer, nil
}

// Listen将监听所有新的队列发布,并将它们打印到控制台。
func (consumer *Consumer) Listen(topics []string) error {

    ch, err := consumer.conn.Channel()
    if err != nil {
        return err
    }

    defer ch.Close()

    if err != nil {
        return err
    }
    msgs, err := ch.Consume("update.package.rating", "", true, false, false, false, nil)

    if err != nil {
        return err
    }

    forever := make(chan bool)
    go func() {
        for msg := range msgs {
            switch msg.RoutingKey {
            case "update.package.rating":
                worker.RatePackage(packageRepo.NewPackagesRepository(consumer.db), msg.Body)
            }
            // 确认接收到的事件
            log.Printf("Received a message: %s", msg.Body)
        }
    }()

    log.Printf("[*] Waiting for message [Exchange, Queue][%s, %s]. To exit press CTRL+C", getExchangeName(), "update.package.rating")
    <-forever
    return nil
}

main.go

func main() {
    start := app.App{}
    start.StartApp()
    start.StartWorker()
    start.Run(":3006")
}

端口3006无法访问。

我正在使用gin-gonic来处理我的HTTP请求。

欢迎提供任何帮助。

英文:

I have a go api project where I also run a worker (RabbitMQ). I just discovered a problem that my worker and my http listen and serve do not work together. The moment I run the worker, the port of api is not reached.

Here is what my code looks like.

> app.go

func (a *App) StartWorker() {

	connection, err := amqp091.Dial(os.Getenv(&quot;AMQP_URL&quot;))
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	consumer, err := events.NewConsumer(connection, database.GetDatabase(a.Database))
	if err != nil {
		panic(err)
	}
	consumer.Listen(os.Args[1:])

}

func (a *App) Run(addr string) {
	logs := log.New(os.Stdout, &quot;my-service&quot;, log.LstdFlags)

	server := &amp;http.Server{
		Addr:         addr,
		Handler:      a.Router,
		ErrorLog:     logs,
		IdleTimeout:  120 * time.Second, // max time for connections using TCP Keep-Alive
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
	}

	go func() {
		if err := server.ListenAndServe(); err != nil {
			logs.Fatal(err)
		}
	}()

	// trap sigterm or interrupt and gracefully shutdown the server
	c := make(chan os.Signal)
	signal.Notify(c, os.Interrupt)
	signal.Notify(c, os.Kill)

	sig := &lt;-c
	logs.Println(&quot;Recieved terminate, graceful shutdown&quot;, sig)
	tc, _ := context.WithTimeout(context.Background(), 30*time.Second)
	server.Shutdown(tc)
}

here is my

> consumer.go

// NewConsumer returns a new Consumer
func NewConsumer(conn *amqp.Connection, db *mongo.Database) (Consumer, error) {
	consumer := Consumer{
		conn: conn,
		db:   db,
	}
	err := consumer.setup()
	if err != nil {
		return Consumer{}, err
	}

	return consumer, nil
}

// Listen will listen for all new Queue publications
// and print them to the console.
func (consumer *Consumer) Listen(topics []string) error {

	ch, err := consumer.conn.Channel()
	if err != nil {
		return err
	}

	defer ch.Close()

	if err != nil {
		return err
	}
	msgs, err := ch.Consume(&quot;update.package.rating&quot;, &quot;&quot;, true, false, false, false, nil)

	if err != nil {
		return err
	}

	forever := make(chan bool)
	go func() {
		for msg := range msgs {
			switch msg.RoutingKey {
			case &quot;update.package.rating&quot;:
				worker.RatePackage(packageRepo.NewPackagesRepository(consumer.db), msg.Body)
			}
			// acknowledege received event
			log.Printf(&quot;Received a message: %s&quot;, msg.Body)
		}
	}()

	log.Printf(&quot;[*] Waiting for message [Exchange, Queue][%s, %s]. To exit press CTRL+C&quot;, getExchangeName(), &quot;update.package.rating&quot;)
	&lt;-forever
	return nil
}

> main.go

func main() {
	start := app.App{}
	start.StartApp()
	start.StartWorker()
	start.Run(&quot;:3006&quot;)
}

the port 3006 is not reached.

I am using gin-gonic to serve my http request.

Any help is welcomed.

答案1

得分: 0

我在使用gin框架时遇到了类似的问题。通过在go协程中运行我的消费者来解决了这个问题。我像下面这样调用了我的消费者:

go notificationCallback.ConsumeBankTransaction()

现在服务器和RabbitMQ消费者都能够无缝运行。我仍在监控性能,以确定它是否足够强大和可靠。

英文:

I had a similar problem while using gin framework.Solved the issue by running my consumer inside a go routine.I invoked my consumer like below.

go notificationCallback.ConsumeBankTransaction()

and both the server and the rabbitmq consumer run seamlessly.Still monitoring performance to see if it is robust and resilient enough.

huangapple
  • 本文由 发表于 2022年12月28日 00:05:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/74931552.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定