关于ASP.NET Core WebSocket实现集群的思考( 八 )


private async Task AddUserGroup(string user, string group, WebSocket webSocket){//获取群组信息var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());lock (currentGroup){//把用户标识加入当前组_ = currentGroup.Add(user);}//每个组的redis频道 , 在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count == 1){//订阅当前组消息await SubGroupMsg($"{groupPrefix}{group}");}string addMsg = $"user 【{user}】 addto group 【{group}】";byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组 , 则通知其他群成员ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));}用户想要在群组内发消息 , 则必须先加入到一个具体的群组内 , 具体的加入群组的格式如下
{"Method":"UserGroup", "Group":"g1"}Method为UserGroup代表着用户加入群组的业务类型 , Group代表着你要加入的群组唯一标识 。接下来就看下 , 用户发送群组消息的逻辑了
private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody){//判断群组是否存在var hasValue = https://www.huyubaike.com/biancheng/GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组 , 才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");//发送群组消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));}加入群组之后则可以发送和接收群组内的消息了 , 给群组发送消息的格式如下
{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}Method为Group代表着用户加入群组的业务类型 , Group则代表你要发送到具体的群组的唯一标识 , MsgBody则是发送到群组内的消息 。最后再来看下订阅群组内消息的情况 , 也就是处理群组消息的逻辑
private async Task SubGroupMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) =>{//接收群组订阅消息ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//获取当前服务器实例中当前群组的所有用户连接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){if (user == msgBody.FromId){continue;}//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{currentGroup.Remove(user);}}});_disposables.TryAdd(channel, sub);}全员消息处理全员消息处理相对来说思路比较简单 , 因为当服务启动的时候就会监听redis的全员消息频道 , 这样的话具体的实现也就只包含发送和接收全员消息了 , 首先看一下全员消息发送的逻辑

经验总结扩展阅读