Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
628 views
in Technique[技术] by (71.8m points)

c# - Queue of async tasks with throttling which supports muti-threading

I need to implement a library to request vk.com API. The problem is that API supports only 3 requests per second. I would like to have API asynchronous.

Important: API should support safe accessing from multiple threads.

My idea is implement some class called throttler which allow no more than 3 request/second and delay other request.

The interface is next:

public interface IThrottler : IDisposable
{
    Task<TResult> Throttle<TResult>(Func<Task<TResult>> task);
}

The usage is like

var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
/// Here should be delay because 3 requests executed
var photo = await throttler.Throttle(() => api.MyPhoto());

How to implement throttler?

Currently I implemented it as queue which is processed by background thread.

public Task<TResult> Throttle<TResult>(Func<Task<TResult>> task)
{
    /// TaskRequest has method Run() to run task
    /// TaskRequest uses TaskCompletionSource to provide new task 
    /// which is resolved when queue processed til this element.
    var request = new TaskRequest<TResult>(task);

    requestQueue.Enqueue(request);

    return request.ResultTask;
}

This is shorten code of background thread loop which process the queue:

private void ProcessQueue(object state)
{
    while (true)
    {
        IRequest request;
        while (requestQueue.TryDequeue(out request))
        {
            /// Delay method calculates actual delay value and calls Thread.Sleep()
            Delay();
            request.Run();
        }

    }
}

Is it possible to implement this without background thread?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

So we'll start out with a solution to a simpler problem, that of creating a queue that process up to N tasks concurrently, rather than throttling to N tasks started per second, and build on that:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

We'll also use the following helper methods to match the result of a TaskCompletionSource to a `Task:

public static void Match<T>(this TaskCompletionSource<T> tcs, Task<T> task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }

    });
}

public static void Match<T>(this TaskCompletionSource<T> tcs, Task task)
{
    Match(tcs, task.ContinueWith(t => default(T)));
}

Now for our actual solution what we can do is each time we need to perform a throttled operation we create a TaskCompletionSource, and then go into our TaskQueue and add an item that starts the task, matches the TCS to its result, doesn't await it, and then delays the task queue for 1 second. The task queue will then not allow a task to start until there are no longer N tasks started in the past second, while the result of the operation itself is the same as the create Task:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue<T>(Func<Task> taskGenerator)
    {
        TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...