我有一个场景,其中有多个线程添加到队列中,并且有多个线程从同一队列中读取。如果队列达到特定大小,则添加队列时将阻塞 所有 正在填充队列的 线程 ,直到从队列中删除一项为止。
下面的解决方案是我现在正在使用的解决方案,我的问题是:如何改进?是否有一个对象已经在我应该使用的BCL中启用此行为?
internal class BlockingCollection<T> : CollectionBase, IEnumerable { //todo: might be worth changing this into a proper QUEUE private AutoResetEvent _FullEvent = new AutoResetEvent(false); internal T this[int i] { get { return (T) List[i]; } } private int _MaxSize; internal int MaxSize { get { return _MaxSize; } set { _MaxSize = value; checkSize(); } } internal BlockingCollection(int maxSize) { MaxSize = maxSize; } internal void Add(T item) { Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.WaitOne(); List.Add(item); Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId)); checkSize(); } internal void Remove(T item) { lock (List) { List.Remove(item); } Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId)); } protected override void OnRemoveComplete(int index, object value) { checkSize(); base.OnRemoveComplete(index, value); } internal new IEnumerator GetEnumerator() { return List.GetEnumerator(); } private void checkSize() { if (Count < MaxSize) { Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Set(); } else { Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId)); _FullEvent.Reset(); } } }
看起来非常不安全(几乎没有同步);怎么样:
class SizeQueue<T> { private readonly Queue<T> queue = new Queue<T>(); private readonly int maxSize; public SizeQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { Monitor.Wait(queue); } queue.Enqueue(item); if (queue.Count == 1) { // wake up any blocked dequeue Monitor.PulseAll(queue); } } } public T Dequeue() { lock (queue) { while (queue.Count == 0) { Monitor.Wait(queue); } T item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return item; } } }
(编辑)
实际上,您希望有一种关闭队列的方法,以使读者开始干净地退出-也许像是布尔标志-如果设置了,则空队列只会返回(而不是阻塞):
bool closing; public void Close() { lock(queue) { closing = true; Monitor.PulseAll(queue); } } public bool TryDequeue(out T value) { lock (queue) { while (queue.Count == 0) { if (closing) { value = default(T); return false; } Monitor.Wait(queue); } value = queue.Dequeue(); if (queue.Count == maxSize - 1) { // wake up any blocked enqueue Monitor.PulseAll(queue); } return true; } }