
文章插图
从以上类图可以看出,消息的发送主要核心依赖于两个接口:
- ISendEndpoint:提供了- Send方法,用于发送命令 。
- IPublishEndpoint:提供了- Publish方法,用于发布事件 。
IBus、ISendEndpointProvider和ConsumeContext进行命令的发送;通过IBus和IPublishEndpointProvider进行事件的发布 。具体举例如下:发送命令
- 通过IBus发送:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//通过以下方式配置对应消息类型的目标地址EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));await _bus.Send(request);}- 通过ISendEndpointProvider发送:
private readonly ISendEndpointProvider_sendEndpointProvider;public async Task Post(CreateOrderRequest request){var serviceAddress = new Uri("queue:create-order");var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);await endpoint.Send(request);}- 通过ConsumeContext发送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){//do something elsevar destinationAddress = new Uri("queue:lock-stock");var command = new LockStockRequest(context.Message.OrderId);await context.Send<LockStockRequest>(destinationAddress, command);// 也可以通过获取`SendEndpoint`发送命令// var endpoint = await context.GetSendEndpoint(destinationAddress);// await endpoint.Send<LockStockRequest>(command);}}发布事件- 通过IBus发布:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//do somethingawait _bus.Send(request);}- 通过IPublishEndpoint发布:
private readonly IPublishEndpoint _publishEndpoint;public async Task Post(CreateOrderRequest request){//do somethingvar order = CreateOrder(request);await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}- 通过ConsumeContext发布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){、var order = CreateOrder(conext.Message);await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}}ConsumerConsumer,消费者,即用于消费消息 。MassTransit 包括多种消费者类型,主要分为无状态和有状态两种消费者类型 。无状态消费者无状态消费者,即消费者无状态,消息消费完毕,消费者就释放 。主要的消费者类型有:
IConsumer<TMessage>、JobConsumer、IActivity和RoutingSlip等 。其中IConsumer<TMessage>已经在上面的快速体验部分举例说明 。而JobConsumer<TMessage>主要是对IConsumer<TMessage>的补充,其主要应用场景在于执行耗时任务 。而对于IActivity和RoutingSlip则是MassTransit Courier的核心对象,主要用于实现Saga模式的分布式事务 。MassTransit Courier 实现了Routing Slip模式,通过按需有序组合一系列的Activity,得到一个用来限定消息处理顺序的Routing Slip 。而每个Activity的具体抽象就是IActivity和IExecuteActivity 。二者的差别在于IActivity定义了Execute和Compensate两个方法,而IExecuteActivitiy仅定义了
		  	经验总结扩展阅读
- 4 .NET 6学习笔记——如何在.NET 6的Desktop App中使用Windows Runtime API
- 学习ASP.NET Core Blazor编程系列八——数据校验
- 【.NET 6】RabbitMQ延迟消费指南
- 简读《ASP.NET Core技术内幕与项目实战》之3:配置
- .net core-利用PdfSharpCore和SkiaSharp.QrCode 添加PDF二维码页眉
- 云原生分布式 PostgreSQL+Citus 集群在 Sentry 后端的实践
- .net core -利用 BsonDocumentProjectionDefinition 和Lookup 进行 join 关联 MongoDB 查询
- 微服务系列之分布式日志 ELK
- .net lambda表达式合并
- .NET Core C#系列之XiaoFeng.Threading.JobScheduler作业调度

 
   
   
   
   
   
   
   
   
   
   
   
  