DeveelDB  20151217
complete SQL database system, primarily developed for .NET/Mono frameworks
ThreadedQueue.cs
Go to the documentation of this file.
1 //
2 // Copyright 2010-2015 Deveel
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 using System;
18 using System.Collections;
19 using System.Collections.Generic;
20 using System.Threading;
21 #if PCL
22 using System.Threading.Tasks;
23 #endif
24 
namespace Deveel.Data.Diagnostics {
	/// <summary>
	/// Base class for a queue whose messages are handed to <see cref="Consume"/>
	/// by a fixed number of background workers.
	/// </summary>
	/// <remarks>
	/// <para>
	/// The workers are created lazily by the first call to <see cref="Enqueue"/>
	/// and are stopped by <see cref="Dispose()"/>. <see cref="Enqueue"/> may be
	/// called from several threads at once: the queue and the lifecycle flags are
	/// all guarded by the same lock.
	/// </para>
	/// <para>
	/// Messages still queued when the instance is disposed are not delivered, and
	/// messages enqueued after disposal are silently dropped.
	/// </para>
	/// </remarks>
	/// <typeparam name="TMessage">The type of the messages routed to the consumer.</typeparam>
	public abstract class ThreadedQueue<TMessage> : IDisposable where TMessage : class {
		private readonly Queue<TMessage> messageQueue;

		// Guards messageQueue and the started/route/disposed flags; it is also the
		// monitor the workers wait on while the queue is empty.
		private readonly object syncRoot;
#if PCL
		private CancellationTokenSource cancellationTokenSource;
		private CancellationToken cancellationToken;
		private List<Task> tasks;
#else
		private List<Thread> threads;
#endif
		private bool started;
		private bool route;

		private bool disposed;

		/// <summary>
		/// The number of workers used when <see cref="ThreadCount"/> is not overridden.
		/// </summary>
		public const int DefaultThreadCount = 2;

		// Milliseconds granted to each worker to finish when the queue is stopped.
		private const int StopTimeout = 300;

		/// <summary>
		/// Initializes an empty queue. No worker is created until the first
		/// message is enqueued.
		/// </summary>
		protected ThreadedQueue() {
#if PCL
			cancellationTokenSource = new CancellationTokenSource();
			cancellationToken = cancellationTokenSource.Token;
#endif

			messageQueue = new Queue<TMessage>();
			syncRoot = ((ICollection) messageQueue).SyncRoot;
		}

		/// <summary>
		/// Finalizer: releases the references held by the instance without
		/// touching the workers.
		/// </summary>
		~ThreadedQueue() {
			Dispose(false);
		}

		/// <summary>
		/// Gets the number of workers that consume the queue. The value is read
		/// once, when the workers are started.
		/// </summary>
		public virtual int ThreadCount {
			get { return DefaultThreadCount; }
		}

		/// <summary>
		/// Creates and starts the workers, unless they are already running or the
		/// instance was disposed. The caller must hold <c>syncRoot</c>.
		/// </summary>
		private void Start() {
			// Refusing to start once disposed keeps a late Enqueue from
			// resurrecting the workers of a disposed queue.
			if (started || disposed)
				return;

			// ThreadCount is virtual: read it once so the loop bound and the
			// list capacity cannot disagree.
			var workerCount = ThreadCount;

#if PCL
			tasks = new List<Task>();
#else
			threads = new List<Thread>(workerCount);
#endif

			for (int i = 0; i < workerCount; i++) {
#if PCL
				// The worker blocks for its whole life, so it should not occupy
				// a regular pool thread.
				tasks.Add(new Task(RouteMessages, cancellationToken, TaskCreationOptions.LongRunning));
#else
				var thread = new Thread(RouteMessages) {
					IsBackground = true,
					Name = String.Format("{0}.QueueConsumer {1}", GetType().Name, i)
				};

				threads.Add(thread);
#endif
			}

			// Must be set before any worker runs: a worker leaves its loop as
			// soon as it observes the flag cleared.
			route = true;

#if PCL
			foreach (var task in tasks) {
				task.Start();
			}
#else
			foreach (var thread in threads) {
				thread.Start();
			}
#endif

			started = true;
		}

