文章插图
发送所有人发送给所有用户的逻辑比较简单 , 不用考虑到用户限制 , 只要用户连接到了WebSocket集群则都可以接收到这个消息 , 大致工作方式如下图所示

文章插图
这个比较简单 , 咱们直接看实现代码 , 首先是定义一个地址 , 用于发布消息
//把用户注册进去[HttpGet("/chat/all/{id}")]public async Task ChatAll(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
具体的实现逻辑还是在HandleGroup类里 , 是HandleAll方法 , 看一下具体实现public class WebSocketHandler:IDisposable{private readonly UserConnection AllConnection = new();private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleAll(string id, WebSocket webSocket){await _lock.WaitAsync();//把用户加入用户集合_ = AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每个服务只定义一次if (AllConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收信息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"user {id} send:{msg}");//获取接收信息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };//把消息通过redis发布到集群中的其他服务_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//用户退出则删除集合中的当前用户信息_ = AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubAllMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//接收到消息后遍历用户集合把消息发送给所有用户foreach (var user in AllConnection){//如果包含当前用户跳过if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});_disposables.TryAdd(channel, sub);}}
效果在这里就不展示了 , 和群组的效果是类似的 , 只是一个是部分用户 , 一个是全部的用户 。整合到一起上面我们分别展示了一对一、群组、所有人的场景 , 但是实际使用的时候 , 每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可 , 而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接 。所以我们需要把上面的演示整合一下 , 一个用户只需要连接到WebSocket集群一次即可 , 至于发送给谁 , 加入什么群组 , 接收全部消息等都是连接后通过一些标识区分的 , 而不必每个类型的操作都注册一次 , 就和微信和QQ一样我只要登录了即可 , 至于其他操作都是靠数据标识区分的 。接下来咱们就整合一下代码达到这个效果 , 大致的思路是
经验总结扩展阅读
- 关于童年回忆的美词美句 描写童年句子唯美
- 中元节不能洗澡吗
- 关于晚睡的文案短句 无奈熬夜的说说心情
- 关于熬夜的俏皮话 适合熬夜发朋友圈的句子伤感
- 关于入门深度学习mnist数据集前向计算的记录
- 2022年11月11日购物节是黄道吉日吗
- 梅西淘宝直播在哪里看几点开始
- 《ASP.NET Core技术内幕与项目实战》精简集-目录
- 华为开发者大会HDC2022:HMS Core 持续创新,与开发者共创美好数智生活
- 上 学习ASP.NET Core Blazor编程系列十——路由