.NET Framework に Objective-C の dispatch シリアルキューみたいなものは無いのかなぁ。見逃してる?
lock使わずに書きたいけど、ConcurrentQueueとかInterlockedとか使って何とかならないかと考えたけど無理っぽい・・・? => ぜんぜん無理じゃなかった。
lock を使う版
/// <summary>
/// タスクを非同期でシリアル処理をするキュー。
/// インスタンスの生成がメンドイ場合はDefaultQueueを使って下さい。
/// ```C#
/// SerialTaskQueue.DefaultQueue.Enqueue(action);
/// SerialTaskQueue.DefaultQueue.Enqueue(async action);
/// ```
/// </summary>
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Runs enqueued work items asynchronously, one at a time, in the order they were enqueued.
/// Use <see cref="DefaultQueue"/> when creating a dedicated instance is not worth the trouble.
/// This variant serializes access to its internal state with a <c>lock</c>.
/// </summary>
/// <example>
/// <code>
/// SerialTaskQueue.DefaultQueue.Enqueue(action);
/// SerialTaskQueue.DefaultQueue.Enqueue(asyncFunction);
/// </code>
/// </example>
public class SerialTaskQueue
{
    private static readonly SerialTaskQueue defaultQueue = new SerialTaskQueue();

    // Guards tasks, completionHandlers and active.
    private readonly object gate = new object();

    // Pending work items (Action or Func<Task>) and their completion handlers.
    // The two queues are always enqueued/dequeued together, so entries stay paired.
    private readonly Queue<object> tasks = new Queue<object>();
    private readonly Queue<Action<Task>> completionHandlers = new Queue<Action<Task>>();

    // True while a work item is running (or about to be started).
    private bool active = false;

    /// <summary>Gets a shared, process-wide queue instance.</summary>
    public static SerialTaskQueue DefaultQueue { get { return defaultQueue; } }

    /// <summary>Enqueues a synchronous work item.</summary>
    /// <param name="action">The work to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void Enqueue(Action action)
    {
        Enqueue(action, null);
    }

    /// <summary>Enqueues an asynchronous work item.</summary>
    /// <param name="function">The work to run; the next item starts only after the returned task completes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    public void Enqueue(Func<Task> function)
    {
        Enqueue(function, null);
    }

    /// <summary>Enqueues a synchronous work item with a completion callback.</summary>
    /// <param name="action">The work to run.</param>
    /// <param name="onComplete">
    /// Optional callback invoked with the finished task before the next item is started.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void Enqueue(Action action, Action<Task> onComplete)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        EnqueueCore(action, onComplete);
    }

    /// <summary>Enqueues an asynchronous work item with a completion callback.</summary>
    /// <param name="function">The work to run; the next item starts only after the returned task completes.</param>
    /// <param name="onComplete">
    /// Optional callback invoked with the finished task before the next item is started.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    public void Enqueue(Func<Task> function, Action<Task> onComplete)
    {
        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        EnqueueCore(function, onComplete);
    }

    // Starts the item immediately when the queue is idle, otherwise parks it behind the running item.
    private void EnqueueCore(object actionOrFunction, Action<Task> onComplete)
    {
        lock (gate)
        {
            if (active)
            {
                tasks.Enqueue(actionOrFunction);
                completionHandlers.Enqueue(onComplete);
            }
            else
            {
                active = true;
                Run(actionOrFunction, onComplete);
            }
        }
    }

    // Starts one work item on the thread pool and chains RunNext to its completion.
    private void Run(object actionOrFunction, Action<Task> onComplete)
    {
        Task task = actionOrFunction is Action action
            ? Task.Run(action)
            : Task.Run((Func<Task>)actionOrFunction);

        // Pin the continuation to the default scheduler so it never depends on
        // whichever scheduler happened to be current when Enqueue was called.
        task.ContinueWith(RunNext, onComplete, TaskScheduler.Default);
    }

    // Continuation of every work item: reports completion, then starts the next item or goes idle.
    private void RunNext(Task completedTask, object onComplete)
    {
        try
        {
            (onComplete as Action<Task>)?.Invoke(completedTask);
        }
        finally
        {
            // Runs even when the handler throws; otherwise the queue would stay
            // marked active forever and no further item would ever start.
            lock (gate)
            {
                if (tasks.Count == 0)
                {
                    active = false;
                }
                else
                {
                    tasks.TrimExcess();
                    completionHandlers.TrimExcess();
                    Run(tasks.Dequeue(), completionHandlers.Dequeue());
                }
            }
        }
    }
}
Interlocked を使う版
競合が多いと lock 版より速度落ちる。
/// <summary>
/// Runs enqueued work items asynchronously, one at a time, in the order they were enqueued.
/// Use <see cref="DefaultQueue"/> when creating a dedicated instance is not worth the trouble.
/// This variant guards its internal state with a spin lock built on <c>Interlocked</c>
/// instead of a <c>lock</c> statement.
/// </summary>
/// <example>
/// <code>
/// SerialTaskQueue.DefaultQueue.Enqueue(action);
/// SerialTaskQueue.DefaultQueue.Enqueue(asyncFunction);
/// </code>
/// </example>
public class SerialTaskQueue
{
    private static readonly SerialTaskQueue defaultQueue = new SerialTaskQueue();

