DeveelDB  20151217
complete SQL database system, primarly developed for .NET/Mono frameworks
NetworkClientConnector.cs
Go to the documentation of this file.
1 //
2 // Copyright 2010-2014 Deveel
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Data;
19 using System.IO;
20 using System.Net.Sockets;
21 using System.Runtime.Serialization.Formatters.Binary;
22 using System.Security.Cryptography;
23 using System.Text;
24 using System.Threading;
25 
26 using Deveel.Data.Client;
27 using Deveel.Data.Routines;
28 
29 namespace Deveel.Data.Protocol {
30  public abstract class NetworkClientConnector : IClientConnector {
31  private readonly object channelLock = new object();
32 
33  private Thread envelopeReceiver;
34  private List<IMessageEnvelope> envelopes;
35 
36  protected NetworkClientConnector() {
37  envelopeReceiver = new Thread(ReceiveEnvelopes) {
38  IsBackground = true,
39  Name = "DeveelDB Network Client Envelope Receiver",
40  Priority = ThreadPriority.AboveNormal
41  };
42 
43  envelopes = new List<IMessageEnvelope>();
44  }
45 
47  Dispose(false);
48  }
49 
50  protected int Timeout { get; set; }
51 
52  public void Dispose() {
53  try {
54  Dispose(true);
55  } catch (Exception) {
56  // we ignore any exception at this point
57  } finally {
58  ChangeState(ConnectorState.Disposed);
59  }
60 
61  GC.SuppressFinalize(this);
62  }
63 
64  protected virtual void Dispose(bool disposing) {
65  if (disposing) {
66  try {
67  Close();
68 
69  if (envelopeReceiver != null) {
70  try {
71  envelopeReceiver.Abort();
72  envelopeReceiver = null;
73  } catch (Exception) {
74 
75  throw;
76  }
77  }
78  } catch (Exception) {
79  }
80 
81  if (InputStream != null)
82  InputStream.Dispose();
83  if (OutputStream != null)
84  OutputStream.Dispose();
85 
86  OutputStream = null;
87  InputStream = null;
88 
89  ChangeState(ConnectorState.Disposed);
90  }
91  }
92 
93  private Stream InputStream { get; set; }
94 
95  private Stream OutputStream { get; set; }
96 
97  public ConnectorState CurrentState { get; private set; }
98 
99  public abstract ConnectionEndPoint LocalEndPoint { get; }
100 
101  public ConnectionEndPoint RemoteEndPoint { get; private set; }
102 
103  private void AssertNotDisposed() {
104  if (CurrentState == ConnectorState.Disposed)
105  throw new ObjectDisposedException(GetType().AssemblyQualifiedName);
106  }
107 
108  private void AssertOpen() {
109  if (CurrentState != ConnectorState.Open)
110  throw new InvalidOperationException();
111  }
112 
113  protected abstract NetworkStream CreateNetworkStream(ConnectionEndPoint remoteEndPoint, FileAccess access);
114 
115  protected void ChangeState(ConnectorState newState) {
116  AssertNotDisposed();
117  CurrentState = newState;
118  }
119 
120  protected void OpenConnector(ConnectionEndPoint remoteEndPoint) {
121  try {
122  RemoteEndPoint = remoteEndPoint;
123  var readStream = CreateNetworkStream(remoteEndPoint, FileAccess.Read);
124  var writeStream = CreateNetworkStream(remoteEndPoint, FileAccess.Write);
125 
126  InputStream = new BufferedStream(readStream, 1024*3);
127  OutputStream = new BufferedStream(writeStream, 1024*3);
128 
129  OnConnectorOpen();
130  ChangeState(ConnectorState.Open);
131 
132  envelopeReceiver.Start();
133  } catch (Exception ex) {
134  //TODO: log somehwere ...
135  throw;
136  }
137  }
138 
139  protected void Close() {
140  try {
141  ChangeState(ConnectorState.Closed);
142 
143  if (envelopeReceiver != null &&
144  envelopeReceiver.ThreadState == ThreadState.Running) {
145  envelopeReceiver.Join(1000);
146  envelopeReceiver = null;
147  }
148 
149  if (InputStream != null)
150  InputStream.Close();
151  if (OutputStream != null)
152  OutputStream.Close();
153  } catch (Exception) {
154 
155  throw;
156  }
157  }
158 
159  protected virtual void OnAuthenticated(string username, long timeStamp) {
160  // TODO: make something with username and timeStamp?
161 
162  ChangeState(ConnectorState.Authenticated);
163  }
164 
165  protected virtual void OnConnectorOpen() {
166  }
167 
168  public abstract ConnectionEndPoint MakeEndPoint(IDictionary<string, object> properties);
169 
171  return new ClientProcessor(this);
172  }
173 
174  public virtual IMessageEnvelope CreateEnvelope(IDictionary<string, object> metadata, IMessage message) {
175  int dispatchId = ExtractDispatchId(metadata);
176  var envelope = new NetworkEnvelope(dispatchId, message);
177  envelope.IssueDate = DateTime.UtcNow;
178  return envelope;
179  }
180 
181  protected virtual IMessage OnProcessServerResponse(IMessageEnvelope envelope) {
182  if (envelope == null)
183  return null;
184  if (envelope.Error != null)
185  throw new DeveelDbException(envelope.Error.ErrorMessage, envelope.Error.ErrorClass, envelope.Error.ErrorCode);
186 
187  return envelope.Message;
188  }
189 
190  public IStreamableObjectChannel CreateObjectChannel(long objectId) {
191  throw new NotImplementedException();
192  }
193 
194  public ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType) {
195  throw new NotImplementedException();
196  }
197 
198  private ICryptoTransform SelectHashAlgorithm(string name, byte[] key, byte[] iv, FileAccess access) {
199  if (String.Equals(name, EncryptionAlgorithms.HmacMd5, StringComparison.OrdinalIgnoreCase))
200  return new HMACMD5(key);
201  if (String.Equals(name, EncryptionAlgorithms.HmacSha256, StringComparison.OrdinalIgnoreCase))
202  return new HMACSHA256(key);
203  if (String.Equals(name, EncryptionAlgorithms.HmacSha512, StringComparison.OrdinalIgnoreCase))
204  return new HMACSHA512(key);
205 
206  if (String.Equals(name, EncryptionAlgorithms.Des, StringComparison.OrdinalIgnoreCase)) {
207  var des = new DESCryptoServiceProvider();
208  if (access == FileAccess.Read)
209  return des.CreateDecryptor(key, iv);
210  if (access == FileAccess.Write)
211  return des.CreateEncryptor(key, iv);
212  }
213  if (String.Equals(name, EncryptionAlgorithms.TripleDes, StringComparison.OrdinalIgnoreCase)) {
214  var des = new TripleDESCryptoServiceProvider();
215  if (access == FileAccess.Read)
216  return des.CreateDecryptor(key, iv);
217  if (access == FileAccess.Write)
218  return des.CreateEncryptor(key, iv);
219  }
220 
221  throw new NotSupportedException();
222  }
223 
224  public void SetEncrypton(EncryptionData encryptionData) {
225  lock (channelLock) {
226  var key = Encoding.Unicode.GetBytes(encryptionData.Key);
227  var iv = Encoding.Unicode.GetBytes(encryptionData.IV);
228  var readHash = SelectHashAlgorithm(encryptionData.HashAlgorithm, key, iv, FileAccess.Read);
229  var writeHash = SelectHashAlgorithm(encryptionData.HashAlgorithm, key, iv, FileAccess.Write);
230 
231  InputStream = new CryptoStream(InputStream, readHash, CryptoStreamMode.Read);
232  OutputStream = new CryptoStream(OutputStream, writeHash, CryptoStreamMode.Write);
233  }
234  }
235 
236  protected virtual byte[] SerializeEnvelope(IMessageEnvelope envelope) {
237  using (var stream = new MemoryStream()) {
238  var formatter = new BinaryFormatter();
239  formatter.Serialize(stream, envelope);
240  stream.Flush();
241  return stream.ToArray();
242  }
243  }
244 
245  protected void SendEnvelope(IMessageEnvelope envelope) {
246  lock (channelLock) {
247  var bytes = SerializeEnvelope(envelope);
248  OutputStream.Write(bytes, 0, bytes.Length);
249  OutputStream.Flush();
250  }
251  }
252 
253  protected virtual IMessageEnvelope DeserializeEnvelope(byte[] bytes) {
254  using (var stream = new MemoryStream(bytes, false)) {
255  var formatter = new BinaryFormatter();
256  return (IMessageEnvelope) formatter.Deserialize(stream);
257  }
258  }
259 
260  private IMessageEnvelope ReceiveEnvelope(int timeout) {
261  lock (channelLock) {
262  using (var input = new BinaryReader(InputStream)) {
263  try {
264  int commandLength = input.ReadInt32();
265  var buf = new byte[commandLength];
266  input.Read(buf, 0, commandLength);
267  return DeserializeEnvelope(buf);
268  } catch (Exception) {
269  //TODO: log ...
270  throw;
271  }
272  }
273  }
274  }
275 
276  private static int ExtractDispatchId(IDictionary<string, object> metadata) {
277  if (metadata == null || metadata.Count == 0)
278  return -1;
279 
280  object id;
281  if (!metadata.TryGetValue(NetworkEnvelopeMetadataKeys.DispatchId, out id))
282  return -1;
283 
284  return (int) id;
285  }
286 
287  protected virtual bool ShouldReceive(IDictionary<string, object> senderMetadata, IMessageEnvelope envelope) {
288  var senderId = ExtractDispatchId(senderMetadata);
289  var envelopeId = ExtractDispatchId(envelope.Metadata);
290  return senderId == envelopeId;
291  }
292 
293  private IMessageEnvelope ReceiveResponse(int timeout, IDictionary<string, object> senderMetadata) {
294  DateTime timeIn = DateTime.Now;
295  DateTime timeOutHigh = timeIn + new TimeSpan(((long) timeout*1000)*TimeSpan.TicksPerMillisecond);
296 
297  lock (envelopes) {
298  if (envelopes == null)
299  throw new DataException("Connection to server closed");
300 
301  while (true) {
302  for (int i = 0; i < envelopes.Count; ++i) {
303  var envelope = envelopes[i];
304  if (ShouldReceive(senderMetadata, envelope)) {
305  envelopes.RemoveAt(i);
306  return envelope;
307  }
308  }
309 
310  // Return null if we haven't received a response input the timeout
311  // period.
312  if (timeout != 0 &&
313  DateTime.Now > timeOutHigh) {
314  return null;
315  }
316 
317  // Wait a second.
318  try {
319  Monitor.Wait(envelopes, 1000);
320  } catch (ThreadInterruptedException) {
321  /* ignore */
322  }
323 
324  } // while (true)
325  }
326  }
327 
328  private void ReceiveEnvelopes() {
329  try {
330  while (CurrentState != ConnectorState.Closed) {
331  var envelope = ReceiveEnvelope(0);
332  lock (envelopes) {
333  envelopes.Add(envelope);
334 
335  Monitor.PulseAll(envelopes);
336  }
337  }
338  } catch (Exception) {
339  } finally {
340  // Invalidate this object when the thread finishes.
341  object oldEnvelopes = envelopes;
342  lock (oldEnvelopes) {
343  envelopes = null;
344  Monitor.PulseAll(oldEnvelopes);
345  }
346  }
347  }
348 
349  private List<NetworkTriggerChannel> triggerChannels;
350 
351  #region NetworkTriggerChannel
352 
355 
356  public NetworkTriggerChannel(NetworkClientConnector connector, string triggerName, string objectName, TriggerEventType eventType) {
357  this.connector = connector;
358  TriggerName = triggerName;
359  ObjectName = objectName;
360  EventType = eventType;
361  }
362 
363  public string TriggerName { get; private set; }
364 
365  public string ObjectName { get; private set; }
366 
367  public TriggerEventType EventType { get; private set; }
368 
369  public Action<TriggerEventNotification> OnInvoke { get; private set; }
370 
371  public void Dispose() {
372 
373  }
374 
375  public void OnTriggeInvoked(Action<TriggerEventNotification> notification) {
376  if (OnInvoke != null) {
377  OnInvoke = (Action<TriggerEventNotification>) Delegate.Combine(OnInvoke, notification);
378  } else {
379  OnInvoke = notification;
380  }
381  }
382  }
383 
384  #endregion
385 
386  protected virtual void OnTriggerNotification(IMessageEnvelope envelope) {
387  if (triggerChannels == null)
388  return;
389 
390  lock (triggerChannels) {
391  foreach (var channel in triggerChannels) {
392  }
393  }
394  }
395 
396  private void DispatchTriggerCallbacks() {
397  try {
398  while (CurrentState != ConnectorState.Closed) {
399  var notifications = new List<IMessageEnvelope>();
400 
401  lock (envelopes) {
402  foreach (var envelope in envelopes) {
403  if (envelope.Message is TriggerEventNotification) {
404  notifications.Add(envelope);
405  }
406  }
407 
408  Monitor.PulseAll(envelopes);
409  }
410 
411  foreach (var envelope in notifications) {
412  OnTriggerNotification(envelope);
413  }
414  }
415  } catch {
416 
417  }
418  }
419 
420  #region ClientProcessor
421 
424 
426  this.connector = connector;
427  }
428 
430  var message = envelope.Message;
431  IMessage response = null;
432 
433  if (message is ConnectRequest)
434  response = RequestConnect(envelope);
435  else if (message is AuthenticateRequest)
436  response = RequestAuthenticate(envelope);
437  else if (message is QueryExecuteRequest)
438  response = RequestQueryExecute(envelope);
439  else if (message is QueryResultPartRequest)
440  response = RequestQueryResultPart(envelope);
441  else if (message is DisposeResultRequest)
442  response = RequestDisposeResult(envelope);
443  else if (message is LargeObjectCreateRequest)
444  response = RequestCreateLargeObject(envelope);
445  else if (message is LargeObjectDisposeRequest)
446  response = RequestDisposeLargeObject(envelope);
447  else if (message is TriggerCreateRequest)
448  response = RequestCreateTrigger(envelope);
449  else if (message is BeginRequest)
450  response = RequestBegin(envelope);
451  else if (message is CommitRequest)
452  response = RequestCommit(envelope);
453  else if (message is RollbackRequest)
454  response = RequestRollback(envelope);
455  else if (message is PingRequest)
456  response = Ping(envelope);
457  else if (message is CloseRequest)
458  response = RequestClose(envelope);
459 
460  if (response == null)
461  throw new NotSupportedException();
462 
463  return CreateResponse(envelope.Metadata, response);
464  }
465 
466  private IMessage Ping(IMessageEnvelope envelope) {
467  return Request(envelope);
468  }
469 
471  throw new NotImplementedException();
472  }
473 
475  throw new NotImplementedException();
476  }
477 
479  throw new NotImplementedException();
480  }
481 
483  throw new NotImplementedException();
484  }
485 
487  throw new NotImplementedException();
488  }
489 
491  throw new NotImplementedException();
492  }
493 
495  throw new NotImplementedException();
496  }
497 
499  throw new NotImplementedException();
500  }
501 
503  throw new NotImplementedException();
504  }
505 
507  try {
508  connector.AssertOpen();
509 
510  var response = Request(envelope);
511  var responseMessage = (AuthenticateResponse) response;
512  if (!responseMessage.Authenticated)
513  throw new InvalidOperationException();
514 
515  connector.OnAuthenticated(((AuthenticateRequest)envelope.Message).UserName, responseMessage.TimeStamp);
516  return response;
517  } catch (Exception) {
518 
519  throw;
520  }
521  }
522 
524  try {
525  var response = Request(envelope);
526  var responseMessage = (AcknowledgeResponse) response;
527  if (!responseMessage.State)
528  throw new InvalidOperationException();
529 
530  connector.Close();
531 
532  return response;
533  } catch (Exception) {
534 
535  throw;
536  }
537  }
538 
539  private IMessage Request(IMessageEnvelope envelope) {
540  connector.SendEnvelope(envelope);
541  var response = connector.ReceiveResponse(connector.Timeout, envelope.Metadata);
542  return connector.OnProcessServerResponse(response);
543  }
544 
545  private IMessageEnvelope CreateResponse(IDictionary<string, object> senderMetadata, IMessage response) {
546  return connector.CreateEnvelope(senderMetadata, response);
547  }
548 
550  try {
551  var request = (ConnectRequest) envelope.Message;
552  connector.Timeout = request.Timeout;
553  connector.OpenConnector(request.RemoteEndPoint);
554 
555  var response = Request(envelope);
556  var responseMessage = (ConnectResponse) response;
557  if (!responseMessage.Opened) {
558  connector.Close();
559  throw new InvalidOperationException();
560  }
561 
562  return response;
563  } catch (Exception) {
564  //TODO:
565  throw;
566  }
567  }
568  }
569 
570  #endregion
571  }
572 }
IMessageEnvelope ProcessMessage(IMessageEnvelope envelope)
virtual byte[] SerializeEnvelope(IMessageEnvelope envelope)
virtual bool ShouldReceive(IDictionary< string, object > senderMetadata, IMessageEnvelope envelope)
TriggerEventType
The different types of high layer trigger events.
virtual IMessageEnvelope DeserializeEnvelope(byte[] bytes)
void OnTriggeInvoked(Action< TriggerEventNotification > notification)
void OpenConnector(ConnectionEndPoint remoteEndPoint)
IDictionary< string, object > Metadata
static int ExtractDispatchId(IDictionary< string, object > metadata)
NetworkTriggerChannel(NetworkClientConnector connector, string triggerName, string objectName, TriggerEventType eventType)
ICryptoTransform SelectHashAlgorithm(string name, byte[] key, byte[] iv, FileAccess access)
virtual IMessageEnvelope CreateEnvelope(IDictionary< string, object > metadata, IMessage message)
virtual IMessage OnProcessServerResponse(IMessageEnvelope envelope)
IMessageEnvelope ReceiveResponse(int timeout, IDictionary< string, object > senderMetadata)
IMessageEnvelope CreateResponse(IDictionary< string, object > senderMetadata, IMessage response)
virtual void OnAuthenticated(string username, long timeStamp)
IStreamableObjectChannel CreateObjectChannel(long objectId)
ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType)
virtual void OnTriggerNotification(IMessageEnvelope envelope)
void SetEncrypton(EncryptionData encryptionData)