mirror of
https://gitee.com/langsisi_admin/serein-flow
synced 2026-03-03 08:10:47 +08:00
修改了很多
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
using Serein.Library.Network.WebSocketCommunication.Handle;
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.IO.Compression;
|
||||
using System.IO;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using static System.Net.Mime.MediaTypeNames;
|
||||
|
||||
namespace Serein.Library.Network.WebSocketCommunication
|
||||
{
|
||||
@@ -38,7 +41,6 @@ namespace Serein.Library.Network.WebSocketCommunication
|
||||
{
|
||||
try
|
||||
{
|
||||
|
||||
await _client.ConnectAsync(new Uri(uri), CancellationToken.None);
|
||||
_ = ReceiveAsync();
|
||||
return true;
|
||||
@@ -58,8 +60,12 @@ namespace Serein.Library.Network.WebSocketCommunication
|
||||
/// <returns></returns>
|
||||
public async Task SendAsync(string message)
|
||||
{
|
||||
var buffer = Encoding.UTF8.GetBytes(message);
|
||||
await _client.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
|
||||
Console.WriteLine("发送消息");
|
||||
await Task.Delay(2000);
|
||||
await SocketExtension.SendAsync(this._client, message); // 回复客户端
|
||||
Console.WriteLine();
|
||||
//var buffer = Encoding.UTF8.GetBytes(message);
|
||||
//await _client.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -68,13 +74,21 @@ namespace Serein.Library.Network.WebSocketCommunication
|
||||
/// <returns></returns>
|
||||
private async Task ReceiveAsync()
|
||||
{
|
||||
var buffer = new byte[1024];
|
||||
|
||||
var msgQueueUtil = new MsgQueueUtil();
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
await HandleMsgAsync(_client, msgQueueUtil);
|
||||
});
|
||||
|
||||
|
||||
var receivedMessage = new StringBuilder(); // 用于拼接长消息
|
||||
|
||||
while (_client.State == WebSocketState.Open)
|
||||
{
|
||||
try
|
||||
{
|
||||
var buffer = new byte[1024];
|
||||
WebSocketReceiveResult result;
|
||||
|
||||
do
|
||||
@@ -86,21 +100,22 @@ namespace Serein.Library.Network.WebSocketCommunication
|
||||
receivedMessage.Append(partialMessage);
|
||||
|
||||
} while (!result.EndOfMessage); // 判断是否已经收到完整消息
|
||||
|
||||
var message = receivedMessage.ToString();
|
||||
msgQueueUtil.WriteMsg(message);
|
||||
receivedMessage.Clear(); // 清空 StringBuilder 为下一条消息做准备
|
||||
// 处理收到的完整消息
|
||||
if (result.MessageType == WebSocketMessageType.Close)
|
||||
{
|
||||
await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None);
|
||||
}
|
||||
else
|
||||
{
|
||||
var completeMessage = receivedMessage.ToString();
|
||||
_ = MsgHandleHelper.HandleMsgAsync(SendAsync, completeMessage); // 处理消息
|
||||
//Debug.WriteLine($"Received: {completeMessage}");
|
||||
}
|
||||
|
||||
// 清空 StringBuilder 为下一条消息做准备
|
||||
receivedMessage.Clear();
|
||||
//else
|
||||
//{
|
||||
// var completeMessage = receivedMessage.ToString();
|
||||
// MsgHandleHelper.HandleMsg(SendAsync, completeMessage); // 处理消息,如果方法入参是需要发送消息委托时,将 SendAsync 作为委托参数提供
|
||||
// //Debug.WriteLine($"Received: {completeMessage}");
|
||||
//}
|
||||
|
||||
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
@@ -110,65 +125,86 @@ namespace Serein.Library.Network.WebSocketCommunication
|
||||
}
|
||||
|
||||
|
||||
/* #region 消息处理
|
||||
private readonly string ThemeField;
|
||||
private readonly ConcurrentDictionary<string, HandldConfig> ThemeConfigs = new ConcurrentDictionary<string, HandldConfig>();
|
||||
|
||||
public async Task HandleSocketMsg(string jsonStr)
|
||||
public async Task HandleMsgAsync(WebSocket webSocket,
|
||||
MsgQueueUtil msgQueueUtil)
|
||||
{
|
||||
JObject json;
|
||||
try
|
||||
{
|
||||
json = JObject.Parse(jsonStr);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await SendAsync(_client, ex.Message);
|
||||
return;
|
||||
}
|
||||
// 获取到消息
|
||||
string themeName = json[ThemeField]?.ToString();
|
||||
if (!ThemeConfigs.TryGetValue(themeName, out var handldConfig))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
object dataValue;
|
||||
if (string.IsNullOrEmpty(handldConfig.DataField))
|
||||
while (true)
|
||||
{
|
||||
dataValue = json.ToObject(handldConfig.DataType);
|
||||
}
|
||||
else
|
||||
{
|
||||
dataValue = json[handldConfig.DataField].ToObject(handldConfig.DataType);
|
||||
}
|
||||
await handldConfig.Invoke(dataValue, SendAsync);
|
||||
}
|
||||
|
||||
public void AddConfig(string themeName, Type dataType, MsgHandler msgHandler)
|
||||
{
|
||||
if (!ThemeConfigs.TryGetValue(themeName, out var handldConfig))
|
||||
{
|
||||
handldConfig = new HandldConfig
|
||||
var message = await msgQueueUtil.WaitMsgAsync(); // 有消息时通知
|
||||
//if (!msgQueueUtil.TryGetMsg(out var message)) // 获取消息
|
||||
//{
|
||||
// return;
|
||||
//}
|
||||
// 消息处理
|
||||
MsgHandleHelper.HandleMsg(async (text) =>
|
||||
{
|
||||
DataField = themeName,
|
||||
DataType = dataType
|
||||
};
|
||||
ThemeConfigs.TryAdd(themeName, handldConfig);
|
||||
await SocketExtension.SendAsync(webSocket, text); // 回复客户端,处理方法中入参如果需要发送消息委托,则将该回调方法作为委托参数传入
|
||||
}, message); // 处理消息
|
||||
|
||||
}
|
||||
handldConfig.HandldAsync += msgHandler;
|
||||
}
|
||||
public void RemoteConfig(string themeName, MsgHandler msgHandler)
|
||||
{
|
||||
if (ThemeConfigs.TryGetValue(themeName, out var handldConfig))
|
||||
|
||||
|
||||
/* #region 消息处理
|
||||
private readonly string ThemeField;
|
||||
private readonly ConcurrentDictionary<string, HandldConfig> ThemeConfigs = new ConcurrentDictionary<string, HandldConfig>();
|
||||
|
||||
public async Task HandleSocketMsg(string jsonStr)
|
||||
{
|
||||
handldConfig.HandldAsync -= msgHandler;
|
||||
if (!handldConfig.HasSubscribers)
|
||||
JObject json;
|
||||
try
|
||||
{
|
||||
ThemeConfigs.TryRemove(themeName, out _);
|
||||
json = JObject.Parse(jsonStr);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
await SendAsync(_client, ex.Message);
|
||||
return;
|
||||
}
|
||||
// 获取到消息
|
||||
string themeName = json[ThemeField]?.ToString();
|
||||
if (!ThemeConfigs.TryGetValue(themeName, out var handldConfig))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
object dataValue;
|
||||
if (string.IsNullOrEmpty(handldConfig.DataField))
|
||||
{
|
||||
dataValue = json.ToObject(handldConfig.DataType);
|
||||
}
|
||||
else
|
||||
{
|
||||
dataValue = json[handldConfig.DataField].ToObject(handldConfig.DataType);
|
||||
}
|
||||
await handldConfig.Invoke(dataValue, SendAsync);
|
||||
}
|
||||
|
||||
public void AddConfig(string themeName, Type dataType, MsgHandler msgHandler)
|
||||
{
|
||||
if (!ThemeConfigs.TryGetValue(themeName, out var handldConfig))
|
||||
{
|
||||
handldConfig = new HandldConfig
|
||||
{
|
||||
DataField = themeName,
|
||||
DataType = dataType
|
||||
};
|
||||
ThemeConfigs.TryAdd(themeName, handldConfig);
|
||||
}
|
||||
handldConfig.HandldAsync += msgHandler;
|
||||
}
|
||||
public void RemoteConfig(string themeName, MsgHandler msgHandler)
|
||||
{
|
||||
if (ThemeConfigs.TryGetValue(themeName, out var handldConfig))
|
||||
{
|
||||
handldConfig.HandldAsync -= msgHandler;
|
||||
if (!handldConfig.HasSubscribers)
|
||||
{
|
||||
ThemeConfigs.TryRemove(themeName, out _);
|
||||
}
|
||||
}
|
||||
}
|
||||
#endregion*/
|
||||
}
|
||||
#endregion*/
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user