Communication between microservices using MassTransit and RabbitMQ.

Question

In .NET I had a solution with 2 microservices (APIs) and one shared layer (Shared Models). I wanted to send some data from one microservice to the other using MassTransit and RabbitMQ. After I implemented the code, the logic worked: the message was published successfully and the consumer was invoked.

Now I am trying to change the architecture of my project. I have 3 different solutions. Each microservice solution has an API project, and I moved my shared models into a separate solution that each microservice accesses by referencing its assembly. The main problem in this setup is that the message is published successfully but the consumer is never invoked. The NuGet package versions are the same in both services.

This is the dependency injection for the consumer microservice:

    public static IServiceCollection AddMessageBroker(this IServiceCollection services, ConfigurationManager configuration)
    {
        MessageBrokerSettings messageBrokerSettings = new();
        configuration.Bind(MessageBrokerSettings.SectionName, messageBrokerSettings);

        services.AddSingleton(Options.Create(messageBrokerSettings));

        services.AddMassTransit(busConfigurator =>
        {
            busConfigurator.AddConsumer<GetCompanyIdsConsumer>();

            busConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cur =>
            {
                cur.Host(new Uri(messageBrokerSettings.Host), h =>
                {
                    h.Username(messageBrokerSettings.UserName);
                    h.Password(messageBrokerSettings.Password);
                });

                cur.ReceiveEndpoint("companyQueue2", oq =>
                {
                    oq.PrefetchCount = 20;
                    oq.UseMessageRetry(r => r.Interval(2, 100));
                    oq.ConfigureConsumer<GetCompanyIdsConsumer>(provider);
                });
            }));
        });

        services.AddTransient<IEventBus, EventBus>();

        return services;
    }

And this is the dependency injection for the publisher microservice:

    public static IServiceCollection AddMessageBroker(this IServiceCollection services, ConfigurationManager configuration)
    {
        MessageBrokerSettings messageBrokerSettings = new();
        configuration.Bind(MessageBrokerSettings.SectionName, messageBrokerSettings);

        services.AddSingleton(Options.Create(messageBrokerSettings));

        services.AddMassTransit(busConfigurator =>
        {
            busConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cur =>
            {
                cur.Host(new Uri(messageBrokerSettings.Host), h =>
                {
                    h.Username(messageBrokerSettings.UserName);
                    h.Password(messageBrokerSettings.Password);
                });
            }));

            services.AddMassTransitHostedService();
        });

        services.AddTransient<IEventBus, EventBus>();

        return services;
    }

Consumer class:

    public class GetCompanyIdsConsumer : IConsumer<Message>
    {
        public async Task Consume(ConsumeContext<Message> context)
        {
            await Task.Run(() => { var obj = context.Message; });
        }
    }

Publisher method:

    await _eventBus.PublishAsync(message, uri, cancellationToken);

Do you have any suggestions? Is it the right approach to keep each API in a different solution?

I want to send messages between microservices using MassTransit and RabbitMQ. It worked when everything lived in one shared solution, but it does not work across separate solutions.

Answer 1

Score: 0

As explained very clearly in the documentation, the messages must have the same namespace and type name on both the publishing and the consuming side.
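To illustrate why this matters, here is a minimal, self-contained sketch that does not use MassTransit itself. It assumes two hypothetical namespaces, `PublisherApi.Contracts` and `ConsumerApi.Contracts`, each declaring its own `Message` class, and a hypothetical helper `UrnFor` that only approximates the `urn:message:Namespace:TypeName` identifier MassTransit derives from a message type. None of these names come from the question.

```csharp
using System;

// Hypothetical contract as the publisher solution might declare it.
namespace PublisherApi.Contracts
{
    public class Message { public int CompanyId { get; set; } }
}

// Same class name and shape, but declared under a different namespace.
namespace ConsumerApi.Contracts
{
    public class Message { public int CompanyId { get; set; } }
}

public static class MessageIdentity
{
    // Assumption: approximates the "urn:message:Namespace:TypeName" form
    // that MassTransit uses to identify a message type.
    public static string UrnFor(Type messageType) =>
        $"urn:message:{messageType.Namespace}:{messageType.Name}";
}

public static class Program
{
    public static void Main()
    {
        string published = MessageIdentity.UrnFor(typeof(PublisherApi.Contracts.Message));
        string consumed = MessageIdentity.UrnFor(typeof(ConsumerApi.Contracts.Message));

        Console.WriteLine(published); // urn:message:PublisherApi.Contracts:Message
        Console.WriteLine(consumed);  // urn:message:ConsumerApi.Contracts:Message

        // The identifiers differ, so a consumer bound to one type would not
        // receive messages published as the other.
        Console.WriteLine(published == consumed); // False
    }
}
```

If both services reference the same contract type from one shared assembly, or each declares the contract with an identical namespace and type name, the two identifiers match. After a move to separate solutions it is worth checking that the namespace of the `Message` class did not change along the way.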

Posted by huangapple on 2023-05-14 01:25:20
Original link: https://go.coder-hub.com/76244059.html