[C#]タスクを非同期でシリアル処理をする(?)

.NetフレームワークにObjective-Cのdispatchシリアルキューみたいなの無いのかなぁ。見逃してる?

lock使わずに書きたいけど、ConcurrentQueueとかInterlockedとか使って何とかならないかと考えたけど無理っぽい・・・?  => ぜんぜん無理じゃなかった。

lock を使う版

/// <summary>
/// タスクを非同期でシリアル処理をするキュー。
/// インスタンスの生成がメンドイ場合はDefaultQueueを使って下さい。
/// ```C#
/// SerialTaskQueue.DefaultQueue.Enqueue(action);
/// SerialTaskQueue.DefaultQueue.Enqueue(async action);
/// ```
/// </summary>
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class SerialTaskQueue
{
    private readonly Queue<object> tasks = new Queue<object>();
    private readonly Queue<Action<Task>> completionHandlers = new Queue<Action<Task>>();
    private bool active = false;

    public void Enqueue(Action action)
    {
        Enqueue(action, null);
    }

    public void Enqueue(Func<Task> function)
    {
        Enqueue(function, null);
    }

    public void Enqueue(Action action, Action<Task> onComplete)
    {
        lock (tasks)
        {
            if (active)
            {
                tasks.Enqueue(action);
                completionHandlers.Enqueue(onComplete);
            }
            else
            {
                active = true;
                Run(tasks, onComplete);
            }
        }
    }

    public void Enqueue(Func<Task> function, Action<Task> onComplete)
    {
        lock (tasks)
        {
            if (active)
            {
                tasks.Enqueue(function);
                completionHandlers.Enqueue(onComplete);
            }
            else
            {
                active = true;
                Run(function, onComplete);
            }
        }
    }

    private void Run(object actionOrFunction, Action<Task> onComplete)
    {
        Task task;
        if (actionOrFunction is Action a)
        {
            task = Task.Run(a);
        }
        else
        {
            task = Task.Run(actionOrFunction as Func<Task>);
        }

        task.ContinueWith(RunNext, onComplete).ConfigureAwait(false);
    }

    private void RunNext(Task completedTask, object onComplete)
    {
        (onComplete as Action<Task>)?.Invoke(completedTask);

        lock (tasks)
        {
            if (tasks.Count == 0)
            {
                active = false;
            }
            else
            {
                tasks.TrimExcess();
                completionHandlers.TrimExcess();

                Run(tasks.Dequeue(), completionHandlers.Dequeue());
            }
        }
    }
}

Interlock を使う版

競合が多いと lock 版より速度落ちる。

/// <summary>
/// タスクを非同期でシリアル処理をするキュー。
/// インスタンスの生成がメンドイ場合はDefaultQueueを使って下さい。
/// ```C#
/// SerialTaskQueue.DefaultQueue.Enqueue(action);
/// SerialTaskQueue.DefaultQueue.Enqueue(async action);
/// ```
/// </summary>
public class SerialTaskQueue
{
    private static readonly SerialTaskQueue defaultQueue = new SerialTaskQueue();
    private readonly Queue<object> tasks = new Queue<object>();
    private readonly Queue<Action<Task>> completionHandlers = new Queue<Action<Task>>();
    private bool active = false;
    private volatile int locked = 0;

    public static SerialTaskQueue DefaultQueue { get { return defaultQueue; } }

    public void Enqueue(Action action)
    {
        Enqueue(action, null);
    }

    public void Enqueue(Func<Task> function)
    {
        Enqueue(function, null);
    }

    public void Enqueue(Action action, Action<Task> onComplete)
    {
        int f = 1;
        try
        {
            do
            {
                f = Interlocked.CompareExchange(ref locked, 1, 0);
            }
            while (f == 1);

            if (active)
            {
                tasks.Enqueue(action);
                completionHandlers.Enqueue(onComplete);
            }
            else
            {
                active = true;
                Run(action, onComplete);
            }
        }
        finally
        {
            if (f == 0)
            {
                locked = 0;
            }
        }
    }

    public void Enqueue(Func<Task> function, Action<Task> onComplete)
    {
        int f = 1;
        try
        {
            do
            {
                f = Interlocked.CompareExchange(ref locked, 1, 0);
            }
            while (f == 1);

            if (active)
            {
                tasks.Enqueue(function);
                completionHandlers.Enqueue(onComplete);
            }
            else
            {
                active = true;
                Run(function, onComplete);
            }
        }
        finally
        {
            if (f == 0)
            {
                locked = 0;
            }
        }
    }

    private void Run(object actionOrFunction, Action<Task> onComplete)
    {
        Task task;
        if (actionOrFunction is Action a)
        {
            task = Task.Run(a);
        }
        else
        {
            task = Task.Run(actionOrFunction as Func<Task>);
        }

        task.ContinueWith(RunNext, onComplete).ConfigureAwait(false);
    }

    private void RunNext(Task completedTask, object onComplete)
    {
        (onComplete as Action<Task>)?.Invoke(completedTask);

        int f = 1;
        try
        {
            do
            {
                f = Interlocked.CompareExchange(ref locked, 1, 0);
            }
            while (f == 1);

            if (tasks.Count == 0)
            {
                active = false;
            }
            else
            {
                tasks.TrimExcess();
                completionHandlers.TrimExcess();

                Run(tasks.Dequeue(), completionHandlers.Dequeue());
            }
        }
        finally
        {
            if (f == 0)
            {
                locked = 0;
            }
        }
    }
}
Share
関連記事