DeveelDB  20151217
complete SQL database system, primarly developed for .NET/Mono frameworks
ServerConnector.old.cs
Go to the documentation of this file.
1 //
2 // Copyright 2010-2014 Deveel
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 
16 using System;
17 using System.Collections.Generic;
18 using System.IO;
19 
21 using Deveel.Data.Control;
22 using Deveel.Data.DbSystem;
23 using Deveel.Data.Routines;
24 using Deveel.Data.Sql;
25 using Deveel.Data.Threading;
27 using Deveel.Diagnostics;
28 
29 namespace Deveel.Data.Protocol {
30  public abstract class ServerConnector : IServerConnector {
31  private readonly Dictionary<long, IRef> blobIdMap;
32 
33  private bool autoCommit;
34  private bool ignoreIdentifiersCase;
35  private ParameterStyle parameterStyle;
36 
37  private int triggerId;
38  private Dictionary<int, TriggerChannel> triggerChannels;
39  private readonly object triggerLock = new object();
40 
41  protected ServerConnector(IDatabaseHandler handler) {
42  if (handler == null)
43  throw new ArgumentNullException("handler");
44 
45  DatabaseHandler = handler;
46  resultMap = new Dictionary<int, QueryResult>();
47  blobIdMap = new Dictionary<long, IRef>();
48  uniqueResultId = 1;
49  }
50 
51  public AuthenticatedSession Session { get; protected set; }
52 
53  public abstract ConnectionEndPoint LocalEndPoint { get; }
54 
55  public ConnectionEndPoint RemoteEndPoint { get; private set; }
56 
57  public ConnectorState CurrentState { get; private set; }
58 
59  protected IDatabaseHandler DatabaseHandler { get; private set; }
60 
61  protected ILogger Logger {
62  get {
63  if (Database == null)
64  return new EmptyLogger();
65 
66  return Database.Context.Logger;
67  }
68  }
69 
70  protected IDatabase Database { get; private set; }
71 
72  private void AssertNotDisposed() {
73  if (CurrentState == ConnectorState.Disposed)
74  throw new ObjectDisposedException(GetType().AssemblyQualifiedName);
75  }
76 
77  private void AssertAuthenticated() {
78  if (CurrentState != ConnectorState.Authenticated)
79  throw new InvalidOperationException("The connector is not authenticated.");
80  }
81 
82  protected void ChangeState(ConnectorState newState) {
83  AssertNotDisposed();
84  CurrentState = newState;
85  }
86 
87  protected void OpenConnector(ConnectionEndPoint remoteEndPoint, string databaseName) {
88  try {
89  RemoteEndPoint = remoteEndPoint;
90  Database = DatabaseHandler.GetDatabase(databaseName);
91  if (Database == null)
92  throw new DatabaseException();
93 
94  OnConnectorOpen();
95  ChangeState(ConnectorState.Open);
96  } catch (Exception ex) {
97  Logger.Error(this, "Error when opening the connector.");
98  Logger.Error(this, ex);
99  throw;
100  }
101  }
102 
103  protected virtual void OnConnectorOpen() {
104  }
105 
106  protected void SetAutoCommit(bool state) {
107  AssertNotDisposed();
108  autoCommit = state;
109  }
110 
111  protected void SetIgnoreIdentifiersCase(bool state) {
112  AssertNotDisposed();
113  ignoreIdentifiersCase = state;
114  }
115 
116  protected void SetParameterStyle(ParameterStyle style) {
117  AssertNotDisposed();
118  parameterStyle = style;
119  }
120 
121  protected void CloseConnector() {
122  try {
123  OnCloseConnector();
124  } catch (Exception ex) {
125  Logger.Error(this, "Error when closing the connector.");
126  Logger.Error(this, ex);
127  } finally {
128  ChangeState(ConnectorState.Closed);
129  }
130  }
131 
132  protected virtual void OnCloseConnector() {
133 
134  }
135 
136  protected virtual EncryptionData GetEncryptionData() {
137  return null;
138  }
139 
140  protected virtual AuthenticatedSession OnAuthenticate(string defaultSchema, string username, string password) {
141  var user = Database.AuthenticateUser(username, password, RemoteEndPoint);
142 
143  if (user == null)
144  return null;
145 
146  IDatabaseConnection connection = Database.CreateNewConnection(user, OnTriggerFired);
147 
148  // Put the connection in exclusive mode
149  LockingMechanism locker = connection.LockingMechanism;
150  locker.SetMode(LockingMode.Exclusive);
151 
152  try {
153  // By default, connections are auto-commit
154  connection.AutoCommit = true;
155 
156  // Set the default schema for this connection if it exists
157  if (connection.SchemaExists(defaultSchema)) {
158  connection.SetDefaultSchema(defaultSchema);
159  } else {
160  Logger.WarningFormat(this, "Couldn't change to '{0}' schema.", defaultSchema);
161 
162  // If we can't change to the schema then change to the APP schema
163  connection.SetDefaultSchema(ConfigDefaultValues.DefaultSchema);
164  }
165  } finally {
166  try {
167  connection.Commit();
168  } catch (TransactionException e) {
169  // Just issue a warning...
170  Logger.Warning(this, e);
171  } finally {
172  // Guarentee that we unluck from EXCLUSIVE
173  locker.FinishMode(LockingMode.Exclusive);
174  }
175  }
176 
177  return new AuthenticatedSession(user, connection);
178  }
179 
180  protected virtual void OnTriggerFired(string triggerName, string triggerSource, TriggerEventType eventType, int count) {
181  lock (triggerChannels) {
182  foreach (var channel in triggerChannels.Values) {
183  if (channel.ShouldNotify(triggerName, triggerSource, eventType))
184  channel.Notify(triggerName, triggerSource, eventType, count);
185  }
186  }
187  }
188 
189  protected int BeginTransaction() {
190  AssertNotDisposed();
191 
192  // TODO: In a future version, we will provide multiple transactions.
193  // for the moment we only set the current connection not to auto-commit
194  // that will require an explicit commit.
195  Session.Connection.AutoCommit = false;
196  return -1;
197  }
198 
199  protected virtual bool Authenticate(string defaultSchema, string username, string password) {
200  if (CurrentState == ConnectorState.Authenticated &&
201  Session != null)
202  throw new InvalidOperationException("Already authenticated.");
203 
204  if (Logger.IsInterestedIn(LogLevel.Debug)) {
205  // Output the instruction to the _queries log.
206  Logger.DebugFormat(this, "[CLIENT] [{0}] - Log in", username);
207  }
208 
209  if (Logger.IsInterestedIn(LogLevel.Info)) {
210  Logger.InfoFormat(this, "Authenticate User: {0}", username);
211  }
212 
213  try {
214  Session = OnAuthenticate(defaultSchema, username, password);
215  if (Session == null)
216  return false;
217 
218  Session.Connection.AutoCommit = autoCommit;
219  Session.Connection.IsInCaseInsensitiveMode = ignoreIdentifiersCase;
220  Session.Connection.ParameterStyle = parameterStyle;
221 
222  ChangeState(ConnectorState.Authenticated);
223 
224  return true;
225  } catch (Exception e) {
226  // TODO: throw server error
227  throw;
228  }
229  }
230 
231  protected long CreateStreamableObject(ReferenceType referenceType, long length) {
232  lock (blobIdMap) {
233  try {
234  var obj = Session.Connection.CreateLargeObject(referenceType, length);
235  blobIdMap[obj.Id] = obj;
236  return obj.Id;
237  } catch (Exception ex) {
238  Logger.ErrorFormat(this, "A request to create an object of type {0} with length {1} caused and error.", referenceType, length);
239  Logger.Error(this, ex);
240  throw;
241  }
242  }
243  }
244 
245  private IRef GetObjectRef(long objectId) {
246  lock (blobIdMap) {
247  IRef obj;
248  if (!blobIdMap.TryGetValue(objectId, out obj)) {
249  obj = Session.Connection.GetLargeObject(objectId);
250  blobIdMap[objectId] = obj;
251  }
252 
253  return obj;
254  }
255  }
256 
257  protected IQueryResponse[] CoreExecuteQuery(string text, IEnumerable<SqlQueryParameter> parameters) {
258  // Record the Query start time
259  DateTime startTime = DateTime.Now;
260 
261  // Where Query result eventually resides.
262  int resultId = -1;
263 
264  // For each StreamableObject in the query object, translate it to a
265  // IRef object that presumably has been pre-pushed onto the server from
266  // the client.
267 
268  // Evaluate the sql Query.
269  var query = new SqlQuery(text);
270  if (parameters != null) {
271  foreach (var parameter in parameters) {
272  var preparedParam = parameter.Value;
273  if (preparedParam is StreamableObject) {
274  var obj = (StreamableObject) preparedParam;
275  IRef objRef = CompleteStream(obj.Identifier);
276  preparedParam = objRef;
277  }
278  query.Parameters.Add(new SqlQueryParameter(parameter.Name, preparedParam));
279  }
280  }
281 
282  Table[] results = SqlQueryExecutor.Execute(Session.Connection, query);
283  var responses = new IQueryResponse[results.Length];
284  int j = 0;
285 
286  foreach (Table result in results) {
287  QueryResult queryResult;
288  try {
289  // Put the result in the result cache... This will Lock this object
290  // until it is removed from the result set cache. Returns an id that
291  // uniquely identifies this result set in future communication.
292  // NOTE: This locks the roots of the table so that its contents
293  // may not be altered.
294  queryResult = new QueryResult(query, result);
295  resultId = AddResult(queryResult);
296  } catch (Exception e) {
297  // If resultId set, then dispose the result set.
298  if (resultId != -1)
299  DisposeResult(resultId);
300 
301  throw;
302  }
303 
304  // The time it took the Query to execute.
305  TimeSpan taken = DateTime.Now - startTime;
306 
307  // Return the Query response
308  responses[j] = new QueryResponse(resultId, queryResult, (int) taken.TotalMilliseconds, "");
309  j++;
310  }
311 
312  return responses;
313  }
314 
315  protected virtual IQueryResponse[] ExecuteQuery(string text, IEnumerable<SqlQueryParameter> parameters) {
316  // Log this Query if Query logging is enabled
317  if (Logger.IsInterestedIn(LogLevel.Debug)) {
318  // Output the instruction to the _queries log.
319  Logger.DebugFormat(this, "[CLIENT] [{0}] - Query: {1}", Session.User.UserName, text);
320  }
321 
322  // Write debug message (Info level)
323  if (Logger.IsInterestedIn(LogLevel.Debug)) {
324  Logger.DebugFormat(this, "Query From User: {0}", Session.User.UserName);
325  Logger.DebugFormat(this, "Query: {0}", text.Trim());
326  }
327 
328  // Get the locking mechanism.
329  LockingMechanism locker = Session.Connection.LockingMechanism;
330  LockingMode lockMode = LockingMode.None;
331  IQueryResponse[] response = null;
332 
333  try {
334  try {
335  // For simplicity - all database locking is now exclusive inside
336  // a transaction. This means it is not possible to execute
337  // queries concurrently inside a transaction. However, we are
338  // still able to execute queries concurrently from different
339  // connections.
340  //
341  // It's debatable whether we even need to perform this Lock anymore
342  // because we could change the contract of this method so that
343  // it is not thread safe. This would require that the callee ensures
344  // more than one thread can not execute queries on the connection.
345  lockMode = LockingMode.Exclusive;
346  locker.SetMode(lockMode);
347 
348  // Execute the Query (behaviour for this comes from super).
349  response = CoreExecuteQuery(text, parameters);
350 
351  // Return the result.
352  return response;
353 
354  } finally {
355  try {
356  // This is executed no matter what happens. Very important we
357  // unlock the tables.
358  if (lockMode != LockingMode.None) {
359  locker.FinishMode(lockMode);
360  }
361  } catch (Exception e) {
362  // If this throws an exception, we should output it to the debug
363  // log and screen.
364  Logger.Error(this, "Exception finishing locks");
365  Logger.Error(this, e);
366  // Note, we can't throw an error here because we may already be in
367  // an exception that happened in the above 'try' block.
368  }
369  }
370  } finally {
371  // This always happens after tables are unlocked.
372  // Also guarenteed to happen even if something fails.
373 
374  // If we are in auto-commit mode then commit the Query here.
375  // Do we auto-commit?
376  if (Session.Connection.AutoCommit) {
377  // Yes, so grab an exclusive Lock and auto-commit.
378  try {
379  // Lock into exclusive mode.
380  locker.SetMode(LockingMode.Exclusive);
381  // If an error occured then roll-back
382  if (response == null) {
383  // Rollback.
384  Session.Connection.Rollback();
385  } else {
386  try {
387  // Otherwise commit.
388  Session.Connection.Commit();
389  } catch (Exception e) {
390  foreach (IQueryResponse queryResponse in response) {
391  // Dispose this response if the commit failed.
392  DisposeResult(queryResponse.ResultId);
393  }
394 
395  // And throw the SQL Exception
396  throw;
397  }
398  }
399  } finally {
400  locker.FinishMode(LockingMode.Exclusive);
401  }
402  }
403  }
404  }
405 
406  private readonly Dictionary<int, QueryResult> resultMap;
407 
408  private int uniqueResultId;
409 
410  private int AddResult(QueryResult result) {
411  // Lock the roots of the result set.
412  result.LockRoot(-1); // -1 because lock_key not implemented
413 
414  // Make a new result id
415  int resultId;
416  // This ensures this block can handle concurrent updates.
417  lock (resultMap) {
418  resultId = ++uniqueResultId;
419  // Add the result to the map.
420  resultMap[resultId] = result;
421  }
422 
423  return resultId;
424  }
425 
426  private QueryResult GetResult(int resultId) {
427  lock (resultMap) {
428  QueryResult result;
429  return resultMap.TryGetValue(resultId, out result) ? result : null;
430  }
431  }
432 
433  protected QueryResultPart GetResultPart(int resultId, int startRow, int countRows) {
434  AssertNotDisposed();
435 
436  QueryResult table = GetResult(resultId);
437  if (table == null)
438  throw new DatabaseException("'resultId' invalid.");
439 
440  int rowEnd = startRow + countRows;
441 
442  if (startRow < 0 || startRow >= table.RowCount ||
443  rowEnd > table.RowCount) {
444  throw new DatabaseException("Result part out of range.");
445  }
446 
447  try {
448  int colCount = table.ColumnCount;
449  var block = new QueryResultPart(colCount);
450  for (int r = startRow; r < rowEnd; ++r) {
451  var row = new object[colCount];
452  for (int c = 0; c < colCount; ++c) {
453  TObject value = table.GetCellContents(c, r);
454 
455  // If this is a IRef, we must assign it a streamable object
456  // id that the client can use to access the large object.
457  object clientOb;
458  if (value.Object is IRef) {
459  var reference = (IRef) value.Object;
460  clientOb = new StreamableObject(reference.Type, reference.RawSize, reference.Id);
461  } else {
462  clientOb = value.Object;
463  }
464 
465  row[c] = clientOb;
466  }
467 
468  block.AddRow(row);
469  }
470  return block;
471  } catch (Exception e) {
472  Logger.Warning(this, e);
473  // If an exception was generated while getting the cell contents, then
474  // throw an DataException.
475  throw new DatabaseException("Exception while reading results: " + e.Message, e);
476  }
477  }
478 
479  protected void DisposeResult(int resultId) {
480  // Remove this entry.
481  QueryResult result;
482  lock (resultMap) {
483  if (resultMap.TryGetValue(resultId, out result))
484  resultMap.Remove(resultId);
485  }
486  if (result != null) {
487  result.Dispose();
488  } else {
489  Logger.Error(this, "Attempt to dispose invalid 'resultId'.");
490  }
491  }
492 
493  private void ClearResults() {
494  List<int> keys;
495  lock (resultMap) {
496  keys = new List<int>(resultMap.Keys);
497  }
498 
499  foreach (int resultId in keys) {
500  DisposeResult(resultId);
501  }
502  }
503 
504  protected void CommitTransaction(int transactionId) {
505  AssertNotDisposed();
506 
507  try {
508  Session.Connection.Commit();
509  } finally {
510  Session.Connection.AutoCommit = autoCommit;
511  }
512  }
513 
514  protected void RollbackTransaction(int transactionId) {
515  AssertNotDisposed();
516 
517  try {
518  Session.Connection.Rollback();
519  } finally {
520  Session.Connection.AutoCommit = autoCommit;
521  }
522  }
523 
524  public abstract ConnectionEndPoint MakeEndPoint(IDictionary<string, object> properties);
525 
527  return new ServerMessageProcessor(this);
528  }
529 
530  protected abstract IServerMessageEnvelope CreateEnvelope(IDictionary<string, object> metadata, IMessage message);
531 
532  IMessageEnvelope IConnector.CreateEnvelope(IDictionary<string, object> metadata, IMessage message) {
533  return CreateEnvelope(metadata, message);
534  }
535 
536  protected virtual IMessage GetMessage(IMessageEnvelope envelope) {
537  if (envelope == null)
538  return null;
539 
540  // TODO: handle errors? it's not supposed the client to send errors to the server ...
541 
542  return envelope.Message;
543  }
544 
545  IStreamableObjectChannel IConnector.CreateObjectChannel(long objectId) {
546  return CreateObjectChannel(objectId);
547  }
548 
549  protected virtual IStreamableObjectChannel CreateObjectChannel(long objectId) {
550  var obj = GetObjectRef(objectId);
551  if (obj == null)
552  throw new InvalidOperationException("The object was not created or was not found.");
553 
554  return new DirectStreamableObjectChannel(this, obj);
555  }
556 
557  private void DisposeChannel(long objId) {
558  lock (blobIdMap) {
559  blobIdMap.Remove(objId);
560  }
561  }
562 
563  private IRef CompleteStream(long objId) {
564  lock (blobIdMap) {
565  var objRef = GetObjectRef(objId);
566  if (objRef == null)
567  throw new InvalidOperationException();
568 
569  blobIdMap.Remove(objId);
570  objRef.Complete();
571  return objRef;
572  }
573  }
574 
575  ITriggerChannel IConnector.CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType) {
576  return CreateTriggerChannel(triggerName, objectName, eventType);
577  }
578 
579  protected virtual ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType) {
580  AssertAuthenticated();
581 
582  lock (triggerLock) {
583  if (triggerChannels == null)
584  triggerChannels = new Dictionary<int, TriggerChannel>();
585 
586  foreach (TriggerChannel channel in triggerChannels.Values) {
587  // If there's an open channel for the trigger return it
588  if (channel.ShouldNotify(triggerName, objectName, eventType))
589  return channel;
590  }
591 
592  int id = ++triggerId;
593  var newChannel = new TriggerChannel(this, id, triggerName, objectName, eventType);
594  triggerChannels[id] = newChannel;
595  return newChannel;
596  }
597  }
598 
599  public void Dispose() {
600  Dispose(true);
601  GC.SuppressFinalize(this);
602  }
603 
604  protected virtual void Dispose(bool disposing) {
605  if (disposing) {
606  // Clear the result set mapping
607  ClearResults();
608 
609  if (Session != null)
610  Session.Dispose();
611  }
612 
613  ChangeState(ConnectorState.Disposed);
614  }
615 
616  #region QueryResponse
617 
618  private sealed class QueryResponse : IQueryResponse {
619  private readonly QueryResult result;
620 
621  internal QueryResponse(int resultId, QueryResult result, int queryTime, string warnings) {
622  ResultId = resultId;
623  this.result = result;
624  QueryTimeMillis = queryTime;
625  Warnings = warnings;
626  }
627 
628  public int ResultId { get; private set; }
629 
630  public int QueryTimeMillis { get; private set; }
631 
632  public int RowCount {
633  get { return result.RowCount; }
634  }
635 
636  public int ColumnCount {
637  get { return result.ColumnCount; }
638  }
639 
641  return result.Fields[n];
642  }
643 
644  public string Warnings { get; private set; }
645  }
646 
647  #endregion
648 
649  #region DirectStreamableObjectChannel
650 
651  private class DirectStreamableObjectChannel : IStreamableObjectChannel {
652  private readonly IRef obj;
653  private readonly ServerConnector connector;
654 
655  public DirectStreamableObjectChannel(ServerConnector connector, IRef obj) {
656  this.obj = obj;
657  this.connector = connector;
658  }
659 
660  public void Dispose() {
661  connector.DisposeChannel(obj.Id);
662  }
663 
664  public void PushData(long offset, byte[] buffer, int length) {
665  obj.Write(offset, buffer, length);
666  }
667 
668  public byte[] ReadData(long offset, int length) {
669  if (length > 512 * 1024)
670  throw new DatabaseException("Request length exceeds 512 KB");
671 
672  try {
673  // Read the blob part into the byte array.
674  var blobPart = new byte[length];
675  obj.Read(offset, blobPart, length);
676 
677  // And return as a StreamableObjectPart object.
678  return blobPart;
679  } catch (IOException e) {
680  throw new DatabaseException("Exception while reading blob: " + e.Message, e);
681  }
682  }
683  }
684 
685  #endregion
686 
687  #region ServerMessageProcessor
688 
689  private class ServerMessageProcessor : IMessageProcessor {
690  private readonly ServerConnector connector;
691 
693  this.connector = connector;
694  }
695 
696  private IMessageEnvelope CreateErrorResponse(IMessageEnvelope sourceMessage, string message) {
697  return CreateErrorResponse(sourceMessage, new ProtocolException(message));
698  }
699 
700  private IMessageEnvelope CreateErrorResponse(IMessageEnvelope sourceMessage, Exception error) {
701  IDictionary<string, object> metadata = null;
702  if (sourceMessage != null)
703  metadata = sourceMessage.Metadata;
704 
705  return CreateErrorResponse(metadata, error);
706  }
707 
708  private IMessageEnvelope CreateErrorResponse(IDictionary<string, object> metadata, Exception error) {
709  var envelope = connector.CreateEnvelope(metadata, new AcknowledgeResponse(false));
710  envelope.SetError(error);
711  return envelope;
712  }
713 
714  private IMessageEnvelope ProcessAuthenticate(IDictionary<string, object> metadata, AuthenticateRequest request) {
715  try {
716  if (!connector.Authenticate(request.DefaultSchema, request.UserName, request.Password)) {
717  var response = connector.CreateEnvelope(metadata, new AuthenticateResponse(false, -1));
718  // TODO: make the specialized exception ...
719  response.SetError(new Exception("Unable to authenticate."));
720  return response;
721  }
722 
723  connector.ChangeState(ConnectorState.Authenticated);
724 
725  // TODO: Get the UNIX epoch here?
726  return connector.CreateEnvelope(metadata, new AuthenticateResponse(true, DateTime.UtcNow.Ticks));
727  } catch (Exception ex) {
728  return CreateErrorResponse(metadata, ex);
729  }
730  }
731 
733  var metadata = envelope.Metadata;
734  var message = connector.GetMessage(envelope);
735  if (message == null)
736  return CreateErrorResponse(metadata, new Exception("No message found in the envelope."));
737 
738  if (message is ConnectRequest)
739  return ProcessConnect(metadata, (ConnectRequest) message);
740 
741  if (message is AuthenticateRequest)
742  return ProcessAuthenticate(metadata, (AuthenticateRequest) message);
743 
744  if (message is QueryExecuteRequest)
745  return ProcessQuery(metadata, (QueryExecuteRequest) message);
746  if (message is QueryResultPartRequest)
747  return ProcessQueryPart(metadata, (QueryResultPartRequest) message);
748  if (message is DisposeResultRequest)
749  return ProcessDisposeResult(metadata, (DisposeResultRequest) message);
750 
751  if (message is LargeObjectCreateRequest)
752  return ProcessCreateLargeObject(metadata, (LargeObjectCreateRequest) message);
753 
754  if (message is BeginRequest)
755  return ProcessBegin(metadata);
756  if (message is CommitRequest)
757  return ProcessCommit(metadata, (CommitRequest)message);
758  if (message is RollbackRequest)
759  return ProcessRollback(metadata, (RollbackRequest)message);
760 
761  if (message is CloseRequest)
762  return ProcessClose(metadata);
763 
764  return CreateErrorResponse(envelope, "Message not supported");
765  }
766 
767  private IMessageEnvelope ProcessConnect(IDictionary<string, object> metadata, ConnectRequest request) {
768  Exception error = null;
769  ConnectResponse response;
770 
771  try {
772  connector.OpenConnector(request.RemoteEndPoint, request.DatabaseName);
773  if (request.AutoCommit)
774  connector.SetAutoCommit(request.AutoCommit);
775 
776  connector.SetIgnoreIdentifiersCase(request.IgnoreIdentifiersCase);
777  connector.SetParameterStyle(request.ParameterStyle);
778 
779  var encryptionData = connector.GetEncryptionData();
780 
781  var serverVersion = connector.Database.Version.ToString(2);
782  response = new ConnectResponse(true, serverVersion, encryptionData != null, encryptionData);
783  } catch (Exception ex) {
784  connector.Logger.Error(connector, "Error while opening a connection.");
785  connector.Logger.Error(connector, ex);
786 
787  error = ex;
788  response = new ConnectResponse(false, null);
789  }
790 
791  var envelope = connector.CreateEnvelope(metadata, response);
792  if (error != null)
793  envelope.SetError(error);
794 
795  return connector.CreateEnvelope(metadata, response);
796  }
797 
798  private IMessageEnvelope ProcessClose(IDictionary<string, object> metadata) {
799  try {
800  connector.AssertNotDisposed();
801  connector.AssertAuthenticated();
802 
803  connector.CloseConnector();
804  return connector.CreateEnvelope(metadata, new AcknowledgeResponse(true));
805  } catch (Exception ex) {
806  connector.Logger.Error(connector, "Error while closing a connection.");
807  connector.Logger.Error(connector, ex);
808  return CreateErrorResponse(metadata, ex);
809  }
810  }
811 
812  private IMessageEnvelope ProcessQuery(IDictionary<string, object> metadata, QueryExecuteRequest request) {
813  try {
814  connector.AssertNotDisposed();
815  connector.AssertAuthenticated();
816 
817  // TODO: use the timeout ...
818  var queryResonse = connector.ExecuteQuery(request.Query.Text, request.Query.Parameters);
819  return connector.CreateEnvelope(metadata, new QueryExecuteResponse(queryResonse));
820  } catch (Exception ex) {
821  connector.Logger.Error(connector, "Error while processing a query request.");
822  connector.Logger.Error(connector, ex);
823  return CreateErrorResponse(metadata, ex);
824  }
825  }
826 
827  private IMessageEnvelope ProcessQueryPart(IDictionary<string, object> metadata, QueryResultPartRequest request) {
828  try {
829  connector.AssertNotDisposed();
830  connector.AssertAuthenticated();
831 
832  var part = connector.GetResultPart(request.ResultId, request.RowIndex, request.Count);
833  return connector.CreateEnvelope(metadata, new QueryResultPartResponse(request.ResultId, part));
834  } catch (Exception ex) {
835  connector.Logger.Error(connector, "Error while requesting part of a query result.");
836  connector.Logger.Error(connector, ex);
837  throw;
838  }
839  }
840 
841  private IMessageEnvelope ProcessDisposeResult(IDictionary<string, object> metadata, DisposeResultRequest request) {
842  try {
843  connector.AssertNotDisposed();
844  connector.AssertAuthenticated();
845 
846  connector.DisposeResult(request.ResultId);
847  return connector.CreateEnvelope(metadata, new AcknowledgeResponse(true));
848  } catch (Exception ex) {
849  connector.Logger.Error(connector, "Error occurred while disposing a query result.");
850  connector.Logger.Error(connector, ex);
851  return CreateErrorResponse(metadata, ex);
852  }
853  }
854 
855  private IMessageEnvelope ProcessCreateLargeObject(IDictionary<string, object> metadata,
856  LargeObjectCreateRequest request) {
857  try {
858  connector.AssertNotDisposed();
859  connector.AssertAuthenticated();
860 
861  var objRef = connector.CreateStreamableObject(request.ReferenceType, request.ObjectLength);
862  return connector.CreateEnvelope(metadata,
863  new LargeObjectCreateResponse(request.ReferenceType, request.ObjectLength, objRef));
864  } catch (Exception ex) {
865  connector.Logger.Error(connector, "Error while creating a large object.");
866  connector.Logger.Error(connector, ex);
867  return CreateErrorResponse(metadata, ex);
868  }
869  }
870 
871  private IMessageEnvelope ProcessBegin(IDictionary<string, object> metadata) {
872  try {
873  connector.AssertNotDisposed();
874  connector.AssertAuthenticated();
875 
876  var id = connector.BeginTransaction();
877  return connector.CreateEnvelope(metadata, new BeginResponse(id));
878  } catch (Exception ex) {
879  connector.Logger.Error(connector, "Error while beginning a transaction.");
880  connector.Logger.Error(connector, ex);
881  return CreateErrorResponse(metadata, ex);
882  }
883  }
884 
885  private IMessageEnvelope ProcessCommit(IDictionary<string, object> metadata, CommitRequest request) {
886  try {
887  connector.AssertNotDisposed();
888  connector.AssertAuthenticated();
889 
890  connector.CommitTransaction(request.TransactionId);
891  return connector.CreateEnvelope(metadata, new AcknowledgeResponse(true));
892  } catch (Exception ex) {
893  connector.Logger.Error(connector, "Error while committing the transaction.");
894  connector.Logger.Error(connector, ex);
895  return CreateErrorResponse(metadata, ex);
896  }
897  }
898 
899  private IMessageEnvelope ProcessRollback(IDictionary<string, object> metadata, RollbackRequest request) {
900  try {
901  connector.AssertNotDisposed();
902  connector.AssertAuthenticated();
903 
904  connector.RollbackTransaction(request.TransactionId);
905  return connector.CreateEnvelope(metadata, new AcknowledgeResponse(true));
906  } catch (Exception ex) {
907  connector.Logger.Error(connector, "Error while rolling-back the transaction.");
908  connector.Logger.Error(connector, ex);
909  return CreateErrorResponse(metadata, ex);
910  }
911  }
912  }
913 
914  #endregion
915 
916  #region TriggerChannel
917 
919  private readonly ServerConnector connector;
920  private readonly long id;
921 
922  private string TriggerName { get; set; }
923 
924  private string ObjectName { get; set; }
925 
926  private TriggerEventType EventType { get; set; }
927 
928  private Action<TriggerEventNotification> callback;
929 
930  public TriggerChannel(ServerConnector connector, long id, string triggerName, string objectName, TriggerEventType eventType) {
931  this.connector = connector;
932  this.id = id;
933  TriggerName = triggerName;
934  ObjectName = objectName;
935  EventType = eventType;
936  }
937 
938  public bool ShouldNotify(string triggerName, string objectName, TriggerEventType eventType) {
939  if (!String.Equals(triggerName, TriggerName, StringComparison.OrdinalIgnoreCase))
940  return false;
941 
942  return (eventType & EventType) != 0;
943  }
944 
945  public void Dispose() {
946  Dispose(true);
947  GC.SuppressFinalize(this);
948  }
949 
950  private void Dispose(bool disposing) {
951  if (disposing) {
952  connector.DisposeTriggerChannel(id);
953  }
954  }
955 
956  public void OnTriggeInvoked(Action<TriggerEventNotification> notification) {
957  callback = notification;
958  }
959 
960  public void Notify(string triggerName, string triggerSource, TriggerEventType eventType, int count) {
961  if (callback != null)
962  callback(new TriggerEventNotification(triggerName, triggerSource, TriggerType.Callback, eventType, count));
963  }
964  }
965 
966  private void DisposeTriggerChannel(long id) {
967  throw new NotImplementedException();
968  }
969 
970  #endregion
971  }
972 }
IMessageProcessor CreateProcessor()
virtual IQueryResponse[] ExecuteQuery(string text, IEnumerable< SqlQueryParameter > parameters)
IMessageEnvelope CreateErrorResponse(IDictionary< string, object > metadata, Exception error)
IMessageEnvelope ProcessQueryPart(IDictionary< string, object > metadata, QueryResultPartRequest request)
Dictionary< int, TriggerChannel > triggerChannels
int ColumnCount
Returns the column count.
Definition: QueryResult.cs:98
IQueryResponse[] CoreExecuteQuery(string text, IEnumerable< SqlQueryParameter > parameters)
IMessageEnvelope ProcessCommit(IDictionary< string, object > metadata, CommitRequest request)
TriggerEventType
The different types of high layer trigger events.
LockingMode
The mode applied to a lock over a resource during a transaction.
Definition: LockingMode.cs:24
IMessageEnvelope CreateErrorResponse(IMessageEnvelope sourceMessage, Exception error)
void Notify(string triggerName, string triggerSource, TriggerEventType eventType, int count)
IMessageEnvelope ProcessAuthenticate(IDictionary< string, object > metadata, AuthenticateRequest request)
QueryResultPart GetResultPart(int resultId, int startRow, int countRows)
IMessageEnvelope ProcessBegin(IDictionary< string, object > metadata)
The response to a command executed via the IDatabaseInterface.ExecuteQuery method in the IDatabaseInt...
void Rollback()
Rolls-back all the modifications made by the user in this session
Definition: Session.cs:213
The default implementation of a database in a system.
Definition: Database.cs:38
readonly Dictionary< int, QueryResult > resultMap
IMessageEnvelope CreateErrorResponse(IMessageEnvelope sourceMessage, string message)
Describes the name of an object within a database.
Definition: ObjectName.cs:44
int RowCount
Returns the row count.
Definition: QueryResult.cs:94
LogLevel
The level listened by a diagnostic logger
Definition: LogLevel.cs:23
IDictionary< string, object > Metadata
void RollbackTransaction(int transactionId)
The representation of a single database in the system.
Definition: IDatabase.cs:40
IContext IEventSource. Context
Definition: Database.cs:95
virtual void OnTriggerFired(string triggerName, string triggerSource, TriggerEventType eventType, int count)
IMessageEnvelope CreateEnvelope(IDictionary< string, object > metadata, IMessage message)
This is a session that is constructed around a given user and a transaction, to the given database...
Definition: Session.cs:32
IMessageEnvelope ProcessCreateLargeObject(IDictionary< string, object > metadata, LargeObjectCreateRequest request)
void OpenConnector(ConnectionEndPoint remoteEndPoint, string databaseName)
void OnTriggeInvoked(Action< TriggerEventNotification > notification)
TObject GetCellContents(int column, int row)
Gets the cell contents of the cell at the given row/column.
virtual IStreamableObjectChannel CreateObjectChannel(long objectId)
long CreateStreamableObject(ReferenceType referenceType, long length)
TriggerType
Enumerates the types of triggers, that can be volatile (like the Callback) or stored in the database...
Definition: TriggerType.cs:22
IMessageEnvelope ProcessRollback(IDictionary< string, object > metadata, RollbackRequest request)
IMessageEnvelope ProcessConnect(IDictionary< string, object > metadata, ConnectRequest request)
ServerConnector(IDatabaseHandler handler)
IMessageEnvelope ProcessDisposeResult(IDictionary< string, object > metadata, DisposeResultRequest request)
void SetParameterStyle(ParameterStyle style)
readonly Dictionary< long, IRef > blobIdMap
IMessageEnvelope ProcessMessage(IMessageEnvelope envelope)
bool ShouldNotify(string triggerName, string objectName, TriggerEventType eventType)
virtual void Dispose(bool disposing)
void ChangeState(ConnectorState newState)
IMessageEnvelope ProcessQuery(IDictionary< string, object > metadata, QueryExecuteRequest request)
virtual IMessage GetMessage(IMessageEnvelope envelope)
IMessageEnvelope ProcessClose(IDictionary< string, object > metadata)
ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType)
void Commit()
Commits the latest changes made by the user in the session.
Definition: Session.cs:200
virtual AuthenticatedSession OnAuthenticate(string defaultSchema, string username, string password)
A TABLE object in a database.
virtual ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType)
TriggerChannel(ServerConnector connector, long id, string triggerName, string objectName, TriggerEventType eventType)
ILargeObjectChannel CreateObjectChannel(long objectId)
ICollection< QueryParameter > Parameters
Definition: SqlQuery.cs:53
virtual EncryptionData GetEncryptionData()
QueryResponse(int resultId, QueryResult result, int queryTime, string warnings)
virtual bool Authenticate(string defaultSchema, string username, string password)
An object that is streamable (such as a long binary object, or a long string object).
int ResultId
Returns a number that identifies this command within the set of queries executed on the connection...