我想运行一堆异步任务,并限制在任何给定时间可能有多少任务待完成。
假设您有1000个网址,并且一次只想打开50个请求;但是一旦一个请求完成,您就会打开与列表中下一个URL的连接。这样一来,每次URL总是用完时,恰好有50个连接打开。
如果可能,我还想利用给定数量的线程。
我想出了一种扩展方法,ThrottleTasksAsync可以满足我的需求。已经有一个更简单的解决方案了吗?我认为这是一种常见的情况。
ThrottleTasksAsync
用法:
class Program { static void Main(string[] args) { Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait(); Console.WriteLine("Press a key to exit..."); Console.ReadKey(true); } }
这是代码:
static class IEnumerableExtensions { public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun) { var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>()); var semaphore = new SemaphoreSlim(maxConcurrentTasks); // Run the throttler on a separate thread. var t = Task.Run(() => { foreach (var item in enumerable) { // Wait for the semaphore semaphore.Wait(); blockingQueue.Add(item); } blockingQueue.CompleteAdding(); }); var taskList = new List<Task<Result_T>>(); Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, _ => { Enumerable_T item; if (blockingQueue.TryTake(out item, 100)) { taskList.Add( // Run the task taskToRun(item) .ContinueWith(tsk => { // For effect Thread.Sleep(2000); // Release the semaphore semaphore.Release(); return tsk.Result; } ) ); } }); // Await all the tasks. return await Task.WhenAll(taskList); } static IEnumerable<bool> IterateUntilTrue(Func<bool> condition) { while (!condition()) yield return true; } }
该方法利用BlockingCollection并SemaphoreSlim使其起作用。节流器在一个线程上运行,所有异步任务在另一个线程上运行。为了实现并行性,我添加了一个maxDegreeOfParallelism参数,该参数传递给了一个Parallel.ForEach重新用作while循环的循环。
BlockingCollection
SemaphoreSlim
Parallel.ForEach
while
旧版本为:
foreach (var master = ...) { var details = ...; Parallel.ForEach(details, detail => { // Process each detail record here }, new ParallelOptions { MaxDegreeOfParallelism = 15 }); // Perform the final batch updates here }
但是,线程池很快就用尽了,您不能执行async/ await。
async
await
奖励: 为了解决在调用when时BlockingCollection引发异常的问题,我使用了带有超时的重载。如果我不使用in中的超时,它将无法实现使用since 不会阻塞的目的。有没有更好的办法?理想情况下,将有一种方法。Take()``CompleteAdding()``TryTake``TryTake``BlockingCollection``TryTake``TakeAsync
Take()``CompleteAdding()``TryTake``TryTake``BlockingCollection``TryTake``TakeAsync
根据建议,使用TPL Dataflow。
TransformBlock<TInput, TOutput>您可能正在寻找A。
TransformBlock<TInput, TOutput>
您定义一个MaxDegreeOfParallelism来限制可以并行转换多少个字符串(即可以下载多少个url)。然后,您将URL发布到该块,完成后,您告诉该块您已完成添加项目,并获取了响应。
MaxDegreeOfParallelism
var downloader = new TransformBlock<string, HttpResponse>( url => Download(url), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 } ); var buffer = new BufferBlock<HttpResponse>(); downloader.LinkTo(buffer); foreach(var url in urls) downloader.Post(url); //or await downloader.SendAsync(url); downloader.Complete(); await downloader.Completion; IList<HttpResponse> responses; if (buffer.TryReceiveAll(out responses)) { //process responses }
注意:TransformBlock缓冲区同时缓冲其输入和输出。那么,为什么我们需要将其链接到BufferBlock?
TransformBlock
BufferBlock
因为在TransformBlock所有项目(HttpResponse)被消耗完之后才能完成,并且await downloader.Completion将挂起。取而代之的是,我们将其downloader所有输出转发到专用的缓冲块- 然后我们等待downloader完成,然后检查缓冲块。
HttpResponse
await downloader.Completion
downloader