using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading.Channels;
namespace Serein.Proto.Modbus
{
///
/// Modbus TCP 客户端
///
public class ModbusTcpClient : IModbusClient
{
///
/// 消息发送时触发的事件
///
public Action OnTx { get; set; }
///
/// 接收到消息时触发的事件
///
public Action OnRx { get; set; }
///
/// 消息通道
///
private readonly Channel _channel = Channel.CreateUnbounded();
///
/// TCP客户端
///
private readonly TcpClient _tcpClient;
///
/// TCP客户端的网络流,用于发送和接收数据
///
private readonly NetworkStream _stream;
///
/// 存储未完成请求的字典,键为事务ID,值为任务完成源
///
private readonly ConcurrentDictionary> _pendingRequests = new();
///
/// 事务ID计数器,用于生成唯一的事务ID
///
private int _transactionId = 0;
#pragma warning disable CS8618 // 在退出构造函数时,不可为 null 的字段必须包含非 null 值。请考虑添加 "required" 修饰符或声明为可为 null。
public ModbusTcpClient(string host, int port = 502)
#pragma warning restore CS8618 // 在退出构造函数时,不可为 null 的字段必须包含非 null 值。请考虑添加 "required" 修饰符或声明为可为 null。
{
_tcpClient = new TcpClient();
_tcpClient.Connect(host, port);
_stream = _tcpClient.GetStream();
_ = ProcessQueueAsync(); // 启动后台消费者
_ = ReceiveLoopAsync(); // 启动接收响应线程
}
#region 功能码封装
///
/// 读取线圈状态
///
///
///
///
public async Task ReadCoils(ushort startAddress, ushort quantity)
{
var pdu = BuildReadPdu(startAddress, quantity);
var responsePdu = await SendAsync(ModbusFunctionCode.ReadCoils, pdu);
return ParseDiscreteBits(responsePdu, quantity);
}
///
/// 读取离散输入状态
///
///
///
///
public async Task ReadDiscreteInputs(ushort startAddress, ushort quantity)
{
var pdu = BuildReadPdu(startAddress, quantity);
var responsePdu = await SendAsync(ModbusFunctionCode.ReadDiscreteInputs, pdu);
return ParseDiscreteBits(responsePdu, quantity);
}
///
/// 读取保持寄存器
///
///
///
///
public async Task ReadHoldingRegisters(ushort startAddress, ushort quantity)
{
var pdu = BuildReadPdu(startAddress, quantity);
var responsePdu = await SendAsync(ModbusFunctionCode.ReadHoldingRegisters, pdu);
return ParseRegisters(responsePdu, quantity);
}
///
/// 读取输入寄存器
///
///
///
///
public async Task ReadInputRegisters(ushort startAddress, ushort quantity)
{
var pdu = BuildReadPdu(startAddress, quantity);
var responsePdu = await SendAsync(ModbusFunctionCode.ReadInputRegisters, pdu);
return ParseRegisters(responsePdu, quantity);
}
///
/// 写单个线圈
///
///
///
///
public async Task WriteSingleCoil(ushort address, bool value)
{
var pdu = new byte[]
{
(byte)(address >> 8), // 地址高字节
(byte)(address & 0xFF), // 地址低字节
value ? (byte)0xFF : (byte)0x00, // 线圈值高字节,高电平为0xFF,低电平为00
0x00 // 线圈值低字节
};
await SendAsync(ModbusFunctionCode.WriteSingleCoil, pdu);
}
///
/// 写单个寄存器
///
///
///
///
public async Task WriteSingleRegister(ushort address, ushort value)
{
var pdu = new byte[]
{
(byte)(address >> 8), // 地址高字节
(byte)(address & 0xFF), // 地址低字节
(byte)(value >> 8), // 寄存器值高字节
(byte)(value & 0xFF) // 寄存器值低字节
};
await SendAsync(ModbusFunctionCode.WriteSingleRegister, pdu);
}
///
/// 写多个线圈
///
///
///
///
public async Task WriteMultipleCoils(ushort startAddress, bool[] values)
{
int byteCount = (values.Length + 7) / 8; // 计算需要的字节数
byte[] data = new byte[byteCount];
for (int i = 0; i < values.Length; i++)
{
if (values[i])
data[i / 8] |= (byte)(1 << (i % 8)); // 设置对应位
}
var pdu = new List
{
(byte)(startAddress >> 8), // 地址高字节
(byte)(startAddress & 0xFF), // 地址低字节
(byte)(values.Length >> 8), // 数量高字节
(byte)(values.Length & 0xFF), // 数量低字节
(byte)data.Length // 字节数
};
pdu.AddRange(data);
await SendAsync(ModbusFunctionCode.WriteMultipleCoils, pdu.ToArray());
}
///
/// 写多个寄存器
///
///
///
///
public async Task WriteMultipleRegisters(ushort startAddress, ushort[] values)
{
var pdu = new List
{
(byte)(startAddress >> 8), // 地址高字节
(byte)(startAddress & 0xFF), // 地址低字节
(byte)(values.Length >> 8), // 数量高字节
(byte)(values.Length & 0xFF), // 数量低字节
(byte)(values.Length * 2) // 字节数
};
foreach (var val in values)
{
pdu.Add((byte)(val >> 8)); // 寄存器值高字节
pdu.Add((byte)(val & 0xFF)); // 寄存器值低字节
}
await SendAsync(ModbusFunctionCode.WriteMultipleRegister, pdu.ToArray());
}
///
/// 构建读取PDU数据
///
///
///
///
private byte[] BuildReadPdu(ushort startAddress, ushort quantity)
{
byte[] buffer = new byte[4];
BinaryPrimitives.WriteUInt16BigEndian(buffer.AsSpan(0, 2), startAddress); // 起始地址高低字节
BinaryPrimitives.WriteUInt16BigEndian(buffer.AsSpan(2, 2), quantity); // 读取数量高低字节
return buffer;
}
///
/// 解析离散位数据
///
///
///
///
private bool[] ParseDiscreteBits(byte[] pdu, ushort count)
{
int byteCount = pdu[2]; // 第2字节是后续的字节数量
int dataIndex = 3; // 数据从第3字节开始(0-based)
var result = new bool[count];
for (int i = 0, bytePos = 0, bitPos = 0; i < count; i++, bitPos++)
{
if (bitPos == 8)
{
bitPos = 0;
bytePos++;
}
result[i] = ((pdu[dataIndex + bytePos] >> bitPos) & 0x01) != 0;
}
return result;
}
///
/// 解析寄存器数据
///
///
///
///
private ushort[] ParseRegisters(byte[] pdu, ushort count)
{
var result = new ushort[count];
int dataStart = 3; // 数据从第3字节开始
for (int i = 0; i < count; i++)
{
int offset = dataStart + i * 2;
result[i] = (ushort)((pdu[offset] << 8) | pdu[offset + 1]);
}
return result;
}
#endregion
///
/// 处理消息队列,发送请求到服务器
///
///
private async Task ProcessQueueAsync()
{
while (_tcpClient.Connected)
{
var request = await _channel.Reader.ReadAsync();
if (request.PDU is null)
{
request.Completion?.TrySetCanceled();
continue;
}
byte[] packet = BuildPacket(request.TransactionId, 0x01, (byte)request.FunctionCode, request.PDU);
OnTx?.Invoke(packet); // 触发发送日志
await _stream.WriteAsync(packet, 0, packet.Length);
await _stream.FlushAsync();
}
}
///
/// 发送请求并等待响应
///
/// 功能码
/// 内容
/// 超时时间
/// 最大重发次数
///
///
public Task SendAsync(ModbusFunctionCode functionCode, byte[] pdu)
{
int id = Interlocked.Increment(ref _transactionId);
var transactionId = (ushort)(id % ushort.MaxValue); // 0~65535 循环
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var request = new ModbusTcpRequest
{
TransactionId = transactionId,
FunctionCode = functionCode,
PDU = pdu,
Completion = tcs
};
_pendingRequests[transactionId] = tcs;
_channel.Writer.TryWrite(request);
return tcs.Task;
}
///
/// 接收数据循环
///
///
private async Task ReceiveLoopAsync()
{
var buffer = new byte[1024];
while (true)
{
int len = await _stream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
//int len = await _stream.ReadAsync(buffer.AsMemory(0, buffer.Length)).ConfigureAwait(false);
if (len == 0) return; // 连接关闭
if (len < 6)
{
Console.WriteLine("接收到的数据长度不足");
return;
}
ushort protocolId = BinaryPrimitives.ReadUInt16BigEndian(buffer.AsSpan(2, 2));
if (protocolId != 0x0000)
{
Console.WriteLine($"协议不匹配: {protocolId:X4}");
return;
}
ushort dataLength = BinaryPrimitives.ReadUInt16BigEndian(buffer.AsSpan(4, 2));
// 检查数据长度是否合法
if (dataLength > 253 || len < 6 + dataLength)
{
Console.WriteLine($"数据长度异常: dataLength={dataLength}, 实际接收={len}");
return;
}
ushort transactionId = BinaryPrimitives.ReadUInt16BigEndian(buffer.AsSpan(0, 2));
if (_pendingRequests.TryRemove(transactionId, out var tcs))
{
var responsePdu = new ReadOnlySpan(buffer, 6, dataLength).ToArray();
if (OnRx is not null)
{
var packet = new ReadOnlySpan(buffer, 0, 6 + dataLength).ToArray();
OnRx?.Invoke(packet); // 触发接收日志
}
// 检查是否异常响应
if ((responsePdu[1] & 0x80) != 0)
{
byte exceptionCode = responsePdu[2];
tcs.SetException(new ModbusException(responsePdu[1], exceptionCode));
}
else
{
tcs.SetResult(responsePdu);
}
}
else
{
Console.WriteLine($"未匹配到 TransactionId={transactionId} 的请求");
}
}
}
///
/// 构造 Modbus Tcp 报文
///
///
///
///
///
///
private byte[] BuildPacket(ushort transactionId, byte unitId, byte functionCode, byte[] pduData)
{
int pduLength = 1 + pduData.Length; // PDU 长度 = 功能码1字节 + 数据长度
int totalLength = 7 + pduLength; // MBAP头长度 = 7字节 + PDU长度
Span packet = totalLength <= 256 ? stackalloc byte[totalLength] : new byte[totalLength];
// 写入事务ID(大端序)
packet[0] = (byte)(transactionId >> 8);
packet[1] = (byte)(transactionId);
// 协议ID(固定0x0000)
packet[2] = 0;
packet[3] = 0;
// 长度(PDU长度 + 1字节UnitID)
ushort length = (ushort)(pduLength + 1);
packet[4] = (byte)(length >> 8);
packet[5] = (byte)(length);
// UnitID & 功能码
packet[6] = unitId;
packet[7] = functionCode;
// 复制PDU数据
pduData.AsSpan().CopyTo(packet.Slice(8));
return packet.ToArray();
}
public void Dispose()
{
var tcs = _pendingRequests.Values.ToArray();
foreach (var pending in tcs)
{
pending.TrySetCanceled();
}
_stream?.Close();
_tcpClient?.Close();
}
}
}