17 using System.Collections.Generic;
29 namespace Deveel.Data.Protocol {
30 public abstract class ServerConnector : IServerConnector {
// Connector-level auto-commit setting; this class copies it onto
// Session.Connection.AutoCommit (after authentication and at other points).
33 private bool autoCommit;
// Connector-level identifier case-insensitivity setting; written from a
// 'state' argument and copied onto Session.Connection.IsInCaseInsensitiveMode.
34 private bool ignoreIdentifiersCase;
39 private readonly
object triggerLock =
new object();
43 throw new ArgumentNullException(
"handler");
45 DatabaseHandler = handler;
46 resultMap =
new Dictionary<int, QueryResult>();
47 blobIdMap =
new Dictionary<long, IRef>();
/// <summary>
/// Gets the session this connector operates on. It is assigned from the
/// result of <c>OnAuthenticate</c>, and its <c>Connection</c> and <c>User</c>
/// are read when executing queries and writing log entries.
/// </summary>
/// <remarks>
/// The setter is protected, so derived connectors may replace the session.
/// NOTE(review): presumably null until authentication succeeds - confirm
/// against AssertAuthenticated.
/// </remarks>
51 public AuthenticatedSession
Session {
get;
protected set; }
/// <summary>
/// Gets the end point of the remote peer. It is stored from the
/// <c>remoteEndPoint</c> argument when the connector is opened and is later
/// passed to <c>Database.AuthenticateUser</c> together with the credentials.
/// </summary>
55 public ConnectionEndPoint RemoteEndPoint {
get;
private set; }
/// <summary>
/// Gets the handler supplied to the constructor. It is used to resolve the
/// target database by name through <c>GetDatabase(databaseName)</c>.
/// </summary>
/// <remarks>
/// NOTE(review): the constructor throws ArgumentNullException("handler"),
/// so this is presumably never null - confirm the guard condition.
/// </remarks>
59 protected IDatabaseHandler DatabaseHandler {
get;
private set; }
61 protected ILogger Logger {
64 return new EmptyLogger();
74 throw new ObjectDisposedException(GetType().AssemblyQualifiedName);
79 throw new InvalidOperationException(
"The connector is not authenticated.");
84 CurrentState = newState;
89 RemoteEndPoint = remoteEndPoint;
90 Database = DatabaseHandler.GetDatabase(databaseName);
92 throw new DatabaseException();
96 }
catch (Exception ex) {
97 Logger.Error(
this,
"Error when opening the connector.");
98 Logger.Error(
this, ex);
113 ignoreIdentifiersCase = state;
118 parameterStyle = style;
124 }
catch (Exception ex) {
125 Logger.Error(
this,
"Error when closing the connector.");
126 Logger.Error(
this, ex);
140 protected virtual AuthenticatedSession
OnAuthenticate(
string defaultSchema,
string username,
string password) {
141 var user =
Database.AuthenticateUser(username, password, RemoteEndPoint);
146 IDatabaseConnection connection =
Database.CreateNewConnection(user, OnTriggerFired);
149 LockingMechanism locker = connection.LockingMechanism;
154 connection.AutoCommit =
true;
157 if (connection.SchemaExists(defaultSchema)) {
158 connection.SetDefaultSchema(defaultSchema);
160 Logger.WarningFormat(
this,
"Couldn't change to '{0}' schema.", defaultSchema);
163 connection.SetDefaultSchema(ConfigDefaultValues.DefaultSchema);
170 Logger.Warning(
this, e);
177 return new AuthenticatedSession(user, connection);
181 lock (triggerChannels) {
182 foreach (var channel
in triggerChannels.Values) {
183 if (channel.ShouldNotify(triggerName, triggerSource, eventType))
184 channel.Notify(triggerName, triggerSource, eventType, count);
195 Session.Connection.AutoCommit =
false;
199 protected virtual bool Authenticate(
string defaultSchema,
string username,
string password) {
202 throw new InvalidOperationException(
"Already authenticated.");
204 if (Logger.IsInterestedIn(
LogLevel.Debug)) {
206 Logger.DebugFormat(
this,
"[CLIENT] [{0}] - Log in", username);
209 if (Logger.IsInterestedIn(
LogLevel.Info)) {
210 Logger.InfoFormat(
this,
"Authenticate User: {0}", username);
214 Session = OnAuthenticate(defaultSchema, username, password);
218 Session.Connection.AutoCommit = autoCommit;
219 Session.Connection.IsInCaseInsensitiveMode = ignoreIdentifiersCase;
220 Session.Connection.ParameterStyle = parameterStyle;
225 }
catch (Exception e) {
234 var obj =
Session.Connection.CreateLargeObject(referenceType, length);
235 blobIdMap[obj.Id] = obj;
237 }
catch (Exception ex) {
238 Logger.ErrorFormat(
this,
"A request to create an object of type {0} with length {1} caused and error.", referenceType, length);
239 Logger.Error(
this, ex);
248 if (!blobIdMap.TryGetValue(objectId, out obj)) {
249 obj =
Session.Connection.GetLargeObject(objectId);
250 blobIdMap[objectId] = obj;
259 DateTime startTime = DateTime.Now;
270 if (parameters != null) {
271 foreach (var parameter
in parameters) {
272 var preparedParam = parameter.Value;
274 var obj = (StreamableObject) preparedParam;
275 IRef objRef = CompleteStream(obj.Identifier);
276 preparedParam = objRef;
278 query.Parameters.Add(
new SqlQueryParameter(parameter.Name, preparedParam));
282 Table[] results = SqlQueryExecutor.Execute(
Session.Connection, query);
286 foreach (
Table result
in results) {
295 resultId = AddResult(queryResult);
296 }
catch (Exception e) {
299 DisposeResult(resultId);
305 TimeSpan taken = DateTime.Now - startTime;
308 responses[j] =
new QueryResponse(resultId, queryResult, (
int) taken.TotalMilliseconds,
"");
317 if (Logger.IsInterestedIn(
LogLevel.Debug)) {
319 Logger.DebugFormat(
this,
"[CLIENT] [{0}] - Query: {1}",
Session.
User.UserName, text);
323 if (Logger.IsInterestedIn(
LogLevel.Debug)) {
324 Logger.DebugFormat(
this,
"Query From User: {0}",
Session.
User.UserName);
325 Logger.DebugFormat(
this,
"Query: {0}", text.Trim());
329 LockingMechanism locker =
Session.Connection.LockingMechanism;
346 locker.SetMode(lockMode);
349 response = CoreExecuteQuery(text, parameters);
359 locker.FinishMode(lockMode);
361 }
catch (Exception e) {
364 Logger.Error(
this,
"Exception finishing locks");
365 Logger.Error(
this, e);
376 if (
Session.Connection.AutoCommit) {
382 if (response == null) {
389 }
catch (Exception e) {
392 DisposeResult(queryResponse.
ResultId);
// Query results currently held by this connector, keyed by result id.
// Entries are added under a freshly generated id and removed when the
// result is disposed.
406 private readonly Dictionary<int, QueryResult>
resultMap;
// Counter used to generate result ids; pre-incremented for each new result.
408 private int uniqueResultId;
418 resultId = ++uniqueResultId;
420 resultMap[resultId] = result;
429 return resultMap.TryGetValue(resultId, out result) ? result : null;
438 throw new DatabaseException(
"'resultId' invalid.");
440 int rowEnd = startRow + countRows;
442 if (startRow < 0 || startRow >= table.
RowCount ||
444 throw new DatabaseException(
"Result part out of range.");
450 for (
int r = startRow; r < rowEnd; ++r) {
451 var row =
new object[colCount];
452 for (
int c = 0; c < colCount; ++c) {
458 if (value.Object is IRef) {
459 var reference = (IRef) value.Object;
460 clientOb =
new StreamableObject(reference.Type, reference.RawSize, reference.Id);
462 clientOb = value.Object;
471 }
catch (Exception e) {
472 Logger.Warning(
this, e);
475 throw new DatabaseException(
"Exception while reading results: " + e.Message, e);
483 if (resultMap.TryGetValue(resultId, out result))
484 resultMap.Remove(resultId);
486 if (result != null) {
489 Logger.Error(
this,
"Attempt to dispose invalid 'resultId'.");
496 keys =
new List<int>(resultMap.Keys);
499 foreach (
int resultId
in keys) {
500 DisposeResult(resultId);
510 Session.Connection.AutoCommit = autoCommit;
520 Session.Connection.AutoCommit = autoCommit;
/// <summary>
/// Builds a <see cref="ConnectionEndPoint"/> from a set of named properties.
/// Concrete connectors must provide the implementation.
/// </summary>
/// <param name="properties">Key/value properties describing the end point.
/// NOTE(review): the expected keys are not visible here - check the
/// implementations.</param>
/// <returns>The end point built from <paramref name="properties"/>.</returns>
524 public abstract ConnectionEndPoint MakeEndPoint(IDictionary<string, object> properties);
533 return CreateEnvelope(metadata, message);
537 if (envelope == null)
546 return CreateObjectChannel(objectId);
550 var obj = GetObjectRef(objectId);
552 throw new InvalidOperationException(
"The object was not created or was not found.");
559 blobIdMap.Remove(objId);
565 var objRef = GetObjectRef(objId);
567 throw new InvalidOperationException();
569 blobIdMap.Remove(objId);
576 return CreateTriggerChannel(triggerName, objectName, eventType);
580 AssertAuthenticated();
583 if (triggerChannels == null)
584 triggerChannels =
new Dictionary<int, TriggerChannel>();
588 if (channel.
ShouldNotify(triggerName, objectName, eventType))
592 int id = ++triggerId;
593 var newChannel =
new TriggerChannel(
this,
id, triggerName, objectName, eventType);
594 triggerChannels[id] = newChannel;
601 GC.SuppressFinalize(
this);
604 protected virtual void Dispose(
bool disposing) {
616 #region QueryResponse
623 this.result = result;
624 QueryTimeMillis = queryTime;
/// <summary>
/// Gets the number identifying this response's result. The connector passes
/// this value to <c>DisposeResult</c> to release the result.
/// </summary>
/// <remarks>
/// NOTE(review): presumably set from the constructor's <c>resultId</c>
/// argument - the assignment is not visible here.
/// </remarks>
628 public int ResultId {
get;
private set; }
/// <summary>
/// Gets the query time, in milliseconds, as given to the constructor's
/// <c>queryTime</c> argument. The connector passes the elapsed wall-clock
/// time of the query, truncated to an <c>int</c>.
/// </summary>
630 public int QueryTimeMillis {
get;
private set; }
632 public int RowCount {
636 public int ColumnCount {
637 get {
return result.ColumnCount; }
641 return result.Fields[n];
/// <summary>
/// Gets the warnings text attached to this response.
/// </summary>
/// <remarks>
/// NOTE(review): presumably set from the constructor's <c>warnings</c>
/// argument; the connector currently passes an empty string - confirm.
/// </remarks>
644 public string Warnings {
get;
private set; }
649 #region DirectStreamableObjectChannel
652 private readonly IRef
obj;
657 this.connector = connector;
661 connector.DisposeChannel(obj.Id);
664 public void PushData(
long offset, byte[] buffer,
int length) {
665 obj.Write(offset, buffer, length);
669 if (length > 512 * 1024)
670 throw new DatabaseException(
"Request length exceeds 512 KB");
674 var blobPart =
new byte[length];
675 obj.Read(offset, blobPart, length);
679 }
catch (IOException e) {
680 throw new DatabaseException(
"Exception while reading blob: " + e.Message, e);
687 #region ServerMessageProcessor
693 this.connector = connector;
701 IDictionary<string, object> metadata = null;
702 if (sourceMessage != null)
705 return CreateErrorResponse(metadata, error);
710 envelope.SetError(error);
719 response.SetError(
new Exception(
"Unable to authenticate."));
726 return connector.CreateEnvelope(metadata,
new AuthenticateResponse(
true, DateTime.UtcNow.Ticks));
727 }
catch (Exception ex) {
728 return CreateErrorResponse(metadata, ex);
734 var message = connector.GetMessage(envelope);
736 return CreateErrorResponse(metadata,
new Exception(
"No message found in the envelope."));
739 return ProcessConnect(metadata, (ConnectRequest) message);
742 return ProcessAuthenticate(metadata, (AuthenticateRequest) message);
745 return ProcessQuery(metadata, (QueryExecuteRequest) message);
747 return ProcessQueryPart(metadata, (QueryResultPartRequest) message);
749 return ProcessDisposeResult(metadata, (DisposeResultRequest) message);
752 return ProcessCreateLargeObject(metadata, (LargeObjectCreateRequest) message);
755 return ProcessBegin(metadata);
757 return ProcessCommit(metadata, (CommitRequest)message);
759 return ProcessRollback(metadata, (RollbackRequest)message);
762 return ProcessClose(metadata);
764 return CreateErrorResponse(envelope,
"Message not supported");
768 Exception error = null;
779 var encryptionData = connector.GetEncryptionData();
781 var serverVersion = connector.Database.
Version.ToString(2);
782 response =
new ConnectResponse(
true, serverVersion, encryptionData != null, encryptionData);
783 }
catch (Exception ex) {
784 connector.Logger.Error(connector,
"Error while opening a connection.");
785 connector.Logger.Error(connector, ex);
791 var envelope = connector.CreateEnvelope(metadata, response);
793 envelope.SetError(error);
795 return connector.CreateEnvelope(metadata, response);
800 connector.AssertNotDisposed();
801 connector.AssertAuthenticated();
803 connector.CloseConnector();
805 }
catch (Exception ex) {
806 connector.Logger.
Error(connector,
"Error while closing a connection.");
807 connector.Logger.Error(connector, ex);
808 return CreateErrorResponse(metadata, ex);
814 connector.AssertNotDisposed();
815 connector.AssertAuthenticated();
820 }
catch (Exception ex) {
821 connector.Logger.
Error(connector,
"Error while processing a query request.");
822 connector.Logger.Error(connector, ex);
823 return CreateErrorResponse(metadata, ex);
829 connector.AssertNotDisposed();
830 connector.AssertAuthenticated();
834 }
catch (Exception ex) {
835 connector.Logger.
Error(connector,
"Error while requesting part of a query result.");
836 connector.Logger.Error(connector, ex);
843 connector.AssertNotDisposed();
844 connector.AssertAuthenticated();
846 connector.DisposeResult(request.
ResultId);
848 }
catch (Exception ex) {
849 connector.Logger.
Error(connector,
"Error occurred while disposing a query result.");
850 connector.Logger.Error(connector, ex);
851 return CreateErrorResponse(metadata, ex);
858 connector.AssertNotDisposed();
859 connector.AssertAuthenticated();
861 var objRef = connector.CreateStreamableObject(request.ReferenceType, request.
ObjectLength);
862 return connector.CreateEnvelope(metadata,
864 }
catch (Exception ex) {
865 connector.Logger.
Error(connector,
"Error while creating a large object.");
866 connector.Logger.Error(connector, ex);
867 return CreateErrorResponse(metadata, ex);
873 connector.AssertNotDisposed();
874 connector.AssertAuthenticated();
876 var
id = connector.BeginTransaction();
877 return connector.CreateEnvelope(metadata,
new BeginResponse(
id));
878 }
catch (Exception ex) {
879 connector.Logger.
Error(connector,
"Error while beginning a transaction.");
880 connector.Logger.Error(connector, ex);
881 return CreateErrorResponse(metadata, ex);
887 connector.AssertNotDisposed();
888 connector.AssertAuthenticated();
892 }
catch (Exception ex) {
893 connector.Logger.
Error(connector,
"Error while committing the transaction.");
894 connector.Logger.Error(connector, ex);
895 return CreateErrorResponse(metadata, ex);
901 connector.AssertNotDisposed();
902 connector.AssertAuthenticated();
906 }
catch (Exception ex) {
907 connector.Logger.
Error(connector,
"Error while rolling-back the transaction.");
908 connector.Logger.Error(connector, ex);
909 return CreateErrorResponse(metadata, ex);
916 #region TriggerChannel
920 private readonly
long id;
// Name of the trigger this channel was created for. It is set from the
// constructor argument and compared with incoming trigger names using
// StringComparison.OrdinalIgnoreCase.
922 private string TriggerName {
get; set; }
931 this.connector = connector;
933 TriggerName = triggerName;
935 EventType = eventType;
939 if (!String.Equals(triggerName, TriggerName, StringComparison.OrdinalIgnoreCase))
942 return (eventType & EventType) != 0;
947 GC.SuppressFinalize(
this);
952 connector.DisposeTriggerChannel(
id);
957 callback = notification;
961 if (callback != null)
967 throw new NotImplementedException();
IMessageProcessor CreateProcessor()
virtual IQueryResponse[] ExecuteQuery(string text, IEnumerable< SqlQueryParameter > parameters)
IMessageEnvelope CreateErrorResponse(IDictionary< string, object > metadata, Exception error)
IMessageEnvelope ProcessQueryPart(IDictionary< string, object > metadata, QueryResultPartRequest request)
Dictionary< int, TriggerChannel > triggerChannels
int ColumnCount
Returns the column count.
IQueryResponse[] CoreExecuteQuery(string text, IEnumerable< SqlQueryParameter > parameters)
void DisposeResult(int resultId)
IMessageEnvelope ProcessCommit(IDictionary< string, object > metadata, CommitRequest request)
TriggerEventType
The different types of high layer trigger events.
LockingMode
The mode applied to a lock over a resource during a transaction.
ServerMessageProcessor(ServerConnector connector)
IMessageEnvelope CreateErrorResponse(IMessageEnvelope sourceMessage, Exception error)
readonly QueryResult result
void Notify(string triggerName, string triggerSource, TriggerEventType eventType, int count)
void AssertAuthenticated()
IMessageEnvelope ProcessAuthenticate(IDictionary< string, object > metadata, AuthenticateRequest request)
void CommitTransaction(int transactionId)
QueryResultPart GetResultPart(int resultId, int startRow, int countRows)
readonly ServerConnector connector
virtual void OnConnectorOpen()
ConnectionEndPoint RemoteEndPoint
IMessageEnvelope ProcessBegin(IDictionary< string, object > metadata)
The response to a command executed via the IDatabaseInterface.ExecuteQuery method in the IDatabaseInt...
ParameterStyle parameterStyle
void Rollback()
Rolls-back all the modifications made by the user in this session
QueryParameterStyle ParameterStyle
int AddResult(QueryResult result)
The default implementation of a database in a system.
readonly Dictionary< int, QueryResult > resultMap
IMessageEnvelope CreateErrorResponse(IMessageEnvelope sourceMessage, string message)
Describes the name of an object within a database.
int RowCount
Returns the row count.
byte[] ReadData(long offset, int length)
LogLevel
The level listened by a diagnostic logger
IDictionary< string, object > Metadata
void RollbackTransaction(int transactionId)
bool IgnoreIdentifiersCase
The representation of a single database in the system.
IContext IEventSource. Context
virtual void OnTriggerFired(string triggerName, string triggerSource, TriggerEventType eventType, int count)
IMessageEnvelope CreateEnvelope(IDictionary< string, object > metadata, IMessage message)
void SetAutoCommit(bool state)
This is a session that is constructed around a given user and a transaction, to the given database...
IMessageEnvelope ProcessCreateLargeObject(IDictionary< string, object > metadata, LargeObjectCreateRequest request)
void OpenConnector(ConnectionEndPoint remoteEndPoint, string databaseName)
DirectStreamableObjectChannel(ServerConnector connector, IRef obj)
void PushData(long offset, byte[] buffer, int length)
void OnTriggeInvoked(Action< TriggerEventNotification > notification)
TObject GetCellContents(int column, int row)
Gets the cell contents of the cell at the given row/column.
virtual void OnCloseConnector()
QueryResult GetResult(int resultId)
virtual IStreamableObjectChannel CreateObjectChannel(long objectId)
long CreateStreamableObject(ReferenceType referenceType, long length)
TriggerType
Enumerates the types of triggers, that can be volatile (like the Callback) or stored in the database...
void DisposeChannel(long objId)
IMessageEnvelope ProcessRollback(IDictionary< string, object > metadata, RollbackRequest request)
IMessageEnvelope ProcessConnect(IDictionary< string, object > metadata, ConnectRequest request)
ServerConnector(IDatabaseHandler handler)
IMessageEnvelope ProcessDisposeResult(IDictionary< string, object > metadata, DisposeResultRequest request)
void SetParameterStyle(ParameterStyle style)
readonly Dictionary< long, IRef > blobIdMap
IMessageEnvelope ProcessMessage(IMessageEnvelope envelope)
bool ShouldNotify(string triggerName, string objectName, TriggerEventType eventType)
virtual void Dispose(bool disposing)
void ChangeState(ConnectorState newState)
IMessageEnvelope ProcessQuery(IDictionary< string, object > metadata, QueryExecuteRequest request)
Action< TriggerEventNotification > callback
virtual IMessage GetMessage(IMessageEnvelope envelope)
IMessageEnvelope ProcessClose(IDictionary< string, object > metadata)
ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType)
IRef CompleteStream(long objId)
void SetIgnoreIdentifiersCase(bool state)
void Commit()
Commits the latest changes made by the user in the session.
virtual AuthenticatedSession OnAuthenticate(string defaultSchema, string username, string password)
A TABLE object in a database.
virtual ITriggerChannel CreateTriggerChannel(string triggerName, string objectName, TriggerEventType eventType)
TriggerChannel(ServerConnector connector, long id, string triggerName, string objectName, TriggerEventType eventType)
IRef GetObjectRef(long objectId)
QueryResultColumn GetColumnDescription(int n)
ILargeObjectChannel CreateObjectChannel(long objectId)
readonly ServerConnector connector
void DisposeTriggerChannel(long id)
ICollection< QueryParameter > Parameters
virtual EncryptionData GetEncryptionData()
QueryResponse(int resultId, QueryResult result, int queryTime, string warnings)
virtual bool Authenticate(string defaultSchema, string username, string password)
An object that is streamable (such as a long binary object, or a long string object).
void Dispose(bool disposing)
int ResultId
Returns a number that identifies this command within the set of queries executed on the connection...