MassTransit | .NET 分布式应用框架( 二 )

fanoutMassTransit.Demo.OrderCreatedEventExchange和一个与OrderCreatedEvent同名的队列进行消息传输,如下图所示 。

MassTransit | .NET 分布式应用框架

文章插图
核心概念MassTranist 为了实现消息代理的透明化和应用间消息的高效传输,抽象了以下概念,其中消息流转流程如下图所示:
  1. Message:消息契约,定义了消息生产者和消息消费者之间的契约 。
  2. Producer:生产者,发送消息的一方都可以称为生产者 。
  3. SendEndpoint:发送端点,用于将消息内容序列化,并发送到传输模块 。
  4. Transport:传输模块,消息代理透明化的核心,用于和消息代理通信,负责发送和接收消息 。
  5. ReceiveEndpoint:接收端点,用于从传输模块接收消息,反序列化消息内容,并将消息路由到消费者 。
  6. Consumer:消费者,用于消息消费 。

MassTransit | .NET 分布式应用框架

文章插图
从上图可知,本质上还是发布订阅模式的实现,接下来就核心概念进行详解 。
MessageMessage:消息,可以使用class、interface、struct和record来创建,消息作为一个契约,需确保创建后不能篡改,因此应只保留只读属性且不应包含方法和行为 。MassTransit使用的是包含命名空间的完全限定名即typeof(T).FullName来表示特定的消息类型 。因此若在另外的项目中消费同名的消息类型,需确保消息的命名空间相同 。另外需注意消息不应继承,以避免发送基类消息类型造成的不可预期的结果 。为避免此类情况,官方建议使用接口来定义消息 。在MassTransit中,消息主要分为两种类型:
  1. Command:命令,用于告诉服务做什么,命令被发送到指定端点,仅被一个服务接收并执行 。一般以动名词结构命名,如:UpdateAddress、CancelOrder 。
  2. Event:事件,用于告诉服务什么发生了,事件被发布到多个端点,可以被多个服务消费 。一般以过去式结构命名,如:AddressUpdated,OrderCanceled 。
经过MassTransit发送的消息,会使用信封包装,包含一些附加信息,数据结构举例如下:
{"messageId": "6c600000-873b-00ff-9a8f-08da8da85542","requestId": null,"correlationId": null,"conversationId": "6c600000-873b-00ff-9526-08da8da85544","initiatorId": null,"sourceAddress": "rabbitmq://localhost/THINKPAD_MassTransitDemo_bus_ptoyyyr88cyx9s1gbdpe5kniy1?temporary=true","destinationAddress": "rabbitmq://localhost/MassTransit.Demo:OrderCreatedEvent","responseAddress": null,"faultAddress": null,"messageType": ["urn:message:MassTransit.Demo:OrderCreatedEvent"],"message": {"orderId": "fd8a3598-4c3a-4ec9-bbf9-d5f508e1a0d8"},"expirationTime": null,"sentTime": "2022-09-03T12:32:15.0796943Z","headers": {},"host": {"machineName": "THINKPAD","processName": "MassTransit.Demo","processId": 24684,"assembly": "MassTransit.Demo","assemblyVersion": "1.0.0.0","frameworkVersion": "6.0.5","massTransitVersion": "8.0.6.0","operatingSystemVersion": "Microsoft Windows NT 10.0.19044.0"}}从以上消息实例中可以看出一个包装后的消息包含以下核心属性:
  1. messageId:全局唯一的消息ID
  2. messageType:消息类型
  3. message:消息体,也就是具体的消息实例
  4. sourceAddress:消息来源地址
  5. destinationAddress:消息目标地址
  6. responseAddress:响应地址,在请求响应模式中使用
  7. faultAddress:消息异常发送地址,用于存储异常消费消息
  8. headers:消息头,允许应用自定义扩展信息
  9. correlationId:关联Id,在Saga状态机中会用到,用来关联系列事件
  10. host:宿主,消息来源应用的宿主信息
ProducerProducer,生产者,即用于生产消息 。在MassTransit主要借助以下对象进行命令的发送和事件的发布 。

经验总结扩展阅读