查看原文
其他

C# 8 中使用 Channels

DotNet 2021-09-23

The following article is from 码农读书 Author 码农读书

(给DotNet加星标,提升.Net技能


前言


在面对  生产者-消费者 的场景下, .NET Core提供了一个新的命名空间 System.Threading.Channels 来帮助我们更高效的处理此类问题,有了这个 Channels 存在, 生产者 和 消费者 可以各自处理自己的任务而不相互干扰,有利于两方的并发处理,这篇文章我们就来讨论下如何使用 System.Threading.Channels。


Dataflow vs Channel


在 System.Threading.Tasks.Dataflow 命名空间下提供了一个数据流库,主要封装了 存储 和 处理 两大块,该库专注于 pipeline 处理,而 System.Threading.Tasks.Channels 主要专注于 存储 这块,从单一职责上来说,在 生产者-消费者 场景下,Channels 比 Dataflow 性能要高得多。


为什么要使用 Channels


可以利用 Channels 来实现 生产者和消费者 之间的解耦,大体上有两个好处:


  • 生产者 和 消费者 是相互独立的,两者可以并行执行。


  • 如果生产者不给力,可以创建多个的生产者,如果消费者不给力,可以创建更多的消费者。


总的来说,在 生产者-消费者 模式下可以帮助我们提高应用程序的吞吐率。


安装 System.Threading.Channels


要想使用 Channel,需要用 nuget 引用 System.Threading.Channels 包,还可以通过 Visual Studio 2019 的 NuGet package manager 可视化界面安装 或者 通过 NuGet package manager 命令行工具输入以下命令:


dotnet add package System.Threading.Channels


创建 channel


本质上来说,你可以创建两种类型的 channel,一种是有限容量的 bound channel,一种是无限容量的 unbound channel,接下来的问题是,如何创建呢?Channels 提供了两种 工厂方法 用于创建,如下代码所示:


  • CreateBounded<T> 创建的 channel 是一个有消息上限的通道。


  • CreateUnbounded<T> 创建的 channel 是一个无消息上限的通道。


下面的代码片段展示了如何创建 unbounded channel,并且只能存放 string 类型。


static void Main(string[] args)
{
var channel = Channel.CreateUnbounded<string>();
}


对了,Bounded channel 还提供了一个 FullMode 属性,用于指定当 channel 已满时该如何对插入的 message 进行处理,通常有四种做法。


  • Wait


  • DropWrite


  • DropNewest


  • DropOldest


下面的代码片段展示了如何在 Bounded channel 上使用 FullMode。


static void Main(string[] args)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
}


将消息写入到 channel


要想将 message 写入到 channel,可以使用 WriteAsync() 方法,如下代码所示:


static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
await channel.Writer.WriteAsync("Hello World!");
}


从 channel 中读取消息


要想从 channel 中读取 message,可以使用 ReadAsync(),如下代码所示:


static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
while (await channel.Reader.WaitToReadAsync())
{
if (channel.Reader.TryRead(out var message))
{
Console.WriteLine(message);
}
}
}


System.Threading.Channels 例子


下面是完整的代码清单,展示了如何从 channel 中读写 message。


class Program
{
static async Task Main(string[] args)
{
await SingleProducerSingleConsumer();
Console.ReadKey();
}
public static async Task SingleProducerSingleConsumer()
{
var channel = Channel.CreateUnbounded<int>();
var reader = channel.Reader;
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i + 1);
}
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out var number))
{
Console.WriteLine(number);
}
}
}
}



可以看到,控制台中输出了数字 1-10,这些数字正是 Writer 写入到 channel 中的,对吧。


总结


总的来说,要想使用 生产者-消费者 场景,有几种实现途径,比如:BlockingCollection 和 TPL Dataflow,但本篇介绍的 Channels 要比前面的两种性能更高,关于 Channels 更多的细节,


会在后面文章中进行讨论,如果现在想急于了解的话,可以参考MSDN:https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0


译文:https://www.infoworld.com/article/3445156/how-to-use-systemthreadingchannels-in-net-core.html


- EOF -


推荐阅读  点击标题可跳转
.NET必知的EventCounters性能指标监视器C# 线程、线程池、Task概念+代码实践.NET 5 程序高级调试-WinDbg


看完本文有收获?请转发分享给更多人

推荐关注「DotNet」,提升.Net技能 

点赞和在看就是最大的支持❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存