		/// <summary>
		/// Asks the workers to stop and waits a bounded time for each of them.
		/// </summary>
		private void Stop() {
			lock (syncRoot) {
				route = false;

				if (!started)
					return;

				started = false;

				// Wake every idle worker so that it sees the cleared flag and
				// returns, instead of sleeping until the wait below times out.
				Monitor.PulseAll(syncRoot);
			}

			// The waits happen outside the lock: the workers need it to wake up.
#if PCL
			cancellationTokenSource.Cancel(true);

			foreach (var task in tasks) {
				try {
					task.Wait(StopTimeout);
				} catch (AggregateException) {
					// Task.Wait reports a cancelled or faulted worker through
					// AggregateException; shutdown is best-effort and must not
					// throw out of Dispose.
				}
			}
#else
			foreach (var thread in threads) {
				try {
					// Only a worker that did not finish in time (for example,
					// one stuck inside Consume) is interrupted.
					if (!thread.Join(StopTimeout))
						thread.Interrupt();
				} catch (ThreadInterruptedException) {
					// The calling thread itself was interrupted while joining.
				}
			}
#endif
		}

		/// <summary>
		/// Entry point of every worker.
		/// </summary>
		private void RouteMessages() {
#if PCL
			ConsumeUntilStopped();
#else
			try {
				ConsumeUntilStopped();
			} catch (ThreadInterruptedException) {
				// Stop() interrupts a worker that did not finish in time:
				// end the thread quietly.
			}
#endif
		}

		/// <summary>
		/// Takes messages off the queue one at a time and passes them to
		/// <see cref="Consume"/>, until routing is switched off.
		/// </summary>
		private void ConsumeUntilStopped() {
			while (true) {
				TMessage message;

				lock (syncRoot) {
					// Waiting on the condition "queue is empty", rather than on
					// one signal per message, means a message is never stranded
					// when several are enqueued before a worker wakes up.
					while (route && messageQueue.Count == 0)
						Monitor.Wait(syncRoot);

					if (!route)
						return;

					message = messageQueue.Dequeue();
				}

				// Runs outside the lock so that producers and the other workers
				// are not blocked while a message is being handled.
				Consume(message);
			}
		}

		/// <summary>
		/// Handles a single message taken from the queue.
		/// </summary>
		/// <remarks>
		/// Called on a worker, outside the queue lock. An exception thrown here
		/// is not handled by the worker loop.
		/// </remarks>
		/// <param name="message">The message to handle.</param>
		protected abstract void Consume(TMessage message);

		/// <summary>
		/// Adds a message to the queue, starting the workers on first use.
		/// </summary>
		/// <remarks>
		/// The message is silently dropped when the instance has been disposed.
		/// </remarks>
		/// <param name="message">The message to hand to a worker.</param>
		protected void Enqueue(TMessage message) {
			lock (syncRoot) {
				Start();

				if (!route)
					return;

				messageQueue.Enqueue(message);

				// One new message: waking one idle worker is enough.
				Monitor.Pulse(syncRoot);
			}
		}

		/// <summary>
		/// Stops the workers and releases the resources held by the queue.
		/// </summary>
		public void Dispose() {
			Dispose(true);
			GC.SuppressFinalize(this);
		}

		/// <summary>
		/// Releases the resources held by the queue.
		/// </summary>
		/// <param name="disposing">
		/// <c>true</c> when called from <see cref="Dispose()"/>, in which case the
		/// workers are stopped; <c>false</c> when called from the finalizer.
		/// </param>
		protected virtual void Dispose(bool disposing) {
			if (disposing) {
				lock (syncRoot) {
					if (disposed)
						return;

					// Set before stopping, so that a concurrent Enqueue cannot
					// restart the workers while they are being joined.
					disposed = true;
				}

				Stop();

#if PCL
				cancellationTokenSource.Dispose();
#endif
			}

#if PCL
			tasks = null;
#else
			threads = null;
#endif
			disposed = true;
		}
	}
}
virtual void Dispose(bool disposing)
readonly Queue< TMessage > messageQueue