优化了节点连接

This commit is contained in:
fengjiayi
2024-10-28 15:21:08 +08:00
parent f20cfb755c
commit 561b6d764f
28 changed files with 295 additions and 165 deletions

View File

@@ -123,7 +123,10 @@ namespace Serein.Library.Network.WebSocketCommunication.Handle
catch (Exception ex)
{
Console.WriteLine($"error in ws : {ex.Message}{Environment.NewLine}json value:{jsonObject}");
return;
}
finally
{
context.Handle = true;
}
}

View File

@@ -11,7 +11,7 @@ namespace Serein.Library.Network.WebSocketCommunication.Handle
/// <summary>
/// 消息处理上下文
/// </summary>
public class WebSocketMsgContext : IDisposable
public class WebSocketMsgContext /*: IDisposable*/
{
public WebSocketMsgContext(Func<string, Task> sendAsync)
{
@@ -31,7 +31,14 @@ namespace Serein.Library.Network.WebSocketCommunication.Handle
/// <summary>
/// 标记是否已经处理,如果是,则提前退出
/// </summary>
public bool Handle { get; set; }
public bool Handle { get => _handle; set{
if(value)
{
Dispose();
_handle = value;
}
} }
public bool _handle = false;
/// <summary>
/// 消息本体JObject

View File

@@ -201,13 +201,18 @@ namespace Serein.Library.Network.WebSocketCommunication.Handle
/// </summary>
/// <param name="context">此次请求的上下文</param>
/// <returns></returns>
public async Task HandleAsync(WebSocketMsgContext context)
public void Handle(WebSocketMsgContext context)
{
foreach (var module in MyHandleModuleDict.Values)
{
await module.HandleAsync(context);
if (context.Handle)
{
return;
}
_ = module.HandleAsync(context);
}
}

View File

@@ -11,23 +11,25 @@ using System.Threading.Tasks;
namespace Serein.Library.Network.WebSocketCommunication
{
public class MsgQueueUtil
/// <summary>
/// 消息处理工具
/// </summary>
public class MsgHandleUtil
{
public ConcurrentQueue<string> Msgs = new ConcurrentQueue<string>();
private readonly Channel<string> _msgChannel;
public MsgQueueUtil()
{
_msgChannel = CreateChannel();
}
private Channel<string> CreateChannel()
/// <summary>
/// 初始化优先容器
/// </summary>
/// <param name="capacity"></param>
public MsgHandleUtil(int capacity = 100)
{
return Channel.CreateBounded<string>(new BoundedChannelOptions(100)
_msgChannel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
});
}
/// <summary>
/// 等待消息
@@ -35,27 +37,45 @@ namespace Serein.Library.Network.WebSocketCommunication
/// <returns></returns>
public async Task<string> WaitMsgAsync()
{
var state = await _msgChannel.Reader.ReadAsync();
return state;
}
public void WriteMsg(string msg)
{
//Msgs.Enqueue(msg);
Console.WriteLine($"{DateTime.Now}{msg}{Environment.NewLine}");
_ = _msgChannel.Writer.WriteAsync(msg);
// 检查是否可以读取消息
if (await _msgChannel.Reader.WaitToReadAsync())
{
return await _msgChannel.Reader.ReadAsync();
}
return null; // 若通道关闭则返回null
}
public bool TryGetMsg(out string msg)
/// <summary>
/// 写入消息
/// </summary>
/// <param name="msg">消息内容</param>
/// <returns>是否写入成功</returns>
public async Task<bool> WriteMsgAsync(string msg)
{
return Msgs.TryDequeue(out msg);
try
{
await _msgChannel.Writer.WriteAsync(msg);
return true;
}
catch (ChannelClosedException)
{
// Channel 已关闭
return false;
}
}
/// <summary>
/// 尝试关闭通道,停止写入消息
/// </summary>
public void CloseChannel()
{
_msgChannel.Writer.Complete();
}
}
public class SocketExtension
{
/// <summary>

View File

@@ -76,7 +76,7 @@ namespace Serein.Library.Network.WebSocketCommunication
private async Task ReceiveAsync()
{
var msgQueueUtil = new MsgQueueUtil();
var msgQueueUtil = new MsgHandleUtil();
_ = Task.Run(async () =>
{
await HandleMsgAsync(_client, msgQueueUtil);
@@ -102,7 +102,7 @@ namespace Serein.Library.Network.WebSocketCommunication
} while (!result.EndOfMessage); // 判断是否已经收到完整消息
var message = receivedMessage.ToString();
msgQueueUtil.WriteMsg(message);
await msgQueueUtil.WriteMsgAsync(message);
receivedMessage.Clear(); // 清空 StringBuilder 为下一条消息做准备
// 处理收到的完整消息
if (result.MessageType == WebSocketMessageType.Close)
@@ -126,8 +126,7 @@ namespace Serein.Library.Network.WebSocketCommunication
}
public async Task HandleMsgAsync(WebSocket webSocket,
MsgQueueUtil msgQueueUtil)
public async Task HandleMsgAsync(WebSocket webSocket, MsgHandleUtil msgQueueUtil)
{
async Task sendasync(string text)
{
@@ -136,11 +135,15 @@ namespace Serein.Library.Network.WebSocketCommunication
while (true)
{
var message = await msgQueueUtil.WaitMsgAsync(); // 有消息时通知
using (var context = new WebSocketMsgContext(sendasync))
{
context.JsonObject = JObject.Parse(message);
await MsgHandleHelper.HandleAsync(context); // 处理消息
}
var context = new WebSocketMsgContext(sendasync);
context.JsonObject = JObject.Parse(message);
MsgHandleHelper.Handle(context); // 处理消息
//using (var context = new WebSocketMsgContext(sendasync))
//{
// context.JsonObject = JObject.Parse(message);
// await MsgHandleHelper.HandleAsync(context); // 处理消息
//}
//_ = Task.Run(() => {
// JObject json = JObject.Parse(message);

View File

@@ -180,7 +180,7 @@ namespace Serein.Library.Network.WebSocketCommunication
return;
}
var msgQueueUtil = new MsgQueueUtil();
var msgQueueUtil = new MsgHandleUtil();
_ = Task.Run(async () =>
{
await HandleMsgAsync(webSocket,msgQueueUtil, authorizedHelper);
@@ -219,7 +219,7 @@ namespace Serein.Library.Network.WebSocketCommunication
// 完整消息已经接收到,准备处理
var message = receivedMessage.ToString(); // 获取消息文本
receivedMessage.Clear(); // 清空 StringBuilder 为下一条消息做准备
msgQueueUtil.WriteMsg(message); // 处理消息
await msgQueueUtil.WriteMsgAsync(message); // 处理消息
}
catch (Exception ex)
{
@@ -231,7 +231,7 @@ namespace Serein.Library.Network.WebSocketCommunication
public async Task HandleMsgAsync(WebSocket webSocket,
MsgQueueUtil msgQueueUtil,
MsgHandleUtil msgQueueUtil,
WebSocketAuthorizedHelper authorizedHelper)
{
async Task sendasync(string text)
@@ -254,18 +254,21 @@ namespace Serein.Library.Network.WebSocketCommunication
return;
}
}
var context = new WebSocketMsgContext(sendasync);
context.JsonObject = JObject.Parse(message);
MsgHandleHelper.Handle(context); // 处理消息
using (var context = new WebSocketMsgContext(sendasync))
{
context.JsonObject = JObject.Parse(message);
await MsgHandleHelper.HandleAsync(context); // 处理消息
}
//using (var context = new WebSocketMsgContext(sendasync))
//{
// context.JsonObject = JObject.Parse(message);
// await MsgHandleHelper.Handle(context); // 处理消息
//}
//_ = Task.Run(() => {
//});
//});
}
}