19 using System.Collections.Generic;
22 using System.Threading.Tasks;
25 namespace Deveel.Data.Diagnostics {
26 public abstract class ThreadedQueue<TMessage> : IDisposable where TMessage : class {
29 private CancellationTokenSource cancellationTokenSource;
30 private CancellationToken cancellationToken;
31 private List<Task> tasks;
/// <summary>
/// The number of consumer threads used by default: this is the value
/// returned by <see cref="ThreadCount"/> unless a derived class overrides
/// that property.
/// </summary>
41 public const int DefaultThreadCount = 2;
45 cancellationTokenSource =
new CancellationTokenSource();
46 cancellationToken = cancellationTokenSource.Token;
49 messageQueue =
new Queue<TMessage>();
56 public virtual int ThreadCount {
57 get {
return DefaultThreadCount; }
66 tasks =
new List<Task>();
68 threads =
new List<Thread>(ThreadCount);
71 for (
int i = 0; i < ThreadCount; i++) {
73 tasks.Add(
new Task(RouteMessages, cancellationToken));
75 var thread =
new Thread(RouteMessages) {
77 Name = String.Format(
"{0}.QueueConsumer {1}", GetType().Name, i)
85 reset =
new AutoResetEvent(
false);
88 foreach (var task
in tasks) {
92 foreach (var thread
in threads) {
107 cancellationTokenSource.Cancel(
true);
109 foreach (var task
in tasks) {
112 }
catch (TaskCanceledException) {
116 foreach (var thread
in threads) {
120 }
catch (ThreadInterruptedException) {
134 lock (((ICollection) messageQueue).SyncRoot) {
135 message = messageQueue.Dequeue();
/// <summary>
/// Handles a single message; every concrete queue must provide
/// its own implementation.
/// </summary>
/// <param name="message">The message to handle.</param>
/// <remarks>
/// NOTE(review): presumably invoked by the consumer routine for each
/// message taken off the internal queue - the call site is not visible
/// in this excerpt, so confirm before relying on it.
/// </remarks>
142 protected abstract void Consume(TMessage message);
150 lock (((ICollection) messageQueue).SyncRoot) {
151 messageQueue.Enqueue(message);
159 GC.SuppressFinalize(
this);
162 protected virtual void Dispose(
bool disposing) {
virtual void Dispose(bool disposing)
void Enqueue(TMessage message)
readonly Queue< TMessage > messageQueue