MassTransit | .NET 分布式应用框架( 三 )


MassTransit | .NET 分布式应用框架

文章插图
从以上类图可以看出,消息的发送主要核心依赖于两个接口:
  1. ISendEndpoint:提供了Send方法,用于发送命令 。
  2. IPublishEndpoint:提供了Publish方法,用于发布事件 。
但基于上图的继承体系,可以看出通过IBusISendEndpointProviderConsumeContext进行命令的发送;通过IBusIPublishEndpointProvider进行事件的发布 。具体举例如下:
发送命令
  1. 通过IBus发送:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//通过以下方式配置对应消息类型的目标地址EndpointConvention.Map<CreateOrderRequest>(new Uri("queue:create-order"));await _bus.Send(request);}
  1. 通过ISendEndpointProvider发送:
private readonly ISendEndpointProvider_sendEndpointProvider;public async Task Post(CreateOrderRequest request){var serviceAddress = new Uri("queue:create-order");var endpoint = await _sendEndpointProvider.GetSendEndpoint(serviceAddress);await endpoint.Send(request);}
  1. 通过ConsumeContext发送:
public class CreateOrderRequestConsumer:IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){//do something elsevar destinationAddress = new Uri("queue:lock-stock");var command = new LockStockRequest(context.Message.OrderId);await context.Send<LockStockRequest>(destinationAddress, command);// 也可以通过获取`SendEndpoint`发送命令// var endpoint = await context.GetSendEndpoint(destinationAddress);// await endpoint.Send<LockStockRequest>(command);}}发布事件
  1. 通过IBus发布:
private readonly IBus _bus;public async Task Post(CreateOrderRequest request){//do somethingawait _bus.Send(request);}
  1. 通过IPublishEndpoint发布:
private readonly IPublishEndpoint _publishEndpoint;public async Task Post(CreateOrderRequest request){//do somethingvar order = CreateOrder(request);await _publishEndpoint.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}
  1. 通过ConsumeContext发布:
public class CreateOrderRequestConsumer: IConsumer<CreateOrderRequest>{public async Task Consume(ConsumeContext<CreateOrderRequest> context){、var order = CreateOrder(conext.Message);await context.Publish<OrderCreatedEvent>(new OrderCreateEvent(order.Id));}}ConsumerConsumer,消费者,即用于消费消息 。MassTransit 包括多种消费者类型,主要分为无状态和有状态两种消费者类型 。
无状态消费者无状态消费者,即消费者无状态,消息消费完毕,消费者就释放 。主要的消费者类型有:IConsumer<TMessage>JobConsumerIActivityRoutingSlip等 。其中IConsumer<TMessage>已经在上面的快速体验部分举例说明 。而JobConsumer<TMessage>主要是对IConsumer<TMessage>的补充,其主要应用场景在于执行耗时任务 。而对于IActivityRoutingSlip则是MassTransit Courier的核心对象,主要用于实现Saga模式的分布式事务 。MassTransit Courier 实现了Routing Slip模式,通过按需有序组合一系列的Activity,得到一个用来限定消息处理顺序的Routing Slip 。而每个Activity的具体抽象就是IActivityIExecuteActivity 。二者的差别在于IActivity定义了ExecuteCompensate两个方法,而IExecuteActivitiy仅定义了

经验总结扩展阅读