关于ASP.NET Core WebSocket实现集群的思考( 三 )

这里涉及到几个辅助相关的类 , 其中UserConnection类是存储注册到当前服务的连接 , MsgBody类用来接受客户端发送过来的消息 , ChannelMsgBody是用来发送redis频道的相关消息 , 因为要把相关消息通过redis发布出去 , 咱们列一下这几个类的相关代码
//注册到当前服务的连接public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>{//存储用户唯一标识和WebSocket的对应关系private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();//当前服务的用户数量public int Count => _users.Count;public WebSocket GetOrAdd(string userId, WebSocket webSocket){return _users.GetOrAdd(userId, webSocket);}public bool TryGetValue(string userId, out WebSocket webSocket){return _users.TryGetValue(userId, out webSocket);}public bool TryRemove(string userId, out WebSocket webSocket){return _users.TryRemove(userId, out webSocket);}public void Clear(){_users.Clear();}public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator(){return _users.GetEnumerator();}IEnumerator IEnumerable.GetEnumerator(){return this.GetEnumerator();}}//客户端消息public class MsgBody{//目标用户标识public string Id { get; set; }//要发送的消息public string Msg { get; set; }}//频道订阅消息public class ChannelMsgBody{//用户标识public string FromId { get; set; }//目标用户标识 , 也就是要发送给谁public string ToId { get; set; }//要发送的消息public string Msg { get; set; }}这样的话关于一对一发送消息的相关逻辑就实现完成了 , 启动两个Server端 , 由于nginx默认的负载均衡策略是轮训 , 所以注册两个用户的话会被分发到不同的服务里去

关于ASP.NET Core WebSocket实现集群的思考

文章插图
关于ASP.NET Core WebSocket实现集群的思考

文章插图
Postman连接三个连接唯一标识分别是1、2、3 , 模拟一下消息发送 , 效果如下,发送效果
关于ASP.NET Core WebSocket实现集群的思考

文章插图
接收效果
关于ASP.NET Core WebSocket实现集群的思考

文章插图
群组发送上面我们展示了一对一发送的情况 , 接下来我们来看一下 , 群组发送的情况 。群组发送的话就是只要大家都加入一个群组 , 只要客户端在群组里发送一条消息 , 则注册到当前群组内的所有客户端都可以收到消息 。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组 , 则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道 , 集群中的其他WebSocket服务器通过这个redis频道接收群组消息 , 通过一张图描述一下
关于ASP.NET Core WebSocket实现集群的思考

文章插图
群组的实现方式相对于一对一要简单一点
  • 发送端可以不用考虑当前服务中的客户端连接 , 一股脑的交给redis把消息发布出去
  • 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息 , 获取组内的用户循环发送消息
展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景
//包含两个标识一个是组别标识一个是注册到组别的用户[HttpGet("/chat/group/{groupId}/{userId}")]public async Task ChatGroup(string groupId, string userId){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//调用HandleGroup处理群组相关的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}

经验总结扩展阅读