17 using System.Collections.Generic;
31 namespace Deveel.Data.Protocol {
45 private const int ServerVersion = 1;
50 private ClientConnectionState
state;
60 private readonly Dictionary<string, DatabaseInterface>
dbInterfaces;
63 protected Processor(DbController controller,
string hostString) {
64 this.hostString = hostString;
65 dbInterfaces =
new Dictionary<string, DatabaseInterface>();
66 this.controller = controller;
68 authenticationTries = 0;
69 dbCallback = OnDatabaseEvent;
75 MemoryStream bout =
new MemoryStream();
76 BinaryWriter dout =
new BinaryWriter(bout, Encoding.Unicode);
77 dout.Write(eventType);
78 dout.Write(eventMessage);
79 SendEvent(bout.ToArray());
80 }
catch (IOException e) {
81 controller.Logger.Error(
this,
"IO Error: " + e.Message);
82 controller.Logger.Error(
this, e);
93 if (!controller.DatabaseExists(config, databaseName))
96 DatabaseInterface dbi;
97 if (!dbInterfaces.TryGetValue(databaseName, out dbi)) {
98 dbi =
new DatabaseInterface(controller, databaseName, hostString);
99 dbInterfaces[databaseName] = dbi;
102 Database = controller.GetDatabase(databaseName);
116 if (state == ClientConnectionState.Closed) {
118 BinaryReader reader =
new BinaryReader(
new MemoryStream(command), Encoding.ASCII);
129 string databaseName = reader.ReadString();
131 if (!ChangeDatabaseInterface(controller.Config, databaseName))
134 Version version = Assembly.GetExecutingAssembly().GetName().Version;
136 byte[] ackCommand =
new byte[4 + 1 + 4 + 4 + 1];
145 state = ClientConnectionState.NotAuthenticated;
151 if (state == ClientConnectionState.NotAuthenticated) {
153 MemoryStream input =
new MemoryStream(command);
154 BinaryReader reader =
new BinaryReader(input, Encoding.ASCII);
155 string defaultSchema = reader.ReadString();
156 string username = reader.ReadString();
157 string password = reader.ReadString();
160 if (!dbInterface.Login(defaultSchema, username, password, dbCallback)) {
162 if (authenticationTries >= 12) {
165 ++authenticationTries;
169 state = ClientConnectionState.Processing;
172 }
catch (DataException) {
179 if (state == ClientConnectionState.Processing)
181 return ProcessQuery(command);
183 throw new Exception(
"Illegal state: " + state);
192 protected ClientConnectionState ClientState {
193 get {
return state; }
203 byte[] buf =
new byte[4];
214 private static byte[]
Exception(
int dispatchId, DataException e) {
216 string msg = e.Message;
217 if (String.IsNullOrEmpty(msg))
218 msg =
"NULL exception message";
220 string server_msg =
"";
221 string stack_trace =
"";
223 if (e is DbDataException) {
224 DbDataException me = (DbDataException)e;
225 server_msg = me.ServerErrorMessage;
226 stack_trace = me.ServerErrorStackTrace;
228 stack_trace = e.StackTrace;
231 MemoryStream output =
new MemoryStream();
232 BinaryWriter writer =
new BinaryWriter(output, Encoding.Unicode);
233 writer.Write(dispatchId);
237 writer.Write(stack_trace);
239 return output.ToArray();
249 byte[] buf =
new byte[8];
270 if (dispatchId == -1)
271 throw new Exception(
"Special case dispatch id of -1 in query");
274 result = ChangeDatabase(dispatchId, command);
276 result = ResultSection(dispatchId, command);
278 result = QueryCommand(dispatchId, command);
280 result = PushStreamableObjectPart(dispatchId, command);
282 result = DisposeResult(dispatchId, command);
284 result = StreamableObjectSection(dispatchId, command);
286 result = DisposeStreamableObject(dispatchId, command);
291 throw new Exception(
"Query (" + ins +
") not understood.");
304 if (dbInterfaces.Count > 0) {
305 foreach (DatabaseInterface databaseInterface
in dbInterfaces.Values) {
306 databaseInterface.Dispose();
309 dbInterfaces.Clear();
310 }
catch (Exception e) {
311 controller.Logger.Error(
this, e);
321 MemoryStream input =
new MemoryStream(command, 8, command.Length - 8);
322 BinaryReader reader =
new BinaryReader(input, Encoding.Unicode);
324 string databaseName = reader.ReadString();
327 dbInterface.ChangeDatabase(databaseName);
328 state = ClientConnectionState.NotAuthenticated;
329 return SimpleSuccess(dispatchId);
330 }
catch (DataException e) {
331 return Exception(dispatchId, e);
348 MemoryStream input =
new MemoryStream(command, 8, command.Length - 8);
349 BinaryReader reader =
new BinaryReader(input, Encoding.Unicode);
357 MemoryStream output =
new MemoryStream();
358 BinaryWriter writer =
new BinaryWriter(output, Encoding.Unicode);
360 writer.Write(dispatchId);
369 writer.Write(colCount);
370 for (
int i = 0; i < colCount; ++i) {
371 response.GetColumnDescription(i).WriteTo(writer);
374 return output.ToArray();
375 }
catch (DataException e) {
377 return Exception(dispatchId, e);
390 ReferenceType type = (ReferenceType) command[8];
394 byte[] obBuf =
new byte[length];
395 Array.Copy(command, 29, obBuf, 0, length);
400 dbInterface.PushStreamableObjectPart(type, objectId, objectLength, obBuf, offset, length);
403 return SimpleSuccess(dispatchId);
404 }
catch (DataException e) {
405 return Exception(dispatchId, e);
424 ResultPart block = dbInterface.GetResultPart(resultId, rowNumber, rowCount);
426 MemoryStream output =
new MemoryStream();
427 BinaryWriter writer =
new BinaryWriter(output, Encoding.Unicode);
429 writer.Write(dispatchId);
435 int colCount = block.Count / rowCount;
436 writer.Write(colCount);
437 int bsize = block.Count;
438 for (
int index = 0; index < bsize; ++index) {
439 ObjectTransfer.WriteTo(writer, block[index]);
443 return output.ToArray();
444 }
catch (DataException e) {
445 return Exception(dispatchId, e);
462 byte[] buf = dbInterface.GetStreamableObjectPart(resultId, streamableObjectId, offset, length);
464 MemoryStream output =
new MemoryStream();
465 BinaryWriter writer =
new BinaryWriter(output, Encoding.Unicode);
467 writer.Write(dispatchId);
470 writer.Write(buf.Length);
471 writer.Write(buf, 0, buf.Length);
473 return output.ToArray();
474 }
catch (DataException e) {
475 return Exception(dispatchId, e);
492 dbInterface.DisposeStreamableObject(resultId, streamableObjectId);
495 return SimpleSuccess(dispatchId);
496 }
catch (DataException e) {
497 return Exception(dispatchId, e);
513 dbInterface.DisposeResult(resultId);
515 return SimpleSuccess(dispatchId);
516 }
catch (DataException e) {
517 return Exception(dispatchId, e);
539 protected abstract void SendEvent(byte[] eventMsg);
544 public abstract void Close();
549 public abstract bool IsClosed {
get; }
560 GC.SuppressFinalize(
this);
byte[] ChangeDatabase(int dispatchId, byte[] command)
const int StreamableObjectSection
Requests a section of a streamable object from the server.
readonly DbController controller
const int Exception
Operation threw an exception.
This processes _queries from a client and dispatches the _queries to the database.
readonly DatabaseEventCallback dbCallback
The database call back method that sends database events back to the client.
ByteBuffer WriteInteger(int v)
Writes an integer into the buffer at the current position.
const int DisposeStreamableObject
Disposes of the resources associated with a streamable object on the server.
byte[] ProcessCommand(byte[] command)
Processes a single Query from the client.
int ReadInt4()
Reads an integer from the buffer at the current position.
static byte[] Single(int val)
Returns a single 4 byte array with the given int encoded into it.
The response to a command executed via the IDatabaseInterface.ExecuteQuery method in the IDatabaseInt...
const int Query
Query sent to the server for processing.
The default implementation of a database in a system.
const int PushStreamableObjectPart
For pushing a part of a streamable object onto the server from the client.
int ColumnCount
The number of columns in the command result.
const int DatabaseNotFound
The specified database was not found.
byte[] DisposeStreamableObject(int dispatchId, byte[] command)
Disposes of a streamable object.
The representation of a single database in the system.
bool ChangeDatabaseInterface(IDbConfig config, string databaseName)
delegate void DatabaseEventCallback(int eventType, string eventMessage)
Constants used in the database communication protocol.
void Dispose(bool disposing)
Disposes of this processor.
static byte[] Exception(int dispatchId, DataException e)
Creates a response that represents a data exception failure.
const int UserAuthenticationPassed
Sent if login passed.
byte[] ProcessQuery(byte[] command)
Processes a query on the byte[] array and returns the result.
readonly Dictionary< string, DatabaseInterface > dbInterfaces
byte[] StreamableObjectSection(int dispatchId, byte[] command)
Returns a section of a streamable object.
byte[] ResultSection(int dispatchId, byte[] command)
Responds with a part of the result set of a query made via the ProtocolConstants.Query Query...
int authenticationTries
Number of authentications tried.
DatabaseInterface dbInterface
Processor(DbController controller, string hostString)
byte[] DisposeResult(int dispatchId, byte[] command)
Disposes of a result set we queries via the ProtocolConstants.Query Query.
static byte[] SimpleSuccess(int dispatchId)
Creates a response that indicates a simple success of an operation with the given dispatch id...
byte[] PushStreamableObjectPart(int dispatchId, byte[] command)
Pushes a part of a streamable object onto the server.
A wrapper for an array of byte.
byte[] QueryCommand(int dispatchId, byte[] command)
Executes a query and returns the header for the result in the response.
ClientConnectionState state
The current state we are in.
readonly string hostString
static long ReadInt8(byte[] arr, int offset)
const int Close
Closes the protocol stream.
const int Success
Operation was successful.
const int Acknowledgement
Sent as an acknowledgement to a command.
const int ResultSection
Requests a section of a result from the server.
const int ChangeDatabase
Changes the current database for the session.
void OnDatabaseEvent(int eventType, String eventMessage)
int ResultId
Returns a number that identifies this command within the set of queries executed on the connection...
const int UserAuthenticationFailed
Sent if login failed because username or password were invalid.
const int DisposeResult
Disposes the server-side resources associated with a result.