17 using System.Collections.Generic;
21 using System.Runtime.Serialization.Formatters.Binary;
22 using System.Security.Cryptography;
29 namespace Deveel.Data.Protocol {
31 private readonly
object channelLock =
new object();
37 envelopeReceiver =
new Thread(ReceiveEnvelopes) {
39 Name =
"DeveelDB Network Client Envelope Receiver",
40 Priority = ThreadPriority.AboveNormal
43 envelopes =
new List<IMessageEnvelope>();
50 protected int Timeout {
get; set; }
61 GC.SuppressFinalize(
this);
64 protected virtual void Dispose(
bool disposing) {
69 if (envelopeReceiver != null) {
71 envelopeReceiver.Abort();
72 envelopeReceiver = null;
81 if (InputStream != null)
82 InputStream.Dispose();
83 if (OutputStream != null)
84 OutputStream.Dispose();
93 private Stream InputStream {
get; set; }
95 private Stream OutputStream {
get; set; }
105 throw new ObjectDisposedException(GetType().AssemblyQualifiedName);
110 throw new InvalidOperationException();
113 protected abstract NetworkStream CreateNetworkStream(
ConnectionEndPoint remoteEndPoint, FileAccess access);
117 CurrentState = newState;
122 RemoteEndPoint = remoteEndPoint;
123 var readStream = CreateNetworkStream(remoteEndPoint, FileAccess.Read);
124 var writeStream = CreateNetworkStream(remoteEndPoint, FileAccess.Write);
126 InputStream =
new BufferedStream(readStream, 1024*3);
127 OutputStream =
new BufferedStream(writeStream, 1024*3);
132 envelopeReceiver.Start();
133 }
catch (Exception ex) {
143 if (envelopeReceiver != null &&
144 envelopeReceiver.ThreadState == ThreadState.Running) {
145 envelopeReceiver.Join(1000);
146 envelopeReceiver = null;
149 if (InputStream != null)
151 if (OutputStream != null)
152 OutputStream.Close();
153 }
catch (Exception) {
168 public abstract ConnectionEndPoint MakeEndPoint(IDictionary<string, object> properties);
175 int dispatchId = ExtractDispatchId(metadata);
177 envelope.IssueDate = DateTime.UtcNow;
182 if (envelope == null)
184 if (envelope.
Error != null)
191 throw new NotImplementedException();
195 throw new NotImplementedException();
200 return new HMACMD5(key);
202 return new HMACSHA256(key);
204 return new HMACSHA512(key);
207 var des =
new DESCryptoServiceProvider();
208 if (access == FileAccess.Read)
209 return des.CreateDecryptor(key, iv);
210 if (access == FileAccess.Write)
211 return des.CreateEncryptor(key, iv);
214 var des =
new TripleDESCryptoServiceProvider();
215 if (access == FileAccess.Read)
216 return des.CreateDecryptor(key, iv);
217 if (access == FileAccess.Write)
218 return des.CreateEncryptor(key, iv);
221 throw new NotSupportedException();
226 var key = Encoding.Unicode.GetBytes(encryptionData.
Key);
227 var iv = Encoding.Unicode.GetBytes(encryptionData.
IV);
228 var readHash = SelectHashAlgorithm(encryptionData.
HashAlgorithm, key, iv, FileAccess.Read);
229 var writeHash = SelectHashAlgorithm(encryptionData.
HashAlgorithm, key, iv, FileAccess.Write);
231 InputStream =
new CryptoStream(InputStream, readHash, CryptoStreamMode.Read);
232 OutputStream =
new CryptoStream(OutputStream, writeHash, CryptoStreamMode.Write);
237 using (var stream =
new MemoryStream()) {
238 var formatter =
new BinaryFormatter();
239 formatter.Serialize(stream, envelope);
241 return stream.ToArray();
247 var bytes = SerializeEnvelope(envelope);
248 OutputStream.Write(bytes, 0, bytes.Length);
249 OutputStream.Flush();
254 using (var stream =
new MemoryStream(bytes,
false)) {
255 var formatter =
new BinaryFormatter();
262 using (var input =
new BinaryReader(InputStream)) {
264 int commandLength = input.ReadInt32();
265 var buf =
new byte[commandLength];
266 input.Read(buf, 0, commandLength);
267 return DeserializeEnvelope(buf);
268 }
catch (Exception) {
277 if (metadata == null || metadata.Count == 0)
288 var senderId = ExtractDispatchId(senderMetadata);
289 var envelopeId = ExtractDispatchId(envelope.
Metadata);
290 return senderId == envelopeId;
294 DateTime timeIn = DateTime.Now;
295 DateTime timeOutHigh = timeIn +
new TimeSpan(((
long) timeout*1000)*TimeSpan.TicksPerMillisecond);
298 if (envelopes == null)
299 throw new DataException(
"Connection to server closed");
302 for (
int i = 0; i < envelopes.Count; ++i) {
303 var envelope = envelopes[i];
304 if (ShouldReceive(senderMetadata, envelope)) {
305 envelopes.RemoveAt(i);
313 DateTime.Now > timeOutHigh) {
319 Monitor.Wait(envelopes, 1000);
320 }
catch (ThreadInterruptedException) {
331 var envelope = ReceiveEnvelope(0);
333 envelopes.Add(envelope);
335 Monitor.PulseAll(envelopes);
338 }
catch (Exception) {
341 object oldEnvelopes = envelopes;
342 lock (oldEnvelopes) {
344 Monitor.PulseAll(oldEnvelopes);
351 #region NetworkTriggerChannel
357 this.connector = connector;
358 TriggerName = triggerName;
359 ObjectName = objectName;
360 EventType = eventType;
363 public string TriggerName {
get;
private set; }
365 public string ObjectName {
get;
private set; }
369 public Action<TriggerEventNotification> OnInvoke {
get;
private set; }
376 if (OnInvoke != null) {
377 OnInvoke = (Action<TriggerEventNotification>) Delegate.Combine(OnInvoke, notification);
379 OnInvoke = notification;
387 if (triggerChannels == null)
390 lock (triggerChannels) {
391 foreach (var channel
in triggerChannels) {
399 var notifications =
new List<IMessageEnvelope>();
402 foreach (var envelope
in envelopes) {
404 notifications.Add(envelope);
408 Monitor.PulseAll(envelopes);
411 foreach (var envelope
in notifications) {
412 OnTriggerNotification(envelope);
420 #region ClientProcessor
426 this.connector = connector;
430 var message = envelope.
Message;
434 response = RequestConnect(envelope);
436 response = RequestAuthenticate(envelope);
438 response = RequestQueryExecute(envelope);
440 response = RequestQueryResultPart(envelope);
442 response = RequestDisposeResult(envelope);
444 response = RequestCreateLargeObject(envelope);
446 response = RequestDisposeLargeObject(envelope);
448 response = RequestCreateTrigger(envelope);
450 response = RequestBegin(envelope);
452 response = RequestCommit(envelope);
454 response = RequestRollback(envelope);
456 response = Ping(envelope);
458 response = RequestClose(envelope);
460 if (response == null)
461 throw new NotSupportedException();
463 return CreateResponse(envelope.
Metadata, response);
467 return Request(envelope);
471 throw new NotImplementedException();
475 throw new NotImplementedException();
479 throw new NotImplementedException();
483 throw new NotImplementedException();
487 throw new NotImplementedException();
491 throw new NotImplementedException();
495 throw new NotImplementedException();
499 throw new NotImplementedException();
503 throw new NotImplementedException();
508 connector.AssertOpen();
510 var response = Request(envelope);
512 if (!responseMessage.Authenticated)
513 throw new InvalidOperationException();
517 }
catch (Exception) {
525 var response = Request(envelope);
527 if (!responseMessage.State)
528 throw new InvalidOperationException();
533 }
catch (Exception) {
540 connector.SendEnvelope(envelope);
541 var response = connector.ReceiveResponse(connector.Timeout, envelope.
Metadata);
542 return connector.OnProcessServerResponse(response);
546 return connector.CreateEnvelope(senderMetadata, response);
552 connector.Timeout = request.Timeout;
553 connector.OpenConnector(request.RemoteEndPoint);
555 var response = Request(envelope);
557 if (!responseMessage.Opened) {
559 throw new InvalidOperationException();
563 }
catch (Exception) {
IMessageEnvelope ProcessMessage(IMessageEnvelope envelope)
virtual byte[] SerializeEnvelope(IMessageEnvelope envelope)
NetworkClientConnector connector
IMessage RequestQueryResultPart(IMessageEnvelope envelope)
virtual void OnConnectorOpen()
virtual bool ShouldReceive(IDictionary< string, object > senderMetadata, IMessageEnvelope envelope)
TriggerEventType
The different types of high layer trigger events.
virtual IMessageEnvelope DeserializeEnvelope(byte[] bytes)
IMessage RequestCreateTrigger(IMessageEnvelope envelope)
void ChangeState(ConnectorState newState)
void OnTriggeInvoked(Action< TriggerEventNotification > notification)
IMessage RequestCommit(IMessageEnvelope envelope)
void OpenConnector(ConnectionEndPoint remoteEndPoint)
readonly NetworkClientConnector connector
IMessageEnvelope ReceiveEnvelope(int timeout)
IMessage RequestDisposeLargeObject(IMessageEnvelope envelope)
IDictionary< string, object > Metadata
void DispatchTriggerCallbacks()
IMessage RequestQueryExecute(IMessageEnvelope envelope)
virtual void Dispose(bool disposing)
IMessage Ping(IMessageEnvelope envelope)
static int ExtractDispatchId(IDictionary< string, object > metadata)
NetworkTriggerChannel(NetworkClientConnector connector, string triggerName, string objectName, TriggerEventType eventType)
IMessage RequestClose(IMessageEnvelope envelope)
ICryptoTransform SelectHashAlgorithm(string name, byte[] key, byte[] iv, FileAccess access)
virtual IMessageEnvelope CreateEnvelope(IDictionary< string, object > metadata, IMessage message)
IMessage RequestDisposeResult(IMessageEnvelope envelope)
IMessage RequestConnect(IMessageEnvelope envelope)
IMessage RequestBegin(IMessageEnvelope envelope)
virtual IMessage OnProcessServerResponse(IMessageEnvelope envelope)
IMessageEnvelope ReceiveResponse(int timeout, IDictionary< string, object > senderMetadata)
IMessage RequestRollback(IMessageEnvelope envelope)
List< NetworkTriggerChannel > triggerChannels
virtual IMessageProcessor CreateProcessor()
IMessageEnvelope CreateResponse(IDictionary< string, object > senderMetadata, IMessage response)
virtual void OnAuthenticated(string username, long timeStamp)
IMessage Request(IMessageEnvelope envelope)
List< IMessageEnvelope > envelopes
ClientProcessor(NetworkClientConnector connector)
IStreamableObjectChannel CreateObjectChannel(long objectId)
void SendEnvelope(IMessageEnvelope envelope)
ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType)
~NetworkClientConnector()
IMessage RequestAuthenticate(IMessageEnvelope envelope)
virtual void OnTriggerNotification(IMessageEnvelope envelope)
void SetEncrypton(EncryptionData encryptionData)
IMessage RequestCreateLargeObject(IMessageEnvelope envelope)