DeveelDB  20151217
A complete SQL database system, primarily developed for the .NET/Mono frameworks
Processor.cs
Go to the documentation of this file.
1 //
2 // Copyright 2010-2014 Deveel
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Data;
19 using System.IO;
20 using System.Reflection;
21 using System.Text;
22 
23 using Deveel.Data.Client;
25 using Deveel.Data.Control;
26 using Deveel.Data.DbSystem;
27 using Deveel.Data.Sql;
28 using Deveel.Data.Util;
29 using Deveel.Diagnostics;
30 
31 namespace Deveel.Data.Protocol {
41  public abstract class Processor : IDisposable {
/// <summary>
/// Version number of the server side of the protocol.
/// </summary>
private const int ServerVersion = 1;

/// <summary>
/// The current state we are in.
/// </summary>
private ClientConnectionState state;

/// <summary>
/// Number of authentications tried.
/// </summary>
private int authenticationTries;

/// <summary>
/// Host string handed to every <see cref="DatabaseInterface"/> this
/// processor creates.
/// </summary>
private readonly string hostString;

/// <summary>
/// Controller used to look up databases and to reach the logger.
/// </summary>
private readonly DbController controller;

/// <summary>
/// Database interfaces created so far, keyed by database name.
/// </summary>
private readonly Dictionary<string, DatabaseInterface> dbInterfaces;

/// <summary>
/// The interface of the database currently selected for this connection.
/// </summary>
private DatabaseInterface dbInterface;

/// <summary>
/// The database call back method that sends database events back to
/// the client. It is assigned in the constructor and passed to the
/// database interface at login, so it has to be declared here.
/// </summary>
private readonly DatabaseEventCallback dbCallback;
62 
/// <summary>
/// Sets up a processor bound to the given controller and host string.
/// </summary>
/// <param name="controller">The controller used to resolve databases.</param>
/// <param name="hostString">The host string passed on to the database
/// interfaces created by this processor.</param>
protected Processor(DbController controller, string hostString) {
    // Collaborators handed in by the caller.
    this.controller = controller;
    this.hostString = hostString;

    // No database interface has been created yet.
    dbInterfaces = new Dictionary<string, DatabaseInterface>();

    // Database events are routed to the client through OnDatabaseEvent.
    dbCallback = OnDatabaseEvent;

    // Start in the zero state with no failed logins recorded.
    authenticationTries = 0;
    state = default(ClientConnectionState);
}
71 
/// <summary>
/// Serializes a database event (type followed by message) and hands the
/// resulting bytes to <see cref="SendEvent"/>.
/// </summary>
/// <param name="eventType">The numeric type of the event.</param>
/// <param name="eventMessage">The message attached to the event.</param>
private void OnDatabaseEvent(int eventType, String eventMessage) {
    try {
        MemoryStream buffer = new MemoryStream();
        BinaryWriter writer = new BinaryWriter(buffer, Encoding.Unicode);

        // Wire layout of an event: the type first, then the message.
        writer.Write(eventType);
        writer.Write(eventMessage);

        byte[] packet = buffer.ToArray();
        SendEvent(packet);
    } catch (IOException ioError) {
        // An I/O failure is logged twice: once as a short message and
        // once with the exception itself.
        controller.Logger.Error(this, "IO Error: " + ioError.Message);
        controller.Logger.Error(this, ioError);
    }
}
85 
86 
91 
/// <summary>
/// Selects the interface of the named database for this connection,
/// creating and caching the interface on first use.
/// </summary>
/// <param name="config">The configuration used to check that the
/// database exists.</param>
/// <param name="databaseName">The name of the database to select.</param>
/// <returns>
/// <c>true</c> if the database exists and was selected, <c>false</c> if
/// the controller reports that it does not exist.
/// </returns>
private bool ChangeDatabaseInterface(IDbConfig config, string databaseName) {
    bool exists = controller.DatabaseExists(config, databaseName);

    if (exists) {
        DatabaseInterface selected;
        bool alreadyCreated = dbInterfaces.TryGetValue(databaseName, out selected);
        if (!alreadyCreated) {
            // First request for this database: create its interface and
            // remember it for later requests.
            selected = new DatabaseInterface(controller, databaseName, hostString);
            dbInterfaces[databaseName] = selected;
        }

        Database = controller.GetDatabase(databaseName);
        dbInterface = selected;
    }

    return exists;
}
106 
/// <summary>
/// Number of failed logins after which the next failure closes the
/// connection.
/// </summary>
private const int MaxAuthenticationTries = 12;

/// <summary>
/// Processes a single command from the client, according to the state
/// the connection is in.
/// </summary>
/// <param name="command">The raw bytes of the command.</param>
/// <returns>
/// The bytes of the response, or <c>null</c> when there is nothing to
/// send back.
/// </returns>
/// <exception cref="Exception">If the connection is in a state that
/// none of the branches handles.</exception>
protected byte[] ProcessCommand(byte[] command) {
    // Waiting for the protocol header.
    if (state == ClientConnectionState.Closed)
        return ProcessHandshake(command);

    // Waiting for the credentials.
    if (state == ClientConnectionState.NotAuthenticated)
        return ProcessAuthentication(command);

    // Authenticated: everything else is a query-level instruction.
    if (state == ClientConnectionState.Processing)
        return ProcessQuery(command);

    throw new Exception("Illegal state: " + state);
}

/// <summary>
/// Handles the opening command: reads the header, selects the requested
/// database and answers with an acknowledgement carrying the version of
/// the executing assembly.
/// </summary>
private byte[] ProcessHandshake(byte[] command) {
    // NOTE(review): the header and the credentials are decoded as ASCII
    // while every later command uses Unicode - confirm against the client.
    BinaryReader reader = new BinaryReader(new MemoryStream(command), Encoding.ASCII);

    // The magic number and the two version numbers are read only to
    // advance the reader; their values are not checked.
    reader.ReadInt32(); // magic
    reader.ReadInt32(); // major version
    reader.ReadInt32(); // minor version

    string databaseName = reader.ReadString();

    if (!ChangeDatabaseInterface(controller.Config, databaseName))
        return Single(ProtocolConstants.DatabaseNotFound);

    Version version = Assembly.GetExecutingAssembly().GetName().Version;

    // Layout: [0..3] acknowledgement code, [4] 1, [5..8] major version,
    // [9..12] minor version, [13] 0.
    byte[] ackCommand = new byte[4 + 1 + 4 + 4 + 1];
    // Without this write the first four bytes would stay zero and the
    // reply would not identify itself as an acknowledgement.
    ByteBuffer.WriteInteger(ProtocolConstants.Acknowledgement, ackCommand, 0);
    ackCommand[4] = 1;
    ByteBuffer.WriteInteger(version.Major, ackCommand, 5);
    ByteBuffer.WriteInteger(version.Minor, ackCommand, 9);
    ackCommand[13] = 0;

    // Set to the next state.
    state = ClientConnectionState.NotAuthenticated;

    return ackCommand;
}

/// <summary>
/// Handles the login command: reads the default schema, user name and
/// password and tries to log in to the selected database.
/// </summary>
/// <returns>
/// A single-integer response telling the client whether the login
/// passed or failed; <c>null</c> when the connection was closed after
/// too many attempts or the login raised a <see cref="DataException"/>.
/// </returns>
private byte[] ProcessAuthentication(byte[] command) {
    BinaryReader reader = new BinaryReader(new MemoryStream(command), Encoding.ASCII);
    string defaultSchema = reader.ReadString();
    string username = reader.ReadString();
    string password = reader.ReadString();

    try {
        if (dbInterface.Login(defaultSchema, username, password, dbCallback)) {
            state = ClientConnectionState.Processing;
            return Single(ProtocolConstants.UserAuthenticationPassed);
        }

        // Too many failed attempts: drop the connection without a reply.
        if (authenticationTries >= MaxAuthenticationTries) {
            Close();
            return null;
        }

        ++authenticationTries;
        return Single(ProtocolConstants.UserAuthenticationFailed);
    } catch (DataException e) {
        // The client still gets no reply in this case, but the failure
        // is no longer lost silently.
        controller.Logger.Error(this, e);
        return null;
    }
}
185 
/// <summary>
/// Gets the state the client connection is currently in.
/// </summary>
protected ClientConnectionState ClientState {
    get {
        ClientConnectionState current = state;
        return current;
    }
}
195 
196 
/// <summary>
/// Returns a single 4 byte array with the given int encoded into it.
/// </summary>
/// <param name="val">The integer to encode.</param>
/// <returns>A new 4 byte array holding <paramref name="val"/>.</returns>
private static byte[] Single(int val) {
    byte[] encoded = new byte[sizeof(int)];
    ByteBuffer.WriteInteger(val, encoded, 0);
    return encoded;
}
207 
/// <summary>
/// Creates a response that represents a data exception failure.
/// </summary>
/// <param name="dispatchId">The dispatch id of the failed request.</param>
/// <param name="e">The exception to report to the client.</param>
/// <returns>
/// The serialized response: dispatch id, the exception marker, the error
/// code, the message and the stack trace, in that order.
/// </returns>
private static byte[] Exception(int dispatchId, DataException e) {
    int code = /* TODO: e.ErrorCode */ -1;

    string msg = e.Message;
    if (String.IsNullOrEmpty(msg))
        msg = "NULL exception message";

    // A DbDataException carries its own server-side stack trace; any
    // other exception contributes its local one.
    DbDataException serverError = e as DbDataException;
    string stackTrace = serverError != null
        ? serverError.ServerErrorStackTrace
        : e.StackTrace;

    // BinaryWriter.Write(string) rejects null, so a missing trace would
    // turn the error report itself into an ArgumentNullException.
    if (stackTrace == null)
        stackTrace = String.Empty;

    MemoryStream output = new MemoryStream();
    BinaryWriter writer = new BinaryWriter(output, Encoding.Unicode);
    writer.Write(dispatchId);
    writer.Write(ProtocolConstants.Exception);
    writer.Write(code);
    writer.Write(msg);
    writer.Write(stackTrace);
    // Flush before snapshotting, as the other response writers do.
    writer.Flush();

    return output.ToArray();
}
241 
/// <summary>
/// Creates a response that indicates a simple success of an operation
/// with the given dispatch id.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <returns>
/// An 8 byte array: the dispatch id in bytes 0..3 followed by
/// <c>ProtocolConstants.Success</c> in bytes 4..7.
/// </returns>
private static byte[] SimpleSuccess(int dispatchId) {
    byte[] buf = new byte[8];
    ByteBuffer.WriteInteger(dispatchId, buf, 0);
    // The status word must be written explicitly; left at zero it would
    // not match the success code the other responses send after the
    // dispatch id.
    ByteBuffer.WriteInteger(ProtocolConstants.Success, buf, 4);
    return buf;
}
254 
/// <summary>
/// Decodes the instruction and dispatch id from the head of the command
/// and routes the command to the matching handler.
/// </summary>
/// <param name="command">The raw bytes of the command.</param>
/// <returns>
/// The response produced by the handler, or <c>null</c> for a close
/// request.
/// </returns>
/// <exception cref="Exception">If the dispatch id is -1 or the
/// instruction is not recognized.</exception>
private byte[] ProcessQuery(byte[] command) {
    // Bytes 0..3 hold the instruction, bytes 4..7 the dispatch id.
    int instruction = ByteBuffer.ReadInt4(command, 0);
    int dispatchId = ByteBuffer.ReadInt4(command, 4);

    if (dispatchId == -1)
        throw new Exception("Special case dispatch id of -1 in query");

    if (instruction == ProtocolConstants.ChangeDatabase)
        return ChangeDatabase(dispatchId, command);

    if (instruction == ProtocolConstants.ResultSection)
        return ResultSection(dispatchId, command);

    if (instruction == ProtocolConstants.Query)
        return QueryCommand(dispatchId, command);

    if (instruction == ProtocolConstants.PushStreamableObjectPart)
        return PushStreamableObjectPart(dispatchId, command);

    if (instruction == ProtocolConstants.DisposeResult)
        return DisposeResult(dispatchId, command);

    if (instruction == ProtocolConstants.StreamableObjectSection)
        return StreamableObjectSection(dispatchId, command);

    if (instruction == ProtocolConstants.DisposeStreamableObject)
        return DisposeStreamableObject(dispatchId, command);

    if (instruction == ProtocolConstants.Close) {
        // Closing produces no response.
        Close();
        return null;
    }

    throw new Exception("Query (" + instruction + ") not understood.");
}
297 
/// <summary>
/// Disposes of this processor.
/// </summary>
/// <param name="disposing"><c>true</c> to dispose every database
/// interface created by this processor; <c>false</c> to do nothing.</param>
protected void Dispose(bool disposing) {
    if (!disposing)
        return;

    try {
        foreach (DatabaseInterface openInterface in dbInterfaces.Values)
            openInterface.Dispose();

        dbInterfaces.Clear();
    } catch (Exception e) {
        // A failure while disposing is logged, never propagated.
        controller.Logger.Error(this, e);
    }
}
315 
316 
317  // ---------- Primitive _queries ----------
318 
/// <summary>
/// Switches the current database interface to another database and
/// sends the connection back to the not-authenticated state.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <param name="command">The raw command; the database name follows the
/// first 8 bytes.</param>
/// <returns>A simple success response, or an exception response if the
/// change raised a <see cref="DataException"/>.</returns>
private byte [] ChangeDatabase(int dispatchId, byte[] command) {
    // Skip the instruction and the dispatch id at the head of the command.
    MemoryStream payload = new MemoryStream(command, 8, command.Length - 8);
    string targetDatabase = new BinaryReader(payload, Encoding.Unicode).ReadString();

    byte[] reply;
    try {
        dbInterface.ChangeDatabase(targetDatabase);
        state = ClientConnectionState.NotAuthenticated;
        reply = SimpleSuccess(dispatchId);
    } catch (DataException failure) {
        reply = Exception(dispatchId, failure);
    }

    return reply;
}
334 
/// <summary>
/// Executes a query and returns the header for the result in the
/// response.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <param name="command">The raw command; the serialized query follows
/// the first 8 bytes.</param>
/// <returns>
/// A success response carrying the result id, the query time, the row
/// count and the description of every column, or an exception response
/// if the execution raised a <see cref="DataException"/>.
/// </returns>
private byte[] QueryCommand(int dispatchId, byte[] command) {
    // Skip the instruction and the dispatch id at the head of the command.
    MemoryStream payload = new MemoryStream(command, 8, command.Length - 8);
    SqlQuery query = SqlQuery.ReadFrom(new BinaryReader(payload, Encoding.Unicode));

    try {
        // Only the first response of the execution is reported.
        IQueryResponse response = dbInterface.ExecuteQuery(query)[0];

        MemoryStream reply = new MemoryStream();
        BinaryWriter writer = new BinaryWriter(reply, Encoding.Unicode);

        writer.Write(dispatchId);
        writer.Write(ProtocolConstants.Success);

        // Result header: id, elapsed time, row count, column count ...
        writer.Write(response.ResultId);
        writer.Write(response.QueryTimeMillis);
        writer.Write(response.RowCount);

        int columns = response.ColumnCount;
        writer.Write(columns);

        // ... followed by one description per column.
        int column = 0;
        while (column < columns) {
            response.GetColumnDescription(column).WriteTo(writer);
            column++;
        }

        writer.Flush();
        return reply.ToArray();
    } catch (DataException failure) {
        return Exception(dispatchId, failure);
    }
}
381 
382 
/// <summary>
/// Pushes a part of a streamable object onto the server.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <param name="command">The raw command holding the object header and
/// the part being pushed.</param>
/// <returns>A simple success response, or an exception response if the
/// database interface raised a <see cref="DataException"/>.</returns>
private byte[] PushStreamableObjectPart(int dispatchId, byte[] command) {
    // Fixed-position header: [8] type, [9..16] object id,
    // [17..24] total object length, [25..28] length of this part.
    ReferenceType referenceType = (ReferenceType) command[8];
    long objectId = ByteBuffer.ReadInt8(command, 9);
    long totalLength = ByteBuffer.ReadInt8(command, 17);
    int partLength = ByteBuffer.ReadInt4(command, 25);

    // The part itself starts at byte 29 ...
    const int partStart = 29;
    byte[] part = new byte[partLength];
    Array.Copy(command, partStart, part, 0, partLength);

    // ... and is followed by the offset it belongs at.
    long partOffset = ByteBuffer.ReadInt8(command, partStart + partLength);

    byte[] reply;
    try {
        dbInterface.PushStreamableObjectPart(referenceType, objectId, totalLength, part, partOffset, partLength);
        reply = SimpleSuccess(dispatchId);
    } catch (DataException failure) {
        reply = Exception(dispatchId, failure);
    }

    return reply;
}
408 
409 
/// <summary>
/// Responds with a part of the result set of a previously executed
/// query.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <param name="command">The raw command: result id at byte 8, first
/// row at byte 12 and number of rows at byte 16.</param>
/// <returns>
/// A success response carrying the column count followed by every entry
/// of the requested part, or an exception response if the database
/// interface raised a <see cref="DataException"/>.
/// </returns>
private byte[] ResultSection(int dispatchId, byte[] command) {
    int resultId = ByteBuffer.ReadInt4(command, 8);
    int rowNumber = ByteBuffer.ReadInt4(command, 12);
    int rowCount = ByteBuffer.ReadInt4(command, 16);

    try {
        // Get the result part...
        ResultPart block = dbInterface.GetResultPart(resultId, rowNumber, rowCount);

        MemoryStream output = new MemoryStream();
        BinaryWriter writer = new BinaryWriter(output, Encoding.Unicode);

        writer.Write(dispatchId);
        writer.Write(ProtocolConstants.Success);

        // HACK - Work out column count by dividing number of entries in
        // block by number of rows. The row count comes from the client,
        // so a request for zero rows must not reach the division: the
        // resulting DivideByZeroException would escape the catch below.
        int blockSize = block.Count;
        int colCount = rowCount != 0 ? blockSize / rowCount : 0;
        writer.Write(colCount);

        // Send the contents of the result set.
        for (int index = 0; index < blockSize; ++index) {
            ObjectTransfer.WriteTo(writer, block[index]);
        }

        writer.Flush();
        return output.ToArray();
    } catch (DataException e) {
        return Exception(dispatchId, e);
    }
}
448 
/// <summary>
/// Returns a section of a streamable object.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <param name="command">The raw command: result id at byte 8, object id
/// at byte 12, offset at byte 20 and length at byte 28.</param>
/// <returns>
/// A success response carrying the length of the section followed by
/// its bytes, or an exception response if the database interface raised
/// a <see cref="DataException"/>.
/// </returns>
private byte[] StreamableObjectSection(int dispatchId, byte[] command) {
    int resultId = ByteBuffer.ReadInt4(command, 8);
    long objectId = ByteBuffer.ReadInt8(command, 12);
    long sectionOffset = ByteBuffer.ReadInt8(command, 20);
    int sectionLength = ByteBuffer.ReadInt4(command, 28);

    try {
        byte[] section = dbInterface.GetStreamableObjectPart(resultId, objectId, sectionOffset, sectionLength);

        MemoryStream reply = new MemoryStream();
        BinaryWriter writer = new BinaryWriter(reply, Encoding.Unicode);

        writer.Write(dispatchId);
        writer.Write(ProtocolConstants.Success);

        // Length prefix first, then the bytes of the section.
        int returned = section.Length;
        writer.Write(returned);
        writer.Write(section, 0, returned);
        writer.Flush();

        return reply.ToArray();
    } catch (DataException failure) {
        return Exception(dispatchId, failure);
    }
}
479 
/// <summary>
/// Disposes of a streamable object.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <param name="command">The raw command: result id at byte 8 and object
/// id at byte 12.</param>
/// <returns>A simple success response, or an exception response if the
/// database interface raised a <see cref="DataException"/>.</returns>
private byte[] DisposeStreamableObject(int dispatchId, byte[] command) {
    int resultId = ByteBuffer.ReadInt4(command, 8);
    long objectId = ByteBuffer.ReadInt8(command, 12);

    byte[] reply;
    try {
        // The database interface does the actual work.
        dbInterface.DisposeStreamableObject(resultId, objectId);
        reply = SimpleSuccess(dispatchId);
    } catch (DataException failure) {
        reply = Exception(dispatchId, failure);
    }

    return reply;
}
500 
/// <summary>
/// Disposes of the result identified in the command.
/// </summary>
/// <param name="dispatchId">The dispatch id of the request.</param>
/// <param name="command">The raw command; the result id is at byte 8.</param>
/// <returns>A simple success response, or an exception response if the
/// database interface raised a <see cref="DataException"/>.</returns>
private byte[] DisposeResult(int dispatchId, byte[] command) {
    int resultId = ByteBuffer.ReadInt4(command, 8);

    byte[] reply;
    try {
        dbInterface.DisposeResult(resultId);
        reply = SimpleSuccess(dispatchId);
    } catch (DataException failure) {
        reply = Exception(dispatchId, failure);
    }

    return reply;
}
520 
521 
522  // ---------- Abstract methods ----------
523 
/// <summary>
/// Gets the database selected for this connection. It is assigned when
/// a database interface is selected.
/// </summary>
public IDatabase Database { get; private set; }

/// <summary>
/// Gets whether the processor is closed.
/// </summary>
public abstract bool IsClosed { get; }

/// <summary>
/// Closes the processor. It is invoked when the client asks to close
/// and when a login fails after too many attempts.
/// </summary>
public abstract void Close();

/// <summary>
/// Sends a serialized database event to the client.
/// </summary>
/// <param name="eventMsg">The bytes of the event: the event type
/// followed by the event message.</param>
protected abstract void SendEvent(byte[] eventMsg);
552 
// ---------- Finalize ----------

/// <summary>
/// Finalizer: runs the non-disposing path of <see cref="Dispose(bool)"/>
/// for a processor that was never disposed explicitly.
/// </summary>
~Processor() {
    Dispose(false);
}

/// <summary>
/// Disposes of this processor and tells the garbage collector that the
/// finalizer no longer needs to run.
/// </summary>
public void Dispose() {
    Dispose(true);
    GC.SuppressFinalize(this);
}
562  }
563 }
byte[] ChangeDatabase(int dispatchId, byte[] command)
Definition: Processor.cs:319
const int StreamableObjectSection
Requests a section of a streamable object from the server.
readonly DbController controller
Definition: Processor.cs:59
const int Exception
Operation threw an exception.
This processes queries from a client and dispatches the queries to the database.
Definition: Processor.cs:41
readonly DatabaseEventCallback dbCallback
The database call back method that sends database events back to the client.
Definition: Processor.cs:90
ByteBuffer WriteInteger(int v)
Writes an integer into the buffer at the current position.
Definition: ByteBuffer.cs:118
const int DisposeStreamableObject
Disposes of the resources associated with a streamable object on the server.
byte[] ProcessCommand(byte[] command)
Processes a single Query from the client.
Definition: Processor.cs:115
int ReadInt4()
Reads an integer from the buffer at the current position.
Definition: ByteBuffer.cs:128
static byte[] Single(int val)
Returns a single 4 byte array with the given int encoded into it.
Definition: Processor.cs:202
The response to a command executed via the IDatabaseInterface.ExecuteQuery method in the IDatabaseInt...
const int Query
Query sent to the server for processing.
The default implementation of a database in a system.
Definition: Database.cs:38
const int PushStreamableObjectPart
For pushing a part of a streamable object onto the server from the client.
int ColumnCount
The number of columns in the command result.
const int DatabaseNotFound
The specified database was not found.
byte[] DisposeStreamableObject(int dispatchId, byte[] command)
Disposes of a streamable object.
Definition: Processor.cs:486
The representation of a single database in the system.
Definition: IDatabase.cs:40
bool ChangeDatabaseInterface(IDbConfig config, string databaseName)
Definition: Processor.cs:92
delegate void DatabaseEventCallback(int eventType, string eventMessage)
Constants used in the database communication protocol.
void Dispose(bool disposing)
Disposes of this processor.
Definition: Processor.cs:301
static byte[] Exception(int dispatchId, DataException e)
Creates a response that represents a data exception failure.
Definition: Processor.cs:214
const int UserAuthenticationPassed
Sent if login passed.
byte[] ProcessQuery(byte[] command)
Processes a query on the byte[] array and returns the result.
Definition: Processor.cs:260
readonly Dictionary< string, DatabaseInterface > dbInterfaces
Definition: Processor.cs:60
byte[] StreamableObjectSection(int dispatchId, byte[] command)
Returns a section of a streamable object.
Definition: Processor.cs:455
byte[] ResultSection(int dispatchId, byte[] command)
Responds with a part of the result set of a query made via the ProtocolConstants.Query Query...
Definition: Processor.cs:417
int authenticationTries
Number of authentications tried.
Definition: Processor.cs:55
DatabaseInterface dbInterface
Definition: Processor.cs:61
Processor(DbController controller, string hostString)
Definition: Processor.cs:63
byte[] DisposeResult(int dispatchId, byte[] command)
Disposes of a result set that was queried via the ProtocolConstants.Query command.
Definition: Processor.cs:507
static byte[] SimpleSuccess(int dispatchId)
Creates a response that indicates a simple success of an operation with the given dispatch id...
Definition: Processor.cs:248
byte[] PushStreamableObjectPart(int dispatchId, byte[] command)
Pushes a part of a streamable object onto the server.
Definition: Processor.cs:389
A wrapper for an array of byte.
Definition: ByteBuffer.cs:27
byte[] QueryCommand(int dispatchId, byte[] command)
Executes a query and returns the header for the result in the response.
Definition: Processor.cs:346
ClientConnectionState state
The current state we are in.
Definition: Processor.cs:50
readonly string hostString
Definition: Processor.cs:57
static long ReadInt8(byte[] arr, int offset)
Definition: ByteBuffer.cs:234
const int Close
Closes the protocol stream.
const int Success
Operation was successful.
const int Acknowledgement
Sent as an acknowledgement to a command.
const int ResultSection
Requests a section of a result from the server.
const int ChangeDatabase
Changes the current database for the session.
void OnDatabaseEvent(int eventType, String eventMessage)
Definition: Processor.cs:72
int ResultId
Returns a number that identifies this command within the set of queries executed on the connection...
const int UserAuthenticationFailed
Sent if login failed because username or password were invalid.
const int DisposeResult
Disposes the server-side resources associated with a result.