Files
WCS/Plugins/Driver/Cowain.Driver/Services/VariableChannelService.cs
2026-03-02 09:08:20 +08:00

201 lines
7.3 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Microsoft.Extensions.Logging;
using Plugin.Cowain.Driver.IServices;
using Plugin.Cowain.Driver.Models;
using Plugin.Cowain.Driver.ViewModels;
using System.Threading;
using System.Threading.Channels;
namespace Plugin.Cowain.Driver.Services
{
/// <summary>
/// Bridges device variable changes to action plugins: when a registered variable
/// satisfies its configured condition, a <see cref="VariableAction"/> is written to an
/// unbounded channel; <see cref="ConsumeVariablesAsync"/> reads the channel and starts
/// the matching action without waiting for it to finish.
/// </summary>
public class VariableChannelService : IVariableChannelService
{
    /// <summary>Maximum number of actions allowed to execute at the same time.</summary>
    private const int MaxConcurrentActions = 1000;

    // True once StopConsumeVariablesAsync has shut the channel down; guarded by _stopGate.
    private bool _isChannelClosed;

    // Throttles in-flight actions: a permit is taken before an action starts and is
    // returned only when that action's task has finished.
    private readonly SemaphoreSlim _semaphores = new SemaphoreSlim(MaxConcurrentActions);

    // Single-permit gate that serialises StopConsumeVariablesAsync. It is deliberately
    // separate from _semaphores so that stopping never competes with running actions.
    private readonly SemaphoreSlim _stopGate = new SemaphoreSlim(1, 1);

    private readonly Channel<VariableAction> _channel;
    private readonly CancellationTokenSource _internalCts = new();
    private readonly IActionPluginService _actionPluginService;
    private readonly ILogger<VariableChannelService> _logger;

    /// <summary>
    /// Creates the service and its unbounded action channel.
    /// </summary>
    /// <param name="actionPluginService">Used to resolve condition and action plugins by name.</param>
    /// <param name="logger">Logger for diagnostics.</param>
    public VariableChannelService(IActionPluginService actionPluginService, ILogger<VariableChannelService> logger)
    {
        _logger = logger;
        _actionPluginService = actionPluginService;
        _channel = Channel.CreateUnbounded<VariableAction>();
    }

    /// <summary>
    /// Registers a callback on every variable of <paramref name="device"/> that has at
    /// least one action whose <c>TagId</c> equals the variable's <c>Id</c>.
    /// Logs an error and returns if the device has no variables or no action list.
    /// </summary>
    /// <param name="device">Device whose variables and actions are joined.</param>
    public void RegisterDeviceActions(DeviceViewModel device)
    {
        if (device.Variables == null)
        {
            _logger.LogError($"设备 {device.DeviceName} 的变量为空");
            return;
        }
        if (device.VarActions == null)
        {
            _logger.LogError($"设备 {device.DeviceName} 的动作列表为空");
            return;
        }

        // One (variable, action) pair per matching TagId; a variable with several
        // actions gets one registered callback per action.
        var pairs = from v in device.Variables
                    join va in device.VarActions on v.Id equals va.TagId
                    select new { v, va };

        foreach (var pair in pairs)
        {
            pair.v.Register(x => RegisterAction(x, pair.va));
        }
    }

