同时使用多个AWS地区与MassTransit.AmazonSqs

huangapple go评论149阅读模式
英文:

Use multiple AWS regions with MassTransit.AmazonSqs simultaneously

问题

我可以配置MassTransit与Amazon SQS以同时使用多个AWS区域吗?例如,在欧洲爱尔兰设置主题,但订阅该主题的队列将位于美国东部。

有关我的应用程序架构的其他信息以帮助理解动机:
我有一个分布式系统,跨足AWS区域的数百个微服务(假设欧洲爱尔兰和美国东部)。我在这些区域都设置了VPC和Kubernetes集群,我计划将应用程序扩展到将来的其他区域。该应用程序本身使用CQRS模式与事件源,其中“command”实例位于欧洲爱尔兰区域。我们称之为“中央”。
微服务需要通过事件进行通信。目前,这些事件由RabbitMQ发送,也由MassTransit进行编排 - 我希望将传输从RabbitMQ更换为Amazon SQS/SNS,但保留MassTransit中的主题、队列和事件编排。
应用程序架构的背后思想是调用与“命令”相关的端点,例如,在“中央”实例上创建或更新实体,然后将创建/更新事件发布到事件总线,在“中央”以及所有其他区域(目前仅限美国东部)中进行消耗。
我有数百种消息类型,在这些集群中有数十个k8s命名空间。重要的是,如果添加新区域,我不能将现有区域停机进行维护,以向已运行并使用SQS进行通信的那些服务添加新的配置值。

我应该如何解决这个问题?

我尝试在“中央”设置主题,并手动绑定来自美国东部区域的队列,但未能使其完全在MassTransit中正常工作 - 尽管我收到了消费者的消息,但由于某种原因它们被跳过了(我在日志中看到“SKIP”事件)。

发布者设置:

(代码部分,不翻译)

消费者设置:

(代码部分,不翻译)

消费者日志(已编辑名称空间):
同时使用多个AWS地区与MassTransit.AmazonSqs

提前感谢!

英文:

Can I configure MassTransit with Amazon SQS to use multiple AWS regions at the same time? E.g. to set up a topic in region eu-west-1, but the queue which subscribes to this topic would be in us-east-1.

Additional information regarding the architecture of my application to help understand the motive:
I have a distributed system containing hundreds of microservices across AWS regions (let's say eu-west-1 and us-east-1). I have a VPC and a Kubernetes cluster set up in both of these regions, and I'm looking to expand the application to other regions in the future. The app itself uses the CQRS pattern with event sourcing, where the "command" instances are located in the eu-west-1 region. Let's call this "central".
The microservices need to communicate with events. These events are right now sent by RabbitMQ, also orchestrated by MassTransit - I'd like to exchange the transport from RabbitMQ to Amazon SQS/SNS, but keep the topic, queue and event orchestration in MassTransit.
The idea behind the architecture of the application was to call "command" related endpoints, e.g. creating or updating an entity, on the "central" instance, which then publish a create/update event to the event bus, which is consumed in both the "central", and also all other regions (currently only us-east-1).
I have several hundreds of message types, tens of k8s namespaces in these clusters. It is important, that in case of a new region is added, I can't take the existing ones down for maintenance, to add new configuration values to those services already running and using SQS to communicate.

How should I approach this issue?

I tried to set up the topics in "central", and bind the queues from the us-east-1 region manually, but had no luck to make it fully work with MassTransit - although I'm getting the messages in the consumers, but they're for some reason skipped (I see the "SKIP" event in the logs).

Publisher setup:

public class Program
    {
        public static async Task Main(string[] args)
        {
            await CreateHostBuilder(args).Build().RunAsync();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((_, services) =>
                {
                    var ns = "namespace";
                    services.AddMassTransit(x =>
                    {
                        x.SetEndpointNameFormatter(new FifoEndpointNameFormatter(new KebabCaseEndpointNameFormatter(ns, false)));
                        x.UsingAmazonSqs((context, cfg) =>
                        {
                            cfg.MessageTopology.SetEntityNameFormatter(new FifoEntityNameFormatter());
                            cfg.PublishTopology.TopicAttributes[QueueAttributeName.ContentBasedDeduplication] = "true";

                            cfg.Host("eu-west-1", configurator =>
                            {
                                configurator.AccessKey("*****");
                                configurator.SecretKey("*****");
                                configurator.Scope(ns, true);
                            });
                            cfg.Publish<Bye>();
                            cfg.Publish<Hello>();
                            cfg.UsePublishFilter(typeof(PublishFilter<>), context);
                        });
                    });
                    services.AddHostedService<Worker>();
                });
    }

Consumer setup:

public class Program
    {
        public static async Task Main(string[] args)
        {
            await CreateHostBuilder(args).Build().RunAsync();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((_, services) =>
                {
                    var ns = "namespace";
                    services.AddMassTransit(x =>
                    {
                        x.SetEndpointNameFormatter(
                            new FifoEndpointNameFormatter(new KebabCaseEndpointNameFormatter(ns, false)));

                        x.AddConsumer<HelloConsumer>();
                        x.AddConsumer<ByeConsumer>();

                        var qnf = new QueueNameFormatter(new FifoEntityNameFormatter());
                        qnf.Namespace = ns;
                        x.UsingAmazonSqs((context, cfg) =>
                        {
                            cfg.UseConsumeFilter(typeof(ConsumeFilter<>), context);
                            cfg.MessageTopology.SetEntityNameFormatter(new FifoEntityNameFormatter());
                            cfg.Host("us-east-1", configurator =>
                            {
                                configurator.AccessKey("*****");
                                configurator.SecretKey("*****");
                                configurator.Scope(ns, true);
                            });

                            cfg.ReceiveEndpoint("namespace_Hello.fifo", configurator =>
                            {
                                configurator.ConfigureConsumeTopology = false;
                                
                                configurator.ConfigureConsumer<HelloConsumer>(context);
                                configurator.QueueAttributes[QueueAttributeName.FifoQueue] = true;
                                configurator.QueueAttributes[QueueAttributeName.ContentBasedDeduplication] = true;
                                configurator.DiscardSkippedMessages();
                                configurator.DiscardFaultedMessages();
                                configurator.PublishFaults = false;
                            });

                            cfg.ReceiveEndpoint("namespace_Bye.fifo", configurator =>
                            {
                                configurator.ConfigureConsumeTopology = false;
                                
                                configurator.ConfigureConsumer<ByeConsumer>(context);
                                configurator.QueueAttributes[QueueAttributeName.FifoQueue] = true;
                                configurator.QueueAttributes[QueueAttributeName.ContentBasedDeduplication] = true;
                                configurator.DiscardSkippedMessages();
                                configurator.DiscardFaultedMessages();
                                configurator.PublishFaults = false;
                            });
                        });
                    });
                });
    }

Consumer logs (redacted namespaces):
同时使用多个AWS地区与MassTransit.AmazonSqs

Thanks in advance!

答案1

得分: 1

你无法这样做,因为在给定的总线内无法指定区域外的地址。如果您自己创建了SNS订阅,您可以使发布的消息发送到其他地区,但这不是MassTransit会为您执行的操作。

英文:

You cannot, as there is no way to specify out-of-region addresses within a given bus. If you created the SNS subscriptions yourself, you could make it so that published messages would go to the other region, but that isn't something MassTransit is going to do for you.

huangapple
  • 本文由 发表于 2023年7月3日 20:51:33
  • 转载请务必保留本文链接:https://go.coder-hub.com/76604913.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定