使用 Windows 消息队列 MessageQueue
时间:2022-03-18 14:50
Config中appSettings配置:
<!-- 本地消息队列时: value=".\PRIVATE$\MgrApiRequest" -->
<add key="RequestQueueName" value="FormatName:Direct=TCP:192.168.100.102\PRIVATE$\MgrApiRequest" />
消息队列连接,将order类资料加入消息队列中:
// Open the queue named by the "RequestQueueName" app setting and send the order as a JSON string.
// MessageQueue is IDisposable; the using block releases it once the message has been sent.
using (MessageQueue messageOrderQueue = new MessageQueue(ConfigurationManager.AppSettings["RequestQueueName"]))
using (Message message = new Message())
{
    messageOrderQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

    // The body is the JSON text of the order, serialized as a string by the XML formatter.
    message.Body = JsonConvert.SerializeObject(order);
    message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
    messageOrderQueue.Send(message);
}
监控消息队列,获取消息队列中的数据进行处理:
// Cancellation source checked by RequestReceived and HandleRequestThread to decide whether to keep processing.
private CancellationTokenSource cts = new CancellationTokenSource();
/// <summary>
/// Opens the queue named by the "RequestQueueName" app setting (creating it first when it is
/// addressed by path name and does not exist), grants access rights, starts asynchronous
/// receiving, and launches the long-running task that processes queued requests.
/// </summary>
internal void Init()
{
    string CommissionQueueName = ConfigurationManager.AppSettings["RequestQueueName"];

    // MessageQueue.Exists throws InvalidOperationException for format-name syntax
    // (e.g. "FormatName:Direct=TCP:..."), so only probe/create when a path name is configured.
    bool isFormatName = CommissionQueueName != null
        && CommissionQueueName.StartsWith("FormatName:", StringComparison.OrdinalIgnoreCase);
    if (!isFormatName && !MessageQueue.Exists(CommissionQueueName))
    {
        MessageQueue.Create(CommissionQueueName);
    }

    messageOrderQueue = new MessageQueue(CommissionQueueName);

    // NOTE(review): FullControl for "Everyone" and "ANONYMOUS LOGON" is very broad;
    // confirm this is intended before using it outside a trusted network.
    messageOrderQueue.SetPermissions("Everyone", MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow);
    messageOrderQueue.SetPermissions("ANONYMOUS LOGON", MessageQueueAccessRights.FullControl, AccessControlEntryType.Allow);

    // Message bodies are strings (JSON text) serialized by the XML formatter.
    messageOrderQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
    messageOrderQueue.MessageReadPropertyFilter = new MessagePropertyFilter() { Id = true, CorrelationId = true, Body = true, Label = true };

    // Start the asynchronous receive loop; RequestReceived re-arms it after each message.
    messageOrderQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(RequestReceived);
    messageOrderQueue.BeginReceive();

    // Start the consumer that drains RequestQueue.
    Task.Factory.StartNew(() => HandleRequestThread(), TaskCreationOptions.LongRunning);
}
/// <summary>
/// ReceiveCompleted handler: completes the pending receive, adds the message body (a JSON
/// string) to the pending-work queue RequestQueue, then starts the next asynchronous receive.
/// Does nothing once cancellation has been requested.
/// </summary>
/// <param name="source">The MessageQueue that raised the event.</param>
/// <param name="args">Event data carrying the async result of the completed receive.</param>
private void RequestReceived(object source, ReceiveCompletedEventArgs args)
{
    if (cts.IsCancellationRequested)
    {
        return;
    }

    MessageQueue q = (MessageQueue)source;
    try
    {
        // EndReceive is inside the try so a failed receive is logged instead of
        // escaping the callback and leaving the receive loop un-armed.
        using (var msMessage = q.EndReceive(args.AsyncResult))
        {
            string json = msMessage.Body as string;
            if (json == null)
            {
                Logger.LogError("RequestReceived: message body is not a string, message skipped");
            }
            else
            {
                RequestQueue.Add(json);
            }
        }
    }
    catch (Exception ex)
    {
        Logger.LogError(ex.ToString());
    }
    finally
    {
        // Always re-arm the receive, even when handling this message failed.
        try
        {
            q.BeginReceive();
        }
        catch (Exception ex)
        {
            Logger.LogError(ex.ToString());
        }
    }
}
/// <summary>
/// Consumer loop: takes JSON strings from RequestQueue until cancellation is requested,
/// logs each one, and deserializes it into an MT4Order for further processing.
/// </summary>
// NOTE(review): declared async void but no await is visible in the shown code; the
// elided section may contain one - confirm, otherwise the async modifier is unnecessary.
private async void HandleRequestThread()
{
while (!cts.IsCancellationRequested)
{
try
{
// Passes cts.Token so the wait for the next item can be cancelled.
var json = RequestQueue.Take(cts.Token);
Logger.LogInfo($"msgContent:{json}");
MT4Order tradeOrder = JsonConvert.DeserializeObject<MT4Order>(json);
// (order-processing code elided in the original)
......
}
catch (OperationCanceledException)
{
// Cancellation observed; the loop condition is re-checked on the next iteration.
Logger.LogError("HandleRequestThread is shutting down");
}
catch (Exception ex)
{
// Log and continue with the next item.
Logger.LogError(ex.ToString());
}
}
}