    /// <summary>
    /// Evaluates the action's condition against a copy of <paramref name="variable"/>
    /// and, when it matches, writes a <see cref="VariableAction"/> to the channel.
    /// </summary>
    /// <param name="variable">The variable passed to the registered callback.</param>
    /// <param name="varAction">The action configuration bound to the variable.</param>
    private void RegisterAction(VariableViewModel variable, VarActionViewModel varAction)
    {
        var condition = _actionPluginService.GetCondition(varAction.Condition);
        if (condition == null)
        {
            _logger.LogError($"条件插件未注册:{varAction.Condition}");
            return;
        }
        if (variable.Value == null)
        {
            _logger.LogError($"变量值为空:{variable.DeviceName}->{variable.Name}");
            return;
        }

        // Copy the property values into a new instance so the queued action does not
        // hold a reference to the caller's view model.
        var variableCopy = new VariableViewModel
        {
            Id = variable.Id,
            DeviceId = variable.DeviceId,
            DeviceName = variable.DeviceName,
            Name = variable.Name,
            Address = variable.Address,
            Desc = variable.Desc,
            DataType = variable.DataType,
            ArrayCount = variable.ArrayCount,
            Value = variable.Value,
            OldValue = variable.OldValue,
            UpdateTime = variable.UpdateTime,
            Message = variable.Message,
            IsSuccess = variable.IsSuccess
        };
        // NOTE(review): the previous comment here said IsManualTrig has to be set again
        // because the initializer above triggers OnDataChange and resets it to false,
        // but no such assignment exists in this method. Confirm whether one is needed.

        if (!condition.IsMatch(variableCopy, varAction.ActionValue))
        {
            return;
        }

        // Condition matched: enqueue the action.
        var va = new VariableAction(varAction.ActionName, varAction.Param, variableCopy);
        try
        {
            // Fire-and-forget write; the continuation only records the outcome.
            // TaskScheduler.Default keeps the logging off any ambient custom scheduler.
            _ = _channel.Writer.WriteAsync(va).AsTask().ContinueWith(task =>
            {
                if (task.IsFaulted)
                {
                    _logger.LogError(task.Exception, $"入队列发生错误:{variable.DeviceName}->{variable.Name}");
                }
                else if (task.IsCompletedSuccessfully)
                {
                    _logger.LogDebug($"入队列成功:{variable.DeviceName}->{variable.Name}");
                }
            }, TaskScheduler.Default);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, $"入队列错误:{variable.DeviceName}->{variable.Name}");
        }
    }

    /// <summary>
    /// Consumer loop: reads queued actions until the channel completes or the internal
    /// token is cancelled by <see cref="StopConsumeVariablesAsync"/>, and starts each
    /// one without awaiting its result. At most <see cref="MaxConcurrentActions"/>
    /// actions run at the same time.
    /// </summary>
    /// <returns>A task that ends when the channel is drained and completed.</returns>
    /// <exception cref="OperationCanceledException">
    /// The internal token was cancelled while waiting for an item or for a free slot.
    /// </exception>
    public async Task ConsumeVariablesAsync()
    {
        var combinedToken = _internalCts.Token;
        await foreach (var item in _channel.Reader.ReadAllAsync(combinedToken))
        {
            // Validate before taking a permit so that rejected items cannot leak one.
            if (string.IsNullOrEmpty(item.Param))
            {
                _logger.LogError($"参数不能未空:{item.Action}");
                continue;
            }
            var action = _actionPluginService.GetAction(item.Action);
            if (action is null)
            {
                _logger.LogError($"变量动作未注册:{item.Action}");
                continue;
            }

            await _semaphores.WaitAsync(combinedToken);
            try
            {
                // Start the action but do not wait for it. The permit is handed to the
                // continuation, which returns it once the action has actually finished.
                _ = action.ExecuteAsync(item, combinedToken).ContinueWith(task =>
                {
                    try
                    {
                        if (task.IsFaulted)
                        {
                            _logger.LogError(task.Exception, $"执行动作时发生错误:{item.Action}");
                        }
                        else if (task.IsCompletedSuccessfully)
                        {
                            _logger.LogDebug($"动作执行成功:{item.Action}");
                        }
                    }
                    finally
                    {
                        _semaphores.Release();
                    }
                }, TaskScheduler.Default);
            }
            catch (Exception ex)
            {
                // The action failed before a continuation could be attached, so the
                // permit has to be returned here instead.
                _logger.LogError(ex, $"执行动作时发生错误:{item.Action}");
                _semaphores.Release();
            }
        }
    }

    /// <summary>
    /// Stops consumption: cancels the internal token and marks the channel writer as
    /// completed. Calls after the first successful one return immediately.
    /// </summary>
    /// <param name="cancellationToken">Cancels waiting for the stop gate.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled before the stop gate was acquired.
    /// </exception>
    public async Task StopConsumeVariablesAsync(CancellationToken cancellationToken)
    {
        await _stopGate.WaitAsync(cancellationToken);
        try
        {
            if (_isChannelClosed)
            {
                return;
            }

            _logger.LogInformation("软件关闭,触发取消消费");
            _internalCts.Cancel();

            // Complete without an error object: this is an orderly shutdown, so readers
            // must not observe the channel as faulted. TryComplete signals "already
            // completed" through its return value rather than by throwing.
            if (!_channel.Writer.TryComplete())
            {
                _logger.LogWarning("ChannelWriter 已无法完成(可能已被关闭)");
            }
            _isChannelClosed = true;
        }
        finally
        {
            _stopGate.Release();
        }
    }
}
}