小编典典

节流异步任务

c#

我想运行一堆异步任务,并限制在任何给定时间可能有多少任务待完成。

假设您有1000个网址,并且一次只想打开50个请求;但是一旦一个请求完成,您就会打开与列表中下一个URL的连接。这样一来,每次URL总是用完时,恰好有50个连接打开。

如果可能,我还想利用给定数量的线程。

我想出了一种扩展方法,ThrottleTasksAsync可以满足我的需求。已经有一个更简单的解决方案了吗?我认为这是一种常见的情况。

用法:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

这是代码:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

该方法利用BlockingCollectionSemaphoreSlim使其起作用。节流器在一个线程上运行,所有异步任务在另一个线程上运行。为了实现并行性,我添加了一个maxDegreeOfParallelism参数,该参数传递给了一个Parallel.ForEach重新用作while循环的循环。

旧版本为:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

但是,线程池很快就用尽了,您不能执行async/ await

奖励:
为了解决在调用when时BlockingCollection引发异常的问题,我使用了带有超时的重载。如果我不使用in中的超时,它将无法实现使用since
不会阻塞的目的。有没有更好的办法?理想情况下,将有一种方法。Take()``CompleteAdding()``TryTake``TryTake``BlockingCollection``TryTake``TakeAsync


阅读 329

收藏
2020-05-19

共1个答案

小编典典

根据建议,使用TPL Dataflow

TransformBlock<TInput, TOutput>您可能正在寻找A。

您定义一个MaxDegreeOfParallelism来限制可以并行转换多少个字符串(即可以下载多少个url)。然后,您将URL发布到该块,完成后,您告诉该块您已完成添加项目,并获取了响应。

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

注意:TransformBlock缓冲区同时缓冲其输入和输出。那么,为什么我们需要将其链接到BufferBlock

因为在TransformBlock所有项目(HttpResponse)被消耗完之后才能完成,并且await downloader.Completion将挂起。取而代之的是,我们将其downloader所有输出转发到专用的缓冲块-
然后我们等待downloader完成,然后检查缓冲块。

2020-05-19