![MassTransit | .NET 分布式应用框架](http://shimg.jingyanzongjie.com/230726/12462911a-3.jpg)
文章插图
从以上类图可以看出,消息的发送主要核心依赖于两个接口:
ISendEndpoint
:提供了Send
方法,用于发送命令 。IPublishEndpoint
:提供了Publish
方法,用于发布事件 。
IBus
、ISendEndpointProvider
和ConsumeContext
进行命令的发送;通过IBus
和IPublishEndpointProvider
进行事件的发布 。具体举例如下:发送命令
- 通过
IBus
发送:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//通过以下方式配置对应消息类型的目标地址EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));await _bus.Send(request);}
- 通过
ISendEndpointProvider
发送:
private readonly ISendEndpointProvider_sendEndpointProvider;public async Task Post(CreateOrderRequest request){var serviceAddress = new Uri("queue:create-order");var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);await endpoint.Send(request);}
- 通过
ConsumeContext
发送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){//do something elsevar destinationAddress = new Uri("queue:lock-stock");var command = new LockStockRequest(context.Message.OrderId);await context.Send<LockStockRequest>(destinationAddress, command);// 也可以通过获取`SendEndpoint`发送命令// var endpoint = await context.GetSendEndpoint(destinationAddress);// await endpoint.Send<LockStockRequest>(command);}}
发布事件- 通过
IBus
发布:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//do somethingawait _bus.Send(request);}
- 通过
IPublishEndpoint
发布:
private readonly IPublishEndpoint _publishEndpoint;public async Task Post(CreateOrderRequest request){//do somethingvar order = CreateOrder(request);await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}
- 通过
ConsumeContext
发布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){、var order = CreateOrder(conext.Message);await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}}
ConsumerConsumer,消费者,即用于消费消息 。MassTransit 包括多种消费者类型,主要分为无状态和有状态两种消费者类型 。无状态消费者无状态消费者,即消费者无状态,消息消费完毕,消费者就释放 。主要的消费者类型有:
IConsumer<TMessage>
、JobConsumer
、IActivity
和RoutingSlip
等 。其中IConsumer<TMessage>
已经在上面的快速体验
部分举例说明 。而JobConsumer<TMessage>
主要是对IConsumer<TMessage>
的补充,其主要应用场景在于执行耗时任务 。而对于IActivity
和RoutingSlip
则是MassTransit Courier
的核心对象,主要用于实现Saga模式的分布式事务 。MassTransit Courier 实现了Routing Slip模式,通过按需有序组合一系列的Activity,得到一个用来限定消息处理顺序的Routing Slip 。而每个Activity的具体抽象就是IActivity
和IExecuteActivity
。二者的差别在于IActivity
定义了Execute
和Compensate
两个方法,而IExecuteActivitiy
仅定义了
经验总结扩展阅读
- 4 .NET 6学习笔记——如何在.NET 6的Desktop App中使用Windows Runtime API
- 学习ASP.NET Core Blazor编程系列八——数据校验
- 【.NET 6】RabbitMQ延迟消费指南
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- .net core-利用PdfSharpCore和SkiaSharp.QrCode 添加PDF二维码页眉
- 云原生分布式 PostgreSQL+Citus 集群在 Sentry 后端的实践
- .net core -利用 BsonDocumentProjectionDefinition 和Lookup 进行 join 关联 MongoDB 查询
- 微服务系列之分布式日志 ELK
- .net lambda表达式合并
- .NET Core C#系列之XiaoFeng.Threading.JobScheduler作业调度