    // Pending work items (Action or Func<Task>) and their completion handlers.
    // The two queues are always enqueued/dequeued together, so entries stay paired.
    private readonly Queue<object> tasks = new Queue<object>();
    private readonly Queue<Action<Task>> completionHandlers = new Queue<Action<Task>>();

    // True while a work item is running (or about to be started). Only touched while the spin lock is held.
    private bool active = false;

    // Spin lock flag: 0 = free, 1 = held. Only accessed through Interlocked,
    // so it does not need (and must not have) the volatile modifier.
    private int locked = 0;

    /// <summary>Gets a shared, process-wide queue instance.</summary>
    public static SerialTaskQueue DefaultQueue { get { return defaultQueue; } }

    /// <summary>Enqueues a synchronous work item.</summary>
    /// <param name="action">The work to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void Enqueue(Action action)
    {
        Enqueue(action, null);
    }

    /// <summary>Enqueues an asynchronous work item.</summary>
    /// <param name="function">The work to run; the next item starts only after the returned task completes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    public void Enqueue(Func<Task> function)
    {
        Enqueue(function, null);
    }

    /// <summary>Enqueues a synchronous work item with a completion callback.</summary>
    /// <param name="action">The work to run.</param>
    /// <param name="onComplete">
    /// Optional callback invoked with the finished task before the next item is started.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void Enqueue(Action action, Action<Task> onComplete)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        EnqueueCore(action, onComplete);
    }

    /// <summary>Enqueues an asynchronous work item with a completion callback.</summary>
    /// <param name="function">The work to run; the next item starts only after the returned task completes.</param>
    /// <param name="onComplete">
    /// Optional callback invoked with the finished task before the next item is started.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    public void Enqueue(Func<Task> function, Action<Task> onComplete)
    {
        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        EnqueueCore(function, onComplete);
    }

    // Starts the item immediately when the queue is idle, otherwise parks it behind the running item.
    private void EnqueueCore(object actionOrFunction, Action<Task> onComplete)
    {
        AcquireLock();
        try
        {
            if (active)
            {
                tasks.Enqueue(actionOrFunction);
                completionHandlers.Enqueue(onComplete);
            }
            else
            {
                active = true;
                Run(actionOrFunction, onComplete);
            }
        }
        finally
        {
            ReleaseLock();
        }
    }

    // Spins until this thread flips the flag from 0 to 1.
    private void AcquireLock()
    {
        var spinner = new SpinWait();
        while (Interlocked.CompareExchange(ref locked, 1, 0) != 0)
        {
            // Back off (and eventually yield) instead of hammering the flag in a tight loop.
            spinner.SpinOnce();
        }
    }

    // Releases the flag; must only be called by the thread that acquired it.
    private void ReleaseLock()
    {
        Interlocked.Exchange(ref locked, 0);
    }

    // Starts one work item on the thread pool and chains RunNext to its completion.
    private void Run(object actionOrFunction, Action<Task> onComplete)
    {
        Task task = actionOrFunction is Action action
            ? Task.Run(action)
            : Task.Run((Func<Task>)actionOrFunction);

        // Pin the continuation to the default scheduler so it never depends on
        // whichever scheduler happened to be current when Enqueue was called.
        task.ContinueWith(RunNext, onComplete, TaskScheduler.Default);
    }

    // Continuation of every work item: reports completion, then starts the next item or goes idle.
    private void RunNext(Task completedTask, object onComplete)
    {
        try
        {
            (onComplete as Action<Task>)?.Invoke(completedTask);
        }
        finally
        {
            // Runs even when the handler throws; otherwise the queue would stay
            // marked active forever and no further item would ever start.
            AcquireLock();
            try
            {
                if (tasks.Count == 0)
                {
                    active = false;
                }
                else
                {
                    tasks.TrimExcess();
                    completionHandlers.TrimExcess();
                    Run(tasks.Dequeue(), completionHandlers.Dequeue());
                }
            }
            finally
            {
                ReleaseLock();
            }
        }
    }
}