其他
.NET Core 使用 Channel 消息队列
↓推荐关注↓
常见的使用redis,或者mq,只需要不断的向中间件发送数据即可,redis使用队列,如果是mq直接发送消息即可,使用起来简单方便,但是要引入这些中间件,目前的架构里面没有,需要自己去起服务,维护。
public async Task ProduceHeartBeat(string message)
{
await channel.Writer.WriteAsync(message);
}
不断的向里面写入数据即可.
消费者代码
/// <summary>
/// timespan时间内消费多少数据
/// </summary>
/// <param name="count"></param>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
{
var result = new List<string>(count);
CancellationTokenSource cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
cts.CancelAfter(timeSpan);
int rcount = 0;
while ( !cancellationToken.IsCancellationRequested && rcount<count)
{
//await Task.Delay(2000);
if (channel.Reader.TryRead(out var number))
{
Console.WriteLine(number);
result.Add(number);
rcount++;
}
else
{
break;
}
}
return result;
}
里面加入了一个cancellationToken,进行消费的时长限制。在此时长内消费多少条数据,超时直接结束。
这就是基本的代码
后台定时消费数据
public class HeartBeatService : BackgroundService
{
private readonly HeartBeatsChannel heartBeatsChannel;
public HeartBeatService(HeartBeatsChannel heartBeatsChannel)
{
this.heartBeatsChannel = heartBeatsChannel;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
Task.Factory.StartNew(() =>
{
while (!stoppingToken.IsCancellationRequested)
{
//阻塞的队列使得一直在同一个线程运行
Process(15,heartBeatsChannel).Wait();
}
}, TaskCreationOptions.LongRunning);
Console.WriteLine("主线程 现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
/// <summary>
/// 消费数据
/// </summary>
/// <param name="count">一次消费数量</param>
/// <param name="heartBeatsChannel"></param>
/// <returns></returns>
private async Task Process(int count ,HeartBeatsChannel heartBeatsChannel)
{
Console.WriteLine("子线程_现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);
//每次消费三十个
if (heartBeatsChannel.IsHasContent)
{
//int count = 15;
//进行消费
await heartBeatsChannel.ConsumeHeartBeatAsync(count, TimeSpan.FromSeconds(3));
}
await Task.Delay(3000);
}
}
使用的是BackgroundServic,直接实现要处理的业务逻辑就好了。
在这里使用的是TaskCreationOptions.LongRunning,新开一个线程去处理心跳数据。
总结
以上就是主要的实现全过程,完整的代码在github
https://github.com/lackguozi/LearnChannelWebApi
实际上完全可以不用后台去定时消费数据,channel有很多api可以去处理,比如WaitToReadAsync(),但是这里没有使用,主要是不想持续的占数据库资源?
总结的话学习了channel的用法,底层似乎使用了deque?只稍微看了下源码,但是看到了许多的lock,这个是必不可少的。还是巨硬轮子造的好
转自:果小天
链接:cnblogs.com/guoxiaotian/p/17506536.html
- EOF -
C# 实现 Linux 视频聊天、远程桌面(源码).NET Core 使用 HttpClient 的正确方式
看完本文有收获?请转发分享给更多人
推荐关注「DotNet」,提升.Net技能
点赞和在看就是最大的支持❤️