DeveelDB  20151217
complete SQL database system, primarly developed for .NET/Mono frameworks
ObjectStream.cs
Go to the documentation of this file.
1 //
2 // Copyright 2010-2015 Deveel
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 using System;
18 using System.IO;
19 
20 namespace Deveel.Data.Store {
21  public sealed class ObjectStream : Stream {
22  private readonly ILargeObject largeObject;
23  private long position;
24  private Stream outTempStream;
25 
26  private long readBufferPos;
27  private readonly byte[] readBuf;
28 
29  private const int BufferSize = 64*1024;
30 
31  public ObjectStream(ILargeObject largeObject) {
32  if (largeObject == null)
33  throw new ArgumentNullException("largeObject");
34 
35  this.largeObject = largeObject;
36  outTempStream = new MemoryStream(64*1024);
37 
38  readBuf = new byte[BufferSize];
39  readBufferPos = -1;
40  }
41 
42  private void ReadPageContent(byte[] buffer, long pos, int length) {
43  largeObject.Read(pos, buffer, length);
44  }
45 
46  private void FillBuffer(long pos) {
47  long readPos = (pos/BufferSize)*BufferSize;
48  int toRead = (int) System.Math.Min(BufferSize, (largeObject.RawSize - readPos));
49  if (toRead > 0) {
50  ReadPageContent(readBuf, readPos, toRead);
51  readBufferPos = readPos;
52  }
53  }
54 
55  protected override void Dispose(bool disposing) {
56  if (disposing) {
57  if (outTempStream != null)
58  outTempStream.Dispose();
59  }
60 
61  outTempStream = null;
62  base.Dispose(disposing);
63  }
64 
65  public override void Flush() {
66  if (outTempStream == null ||
67  outTempStream.Length == 0)
68  return;
69 
70  try {
71  long offset = 0;
72  var buffer = new byte[BufferSize];
73  var totalLength = outTempStream.Length;
74 
75  outTempStream.Seek(0, SeekOrigin.Begin);
76 
77  while (offset < totalLength) {
78  // Fill the buffer
79  int index = 0;
80  int blockRead = (int) System.Math.Min((long) BufferSize, (totalLength - offset));
81  int toRead = blockRead;
82  while (toRead > 0) {
83  int count = outTempStream.Read(buffer, index, toRead);
84  if (count == 0)
85  break;
86 
87  index += count;
88  toRead -= count;
89  }
90 
91  // Send the part of the streamable object to the database.
92  largeObject.Write(offset, buffer, blockRead);
93  // Increment the offset and upload the next part of the object.
94  offset += blockRead;
95  }
96  } finally {
97  outTempStream.SetLength(0);
98  }
99  }
100 
101  public override long Seek(long offset, SeekOrigin origin) {
102  throw new NotImplementedException();
103  }
104 
105  public override void SetLength(long value) {
106  throw new NotSupportedException("The lenght of the underlying object cannot be changed.");
107  }
108 
109  public override int Read(byte[] buffer, int offset, int count) {
110  if (!largeObject.IsComplete)
111  throw new IOException("The underlying object is not complete.");
112 
113  if (count <= 0) {
114  return 0;
115  }
116 
117  if (readBufferPos == -1) {
118  FillBuffer(position);
119  }
120 
121  int p = (int) (position - readBufferPos);
122  long bufferEnd = System.Math.Min(readBufferPos + BufferSize, largeObject.RawSize);
123  int toRead = (int) System.Math.Min((long) count, bufferEnd - position);
124  if (toRead <= 0) {
125  return 0;
126  }
127  int hasRead = 0;
128  while (toRead > 0) {
129  Array.Copy(readBuf, p, buffer, offset, toRead);
130  hasRead += toRead;
131  p += toRead;
132  offset += toRead;
133  count -= toRead;
134  position += toRead;
135  if (p >= BufferSize) {
136  FillBuffer(readBufferPos + BufferSize);
137  p -= BufferSize;
138  }
139  bufferEnd = System.Math.Min(readBufferPos + BufferSize, largeObject.RawSize);
140  toRead = (int) System.Math.Min((long)count, bufferEnd - position);
141  }
142 
143  return hasRead;
144  }
145 
146  public override void Write(byte[] buffer, int offset, int count) {
147  if (largeObject.IsComplete)
148  throw new IOException("The underlying object is complete.");
149 
150  outTempStream.Write(buffer, offset, count);
151  position += count;
152  }
153 
154  public override bool CanRead {
155  get { return largeObject.IsComplete; }
156  }
157 
158  public override bool CanSeek {
159  get { return true; }
160  }
161 
162  public override bool CanWrite {
163  get { return !largeObject.IsComplete; }
164  }
165 
166  public override long Length {
167  get { return largeObject.RawSize; }
168  }
169 
170  public override long Position {
171  get { return position; }
172  set { position = Seek(value, SeekOrigin.Begin); }
173  }
174  }
175 }
override void SetLength(long value)
readonly ILargeObject largeObject
Definition: ObjectStream.cs:22
override long Seek(long offset, SeekOrigin origin)
Defines a referenced object that can be accessed on a multi-phase level.
Definition: ILargeObject.cs:35
void ReadPageContent(byte[] buffer, long pos, int length)
Definition: ObjectStream.cs:42
override void Write(byte[] buffer, int offset, int count)
override int Read(byte[] buffer, int offset, int count)
override void Dispose(bool disposing)
Definition: ObjectStream.cs:55
ObjectStream(ILargeObject largeObject)
Definition: ObjectStream.cs:31