DeveelDB  20151217
complete SQL database system, primarily developed for .NET/Mono frameworks
ObjectStore.cs
Go to the documentation of this file.
1 //
2 // Copyright 2010-2015 Deveel
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 using System;
18 using System.IO;
19 #if !PCL
20 using System.IO.Compression;
21 #endif
22 
23 namespace Deveel.Data.Store {
24  public class ObjectStore : IObjectStore {
25  private const int Magic = 0x012BC53A9;
26 
27  private const int DeletedFlag = 0x020000;
28  private const int CompressedFlag = 0x010;
29 
30  private const int PageSize = 64;
31 
32  private readonly IStore store;
33  private readonly FixedRecordList fixedList;
34 
35  private long firstDeleteChainRecord;
36 
// Size in bytes of one entry of the fixed record list:
// ( status (int), reference_count (int), size (long), current_size (long),
//   pointer (long) ) = 4 + 4 + 8 + 8 + 8 = 32.
// Deleted entries use the same layout, with the last field holding the
// number of the next entry in the delete chain (-1 ends the chain).
// NOTE(review): this was 30, which is smaller than the 32 bytes every live
// entry writes; a list created with the old value has a different slot size.
private const int RecordSize = 32;

/// <summary>
/// Constructs an object store over the given underlying store.
/// </summary>
/// <param name="id">The identifier of this object store; must not be negative.</param>
/// <param name="store">The store in which all areas are allocated.</param>
/// <exception cref="ArgumentOutOfRangeException">If <paramref name="id"/> is negative.</exception>
/// <exception cref="ArgumentNullException">If <paramref name="store"/> is null.</exception>
public ObjectStore(int id, IStore store) {
    if (id < 0)
        throw new ArgumentOutOfRangeException("id");
    if (store == null)
        throw new ArgumentNullException("store");

    Id = id;
    this.store = store;
    fixedList = new FixedRecordList(store, RecordSize);
}

// Writes one deleted entry at the current position of the given area, using
// the same field order the recycle path of AddToRecordList reads back.
private static void WriteDeletedRecord(IArea area, long nextInChain) {
    area.WriteInt4(DeletedFlag);
    area.WriteInt4(0);           // reference count (unused in delete chains)
    area.WriteInt8(-1);          // size
    area.WriteInt8(0);           // current size
    area.WriteInt8(nextInChain); // next entry of the delete chain
}

/// <summary>
/// Registers an object area in the fixed record list, recycling a deleted
/// entry when the delete chain has one and growing the list otherwise.
/// </summary>
/// <param name="recordOffset">The pointer stored in the entry (the id of the object's area).</param>
/// <returns>The number of the fixed list entry now describing the object.</returns>
/// <exception cref="InvalidOperationException">
/// If the head of the delete chain is not flagged as deleted.
/// </exception>
private long AddToRecordList(long recordOffset) {
    lock (fixedList) {
        // If there are no free deleted records in the delete chain,
        if (firstDeleteChainRecord == -1) {
            // Increase the size of the list structure.
            fixedList.IncreaseSize();
            // The start record of the new block
            int newBlockNumber = fixedList.BlockCount - 1;
            long startIndex = fixedList.BlockFirstPosition(newBlockNumber);
            long sizeOfBlock = fixedList.BlockNodeCount(newBlockNumber);

            // The IArea positioned on the first entry of the new block.
            // Entries are written back to back from here.
            // NOTE(review): assumes the list lays entries out contiguously at
            // a stride equal to the element size - confirm against FixedRecordList.
            IArea a = fixedList.GetRecord(startIndex);

            // First entry: the record being added.
            a.WriteInt4(0);
            a.WriteInt4(0);
            a.WriteInt8(-1); // Initially unknown size
            a.WriteInt8(0);  // Initially unknown current size
            a.WriteInt8(recordOffset);

            // Every other entry of the block becomes a link of the delete
            // chain; the last one terminates it.
            long lastIndex = startIndex + sizeOfBlock - 1;
            for (long index = startIndex + 1; index <= lastIndex; ++index) {
                long next = index < lastIndex ? index + 1 : -1;
                WriteDeletedRecord(a, next);
            }

            // Check out the changes.
            a.Flush();

            // And set the new delete chain (empty when the block only had
            // room for the entry just added).
            firstDeleteChainRecord = sizeOfBlock > 1 ? startIndex + 1 : -1;
            fixedList.WriteDeleteHead(firstDeleteChainRecord);

            // Return pointer to the record we just added.
            return startIndex;
        }

        // Pull a free entry from the delete chain and recycle it.
        long recycledRecord = firstDeleteChainRecord;
        IArea block = fixedList.GetRecord(recycledRecord);
        var recordPos = block.Position;

        // Status of the recycled entry
        int status = block.ReadInt4();
        if ((status & DeletedFlag) == 0)
            throw new InvalidOperationException("Assertion failed: record is not deleted!");

        // Reference count (currently unused in delete chains).
        block.ReadInt4();
        // The size (should be -1)
        block.ReadInt8();
        // The current size (should be 0)
        block.ReadInt8();
        // The pointer to the next in the chain.
        firstDeleteChainRecord = block.ReadInt8();

        // Update the delete chain head stored by the list
        fixedList.WriteDeleteHead(firstDeleteChainRecord);

        // Rewind and overwrite the entry with the new live record
        block.Position = recordPos;
        block.WriteInt4(0);
        block.WriteInt4(0);
        block.WriteInt8(-1); // Initially unknown size
        block.WriteInt8(0);
        block.WriteInt8(recordOffset);

        // Check out the changes
        block.Flush();

        return recycledRecord;
    }
}
127 
/// <summary>
/// Creates the structures of a new, empty object store inside the
/// underlying store.
/// </summary>
/// <returns>
/// The id of the header area, to be passed to <c>Open</c> later.
/// </returns>
public long Create() {
    // The fixed record list comes first: the header must point at it.
    long listOffset = fixedList.Create();

    // A brand new store has nothing to recycle.
    firstDeleteChainRecord = -1;
    fixedList.WriteDeleteHead(-1);

    // Small header area: magic, data version and the pointer to the
    // fixed record list.
    IArea header = store.CreateArea(32);
    long headerId = header.Id;

    header.WriteInt4(Magic);
    header.WriteInt4(1);
    header.WriteInt8(listOffset);
    header.Flush();

    return headerId;
}
152 
/// <summary>
/// Opens an existing object store from its header area.
/// </summary>
/// <param name="offset">The id of the header area, as returned by <c>Create</c>.</param>
/// <exception cref="IOException">
/// If the header does not carry the expected magic value or data version.
/// </exception>
public void Open(long offset) {
    IArea header = store.GetArea(offset);
    header.Position = 0;

    // Both fields are read before either is validated.
    int storedMagic = header.ReadInt4();
    int storedVersion = header.ReadInt4();

    if (storedMagic != Magic)
        throw new IOException("The magic value for this Object Store is not correct.");
    if (storedVersion != 1)
        throw new IOException("The version number for this Object Store is not correct.");

    // The header's last field points at the fixed record list.
    fixedList.Open(header.ReadInt8());

    // Restore the head of the delete chain kept by the list.
    firstDeleteChainRecord = fixedList.ReadDeleteHead();
}
172 
/// <summary>
/// Disposes the object store. Currently does nothing.
/// </summary>
public void Dispose()
{
}
175 
176  public int Id { get; private set; }
177 
/// <summary>
/// Allocates the structures for a new large object and registers it in
/// the fixed record list.
/// </summary>
/// <param name="maxSize">The maximum size of the object, in bytes; must not be negative.</param>
/// <param name="compressed">Whether the compression flag is recorded in the object's type.</param>
/// <returns>A new object with a current size of 0 that is not yet complete.</returns>
/// <exception cref="IOException">If <paramref name="maxSize"/> is negative.</exception>
public ILargeObject CreateNewObject(long maxSize, bool compressed) {
    if (maxSize < 0)
        throw new IOException("Negative object size not allowed.");

    try {
        store.Lock();

        // A 32-byte header followed by one 8-byte pointer per page.
        long pages = ((maxSize - 1) / (PageSize * 1024)) + 1;
        IArea header = store.CreateArea((pages * 8) + 32);
        long headerId = header.Id;

        // 2 is the binary type; the compression bit is added on request.
        int typeFlags = compressed ? (2 | CompressedFlag) : 2;

        header.WriteInt4(0); // Reserved for future
        header.WriteInt4(typeFlags);
        header.WriteInt8(maxSize);
        header.WriteInt8(0);
        header.WriteInt8(pages);

        // No page has been written yet: every pointer starts as -1.
        for (long left = pages; left > 0; --left) {
            header.WriteInt8(-1);
        }

        header.Flush();

        // Register the area in the fixed list; the entry number becomes
        // the reference id of the object.
        long referenceId = AddToRecordList(headerId);
        return new LargeObject(this, referenceId, maxSize, 0, compressed, false);
    } finally {
        store.Unlock();
    }
}
216 
218  private readonly ObjectStore store;
219 
/// <summary>
/// Constructs the handle of a large object kept by the given object store.
/// </summary>
/// <param name="store">The object store that owns the object.</param>
/// <param name="refId">The reference id of the object within the store.</param>
/// <param name="size">The value exposed as <c>RawSize</c>.</param>
/// <param name="currentSize">The value initially exposed as <c>CurrentSize</c>.</param>
/// <param name="compressed">Whether the object is flagged as compressed.</param>
/// <param name="isComplete">Whether the object is already complete.</param>
public LargeObject(ObjectStore store, long refId, long size, long currentSize, bool compressed, bool isComplete) {
    this.store = store;

    // Identity: the owning store's id paired with the reference id.
    Id = new ObjectId(store.Id, refId);

    // Sizes
    RawSize = size;
    CurrentSize = currentSize;

    // State flags
    IsCompressed = compressed;
    IsComplete = isComplete;
}
228 
/// <summary>
/// Disposes the object handle. Currently does nothing.
/// </summary>
public void Dispose()
{
}
231 
232  public ObjectId Id { get; private set; }
233 
234  public long RawSize { get; private set; }
235 
236  public long CurrentSize { get; private set; }
237 
238  public bool IsCompressed { get; private set; }
239 
240  public bool IsComplete { get; private set; }
241 
/// <summary>
/// Reads part of the object's content into the start of the given buffer.
/// </summary>
/// <param name="offset">The offset within the object to read from.</param>
/// <param name="buffer">The destination buffer, filled from index 0.</param>
/// <param name="length">The number of bytes to read.</param>
/// <returns>The value returned by the owning store's read.</returns>
public int Read(long offset, byte[] buffer, int length) {
    long referenceId = Id.Id;
    int count = store.ReadObjectPart(referenceId, offset, buffer, 0, length);
    return count;
}
245 
/// <summary>
/// Writes the first <paramref name="length"/> bytes of the buffer into the
/// object at the given offset.
/// </summary>
/// <param name="offset">The offset within the object to write at.</param>
/// <param name="buffer">The source buffer, read from index 0.</param>
/// <param name="length">The number of bytes to write.</param>
/// <exception cref="IOException">If the object has already been completed.</exception>
public void Write(long offset, byte[] buffer, int length) {
    if (IsComplete)
        throw new IOException("The object is complete and cannot be written.");

    store.WriteObjectPart(Id.Id, offset, buffer, 0, length);

    // Only account for the bytes once the store accepted them, so a
    // rejected write does not inflate the size recorded on completion.
    CurrentSize += length;
}
253 
/// <summary>
/// Asks the owning store to complete this object.
/// </summary>
public void Complete()
{
    LargeObject self = this;
    store.CompleteObject(self);
}
257 
/// <summary>
/// Asks the owning store to establish a reference to this object.
/// </summary>
public void Establish()
{
    long referenceId = Id.Id;
    store.EstablishReference(referenceId);
}
261 
/// <summary>
/// Asks the owning store to release a reference to this object.
/// </summary>
/// <returns>The value returned by the owning store's release.</returns>
public bool Release()
{
    long referenceId = Id.Id;
    bool result = store.ReleaseReference(referenceId);
    return result;
}
265 
/// <summary>
/// Flags this handle as complete; later calls to <c>Write</c> are rejected.
/// </summary>
public void MarkComplete()
{
    IsComplete = true;
}
269  }
270 
/// <summary>
/// Finalizes an open object: its fixed list entry gets status 1, a
/// reference count of 0 and the object's current size as both sizes.
/// </summary>
/// <param name="obj">The object to finalize.</param>
/// <exception cref="IOException">If the entry's status is not 0 (open).</exception>
private void CompleteObject(LargeObject obj) {
    // The reference id is the number of the entry in the fixed list.
    long referenceId = obj.Id.Id;

    lock (fixedList) {
        IArea entry = fixedList.GetRecord(referenceId);

        // Remember where the entry starts so it can be rewritten in place.
        var entryStart = entry.Position;

        if (entry.ReadInt4() != 0)
            throw new IOException("Assertion failed: record is not open.");

        // Skip the reference count and both sizes; only the last field,
        // which is written back unchanged, is needed.
        entry.ReadInt4();
        entry.ReadInt8();
        entry.ReadInt8();
        long lastField = entry.ReadInt8();

        try {
            store.Lock();

            long finalSize = obj.CurrentSize;

            entry.Position = entryStart;
            entry.WriteInt4(1);         // Status
            entry.WriteInt4(0);         // Reference Count
            entry.WriteInt8(finalSize); // Final Size
            entry.WriteInt8(finalSize);
            entry.WriteInt8(lastField);
            entry.Flush();
        } finally {
            store.Unlock();
        }
    }

    // The entry is finalized: reflect that on the object handle.
    obj.MarkComplete();
}
310 
/// <summary>
/// Writes one page of an object: the data is stored in a newly allocated
/// page area and the object's header is updated to point at it.
/// </summary>
/// <param name="id">The reference id of the object (its fixed list entry).</param>
/// <param name="objOffset">The offset within the object; must be 64K aligned.</param>
/// <param name="buffer">The buffer holding the data to write.</param>
/// <param name="off">The index in <paramref name="buffer"/> of the first byte to write.</param>
/// <param name="length">The number of bytes to write; at most 64K.</param>
/// <exception cref="IOException">
/// If <paramref name="id"/> is out of range or the write falls outside the object.
/// </exception>
/// <exception cref="InvalidOperationException">If the fixed list entry is deleted.</exception>
/// <exception cref="Exception">
/// If the offset is not aligned, the length exceeds a page, or the page was already written.
/// </exception>
private void WriteObjectPart(long id, long objOffset, byte[] buffer, int off, int length) {
    const int pageBytes = PageSize * 1024;

    // ASSERT: Read and Write position must be 64K aligned.
    if (objOffset % pageBytes != 0)
        throw new Exception("Assert failed: offset is not 64k aligned.");

    // ASSERT: Length is less than or equal to 64K
    if (length > pageBytes)
        throw new Exception("Assert failed: length is greater than 64K.");

    long objPos;

    lock (fixedList) {
        if (id < 0 || id >= fixedList.NodeCount)
            throw new IOException("Object id is out of range.");

        IArea block = fixedList.GetRecord(id);
        var status = block.ReadInt4();
        if ((status & DeletedFlag) != 0)
            throw new InvalidOperationException("Assertion failed: record is deleted!");

        block.ReadInt4();          // Ref count
        block.ReadInt8();          // Total Size / Max Size
        block.ReadInt8();          // Current Size
        objPos = block.ReadInt8(); // Pointer to the object's area
    }

    // Open an IArea into the object. Its header is
    // ( reserved (int), type (int), max size (long), current size (long),
    //   page count (long) ), followed by one pointer (long) per page.
    IArea area = store.GetArea(objPos);
    area.ReadInt4();
    var type = area.ReadInt4();
    var size = area.ReadInt8();
    // NOTE(review): the fourth header field is taken to be the running
    // stored size (it is written as 0 on creation) - confirm.
    var storedSize = area.ReadInt8();

    // Assert that the area being written is within the bounds of the object
    if (objOffset < 0 || objOffset + length > size) {
        throw new IOException("Object invalid write. offset = " + objOffset + ", length = " + length + ", size = " + size);
    }

    // Convert to the page number and look up its pointer
    long pageNumber = objOffset / pageBytes;
    int pageSlot = (int)((pageNumber * 8) + 32);
    area.Position = pageSlot;
    long pagePos = area.ReadInt8();

    if (pagePos != -1) {
        // This means we are trying to rewrite a page we've already written
        // before.
        throw new Exception("Assert failed: page position is not -1");
    }

    // Is the compression bit set?
    byte[] toWrite;
    int writeOffset;
    int writeLength;
    if ((type & CompressedFlag) != 0) {
        // Yes, compression
#if !PCL
        // A compressing DeflateStream can only be written to: push the data
        // through it into a memory stream and close it so the tail of the
        // compressed data is flushed before the result is taken.
        using (var compressedData = new MemoryStream()) {
            using (var deflateStream = new DeflateStream(compressedData, CompressionMode.Compress, true)) {
                deflateStream.Write(buffer, off, length);
            }

            toWrite = compressedData.ToArray();
        }

        writeOffset = 0;
        writeLength = toWrite.Length;
#else
        throw new NotSupportedException("Compression not supported in PCL.");
#endif
    } else {
        // No compression: write straight from the caller's buffer,
        // honoring the requested start index.
        toWrite = buffer;
        writeOffset = off;
        writeLength = length;
    }

    try {
        store.Lock();

        // Allocate and Write the page.
        IArea pageArea = store.CreateArea(writeLength + 8);
        pagePos = pageArea.Id;
        pageArea.WriteInt4(1);
        pageArea.WriteInt4(writeLength);
        pageArea.Write(toWrite, writeOffset, writeLength);
        // Finish this page
        pageArea.Flush();

        // Update the running size in the header (fourth field, offset 16)
        // without touching the page count or any other page pointer.
        area.Position = 16;
        area.WriteInt8(storedSize + writeLength);

        // Point this page's slot at the new page area.
        area.Position = pageSlot;
        area.WriteInt8(pagePos);
        // Check out this change.
        area.Flush();
    } finally {
        store.Unlock();
    }
}
403 
/// <summary>
/// Reads part of one page of an object into the given buffer,
/// decompressing the page when the object's type has the compression flag.
/// </summary>
/// <param name="id">The reference id of the object (its fixed list entry).</param>
/// <param name="objOffset">The offset within the object; must be 64K aligned.</param>
/// <param name="buffer">The destination buffer.</param>
/// <param name="off">The index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="length">The number of bytes to read; at most 64K.</param>
/// <returns>The number of bytes placed into <paramref name="buffer"/>.</returns>
/// <exception cref="IOException">
/// If <paramref name="id"/> is out of range, the read falls outside the
/// object, or the compressed data is malformed.
/// </exception>
/// <exception cref="InvalidOperationException">If the fixed list entry is deleted.</exception>
/// <exception cref="Exception">
/// If the offset is not aligned, the length exceeds a page, or a compressed
/// page yields fewer bytes than requested.
/// </exception>
private int ReadObjectPart(long id, long objOffset, byte[] buffer, int off, int length) {
    const int pageBytes = PageSize * 1024;

    // ASSERT: Read and Write position must be 64K aligned. This concerns
    // the position within the object, not the index into the buffer.
    if (objOffset % pageBytes != 0) {
        throw new Exception("Assert failed: offset is not 64k aligned.");
    }
    // ASSERT: Length is less than or equal to 64K
    if (length > pageBytes) {
        throw new Exception("Assert failed: length is greater than 64K.");
    }

    long maxSize;
    long objPointer;

    lock (fixedList) {
        // Assert that the reference id given is in a valid range
        if (id < 0 || id >= fixedList.NodeCount) {
            throw new IOException("Object ID is out of range.");
        }

        // Position on this record
        IArea block = fixedList.GetRecord(id);
        // Assert that the status is not deleted
        int status = block.ReadInt4();
        if ((status & DeletedFlag) != 0)
            throw new InvalidOperationException("Assertion failed: record is deleted!");

        block.ReadInt4();              // reference count
        maxSize = block.ReadInt8();    // total size of the object
        block.ReadInt8();              // current running size
        objPointer = block.ReadInt8(); // pointer to the object's area
    }

    // Assert that the area being Read is within the bounds of the object
    if (objOffset < 0 || off < 0 || objOffset + length > maxSize) {
        throw new IOException("Invalid Read. offset = " + objOffset + ", length = " + length + " > maxSize = " + maxSize);
    }

    // Open an IArea into the object
    IArea area = store.GetArea(objPointer);
    area.ReadInt4();
    int type = area.ReadInt4();

    // Convert to the page number and fetch the page pointer, which lives
    // after the 32-byte header.
    long pageNumber = objOffset / pageBytes;
    area.Position = (int)((pageNumber * 8) + 32);
    long pagePointer = area.ReadInt8();

    // Read the page header: ( page type (int), stored length (int) )
    IArea pageArea = store.GetArea(pagePointer);
    pageArea.Position = 0;
    pageArea.ReadInt4();
    int pageSize = pageArea.ReadInt4();

    if ((type & CompressedFlag) != 0) {
#if !PCL
        // The page is compressed: load the stored bytes, then inflate them
        // into the caller's buffer.
        byte[] pageBuf = new byte[pageSize];
        pageArea.Read(pageBuf, 0, pageSize);

        try {
            using (var deflateStream = new DeflateStream(new MemoryStream(pageBuf, 0, pageSize), CompressionMode.Decompress, false)) {
                // A single Read may return fewer bytes than requested, so
                // keep reading until the request is filled or data ends.
                int total = 0;
                while (total < length) {
                    int count = deflateStream.Read(buffer, off + total, length - total);
                    if (count == 0)
                        break;

                    total += count;
                }

                if (total != length)
                    throw new Exception("Assert failed: decompressed length is incorrect.");

                // Report the bytes delivered to the caller, not the number
                // of compressed bytes loaded from the page.
                return total;
            }
        } catch (InvalidDataException e) {
            throw new IOException("ZIP Data Format Error: " + e.Message, e);
        }
#else
        throw new NotSupportedException("Compression not supported in PCL.");
#endif
    }

    // The page is not compressed
    return pageArea.Read(buffer, off, length);
}
489 
491  long objOffset;
492  long maxSize;
493  long currentSize;
494  lock (fixedList) {
495  if (id.StoreId != Id)
496  throw new InvalidObjectIdException(id);
497 
498  var refId = id.Id;
499  // Assert that the blob reference id given is a valid range
500  if (refId < 0 || refId >= fixedList.NodeCount)
501  throw new InvalidObjectIdException(id);
502 
503  // Position on this record
504  IArea block = fixedList.GetRecord(refId);
505  // Read the information in the fixed record
506  int status = block.ReadInt4();
507  // Assert that the status is not deleted
508  if ((status & DeletedFlag) != 0)
509  throw new InvalidOperationException("Assertion failed: record is deleted!");
510 
511  // Get the reference count
512  int refCount = block.ReadInt4();
513  // Get the total size of the blob
514  maxSize = block.ReadInt8();
515  // Get the current running size
516  currentSize = block.ReadInt8();
517  // Get the blob pointer
518  objOffset = block.ReadInt8();
519  }
520 
521  IArea area = store.GetArea(objOffset);
522  area.Position = 0;
523  area.ReadInt4(); // (reserved)
524  // Read the type
525  int type = area.ReadInt4();
526  // The size of the block
527  long blockSize = area.ReadInt8();
528  // The number of pages in the blob
529  long pageCount = area.ReadInt8();
530 
531  bool compressed = (type & CompressedFlag) != 0;
532  return new LargeObject(this, id.Id, maxSize, currentSize, compressed, true);
533  }
534 
/// <summary>
/// Increments the reference count stored in the fixed list entry of a
/// completed object.
/// </summary>
/// <param name="id">The reference id of the object (its fixed list entry).</param>
/// <exception cref="Exception">
/// If the entry's status is not 1, or an <see cref="IOException"/> occurs
/// (which is then available as the inner exception).
/// </exception>
private void EstablishReference(long id) {
    try {
        lock (fixedList) {
            // Locate the record in the fixed list.
            IArea block = fixedList.GetRecord(id);
            var recordPos = block.Position;
            int status = block.ReadInt4();
            if (status != 1)
                throw new Exception("Assertion failed: record is not static.");

            int refCount = block.ReadInt4();

            // Rewind to the reference count field (right after the 4-byte
            // status) and store the incremented value.
            block.Position = recordPos + 4;
            block.WriteInt4(refCount + 1);
            block.Flush();
        }
    } catch (IOException e) {
        // Keep the original error as inner exception so its type and
        // stack trace are not lost.
        throw new Exception("IO Error: " + e.Message, e);
    }
}
556 
/// <summary>
/// Decrements the reference count of a completed object and, when it
/// reaches zero, frees the object's areas and puts its fixed list entry
/// at the head of the delete chain.
/// </summary>
/// <param name="id">The reference id of the object (its fixed list entry).</param>
/// <returns>
/// <c>true</c> if the object was freed; <c>false</c> if it is still referenced.
/// </returns>
/// <exception cref="Exception">
/// If the entry's status is not 1, the reference count is already 0, or an
/// <see cref="IOException"/> occurs (available as the inner exception).
/// </exception>
private bool ReleaseReference(long id) {
    try {
        lock (fixedList) {
            // Locate the record in the fixed list.
            IArea block = fixedList.GetRecord(id);
            var recordPos = block.Position;
            int status = block.ReadInt4();
            if (status != 1)
                throw new Exception("Assertion failed: Record is not static (status = " + status + ")");

            int refCount = block.ReadInt4();
            if (refCount == 0)
                throw new Exception("Releasing when IBlob reference counter is at 0.");

            // The entry is ( status, reference count, size, current size,
            // pointer ): the pointer is the fifth field, after both sizes.
            block.ReadInt8(); // size
            block.ReadInt8(); // current size
            var objPos = block.ReadInt8();

            // If reference count == 0 then we need to free all the resources
            // associated with this object in the store.
            if ((refCount - 1) == 0) {
                // The object's area starts with
                // ( reserved (int), type (int), max size (long),
                //   current size (long), page count (long) ),
                // followed by one pointer (long) per page.
                IArea area = store.GetArea(objPos);
                area.ReadInt4(); // reserved
                area.ReadInt4(); // type
                area.ReadInt8(); // max size
                area.ReadInt8(); // current size
                var pageCount = area.ReadInt8();

                // Free all of the pages of this object; pages that were
                // never written have a pointer of -1 and are skipped.
                for (long i = 0; i < pageCount; ++i) {
                    long pageOffset = area.ReadInt8();
                    if (pageOffset > 0)
                        store.DeleteArea(pageOffset);
                }

                // Free the object's area itself.
                store.DeleteArea(objPos);

                // Write out the blank record, using the field order that
                // AddToRecordList reads when it recycles an entry: the
                // link to the next deleted entry comes after both sizes.
                block.Position = recordPos;
                block.WriteInt4(DeletedFlag);
                block.WriteInt4(0);
                block.WriteInt8(-1);
                block.WriteInt8(0);
                block.WriteInt8(firstDeleteChainRecord);
                // Check out these changes
                block.Flush();
                firstDeleteChainRecord = id;

                // Update the delete chain head stored by the list
                fixedList.WriteDeleteHead(firstDeleteChainRecord);
                return true;
            }

            // Simply decrement the reference counter for this record.
            block.Position = recordPos + 4;
            // Write the reference count - 1
            block.WriteInt4(refCount - 1);
            // Check out this change
            block.Flush();
            return false;
        }
    } catch (IOException e) {
        // Keep the original error as inner exception so its type and
        // stack trace are not lost.
        throw new Exception("IO Error: " + e.Message, e);
    }
}
622  }
623 }
ILargeObject CreateNewObject(long maxSize, bool compressed)
Creates a new large object returning a reference to it.
Definition: ObjectStore.cs:178
Defines a referenced object that can be accessed on a multi-phase level.
Definition: ILargeObject.cs:35
Defines the contract for stores that handle large objects within a database system.
Definition: IObjectStore.cs:35
int Read(byte[] buffer, int offset, int length)
Reads an array of bytes from the underlying IArea and advances the position by length ...
ILargeObject GetObject(ObjectId id)
Gets an object that was previously created for the given unique identifier.
Definition: ObjectStore.cs:490
long Position
Returns or sets the current position of the pointer within the area.
Definition: IArea.cs:48
int ReadObjectPart(long id, long objOffset, byte[] buffer, int off, int length)
Definition: ObjectStore.cs:404
long Id
Gets the unique identifier of the object within the containing store.
Definition: ObjectId.cs:65
void Write(byte[] buffer, int offset, int length)
int Read(long offset, byte[] buffer, int length)
Reads the content of the object, starting at a given offset, into the buffer given, for the number of bytes specified.
Definition: ObjectStore.cs:242
An interface for accessing the contents of an area of a store.
Definition: IArea.cs:23
A unique identifier of an object within a database system, that is composed by a reference to the sto...
Definition: ObjectId.cs:31
readonly FixedRecordList fixedList
Definition: ObjectStore.cs:33
void WriteObjectPart(long id, long objOffset, byte[] buffer, int off, int length)
Definition: ObjectStore.cs:311
void EstablishReference(long id)
Definition: ObjectStore.cs:535
void Complete()
Marks the object as complete.
Definition: ObjectStore.cs:254
void Write(long offset, byte[] buffer, int length)
Write the given binary content into the object, starting at the given offset for the number of bytes ...
Definition: ObjectStore.cs:246
void CompleteObject(LargeObject obj)
Definition: ObjectStore.cs:271
LargeObject(ObjectStore store, long refId, long size, long currentSize, bool compressed, bool isComplete)
Definition: ObjectStore.cs:220
ObjectStore(int id, IStore store)
Definition: ObjectStore.cs:37
long AddToRecordList(long recordOffset)
Definition: ObjectStore.cs:48
long Id
Returns the unique identifier that represents this area.
Definition: IArea.cs:31
A store is a resource where areas can be allocated and freed to store information (a memory allocator...
Definition: IStore.cs:56
void Establish()
Establishes a reference of the object to the underlying store which contains it.
Definition: ObjectStore.cs:258
bool Release()
Removes a reference of the object from the underlying store which contains it.
Definition: ObjectStore.cs:262