关于ASP.NET Core WebSocket实现集群的思考( 七 )

这里涉及到了ChannelData类是用于接收客户端消息的类模板 , 具体定义如下
public class ChannelData{//消息类型 比如一对一 群组 全员public string Method { get; set; }//群组标识public string Group { get; set; }//消息体public object MsgBody { get; set; }}类中并不会包含当前用户信息 , 因为连接到当前服务的时候已经提供了客户端唯一标识 。结合上面的处理代码我们可以看出 , 客户端用户连接到WebSocket实例之后 , 先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道 , 用于处理非当前实例注册客户端的一对一消息处理和全员消息处理 , 然后等待接收客户端消息 , 根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理 , 它的工作流程入下图所示

关于ASP.NET Core WebSocket实现集群的思考

文章插图
由代码和上面的流程图可知 , 它根据不同的标识去处理不同类型的消息 , 接下来我们可以看下每种消息类型的处理方式 。
一对一处理【关于ASP.NET Core WebSocket实现集群的思考】首先是一对一的消息处理情况 , 看一下具体的处理逻辑 , 首先是一对一发布消息
private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult) {MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器 , 则直接把消息发布到具体的用户频道去 , 由具体用户去订阅ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}}接下来是用于处理订阅其他用户发送过来消息的逻辑 , 这个和整合之前的逻辑是一致的 , 在当前服务器中找到用户对应的连接 , 发送消息
private async Task SubMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);}如果给某个用户发送消息则可以使用如下的消息格式
{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}Method为One代表着是私聊一对一的情况 , 消息体内Id为要发送给的具体用户标识和消息体 。
群组处理接下来看群组处理方式 , 这个和之前的逻辑是有出入的 , 首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息 , 之前是一个用户对应多个连接 , 整合了之后集群中每个用户只关联唯一的一个WebSocket连接 , 首先看用户加入群组的逻辑

经验总结扩展阅读