DeveelDB  20151217
complete SQL database system, primarly developed for .NET/Mono frameworks
LengthMarkedBufferedInputStream.cs
Go to the documentation of this file.
1 using System;
2 using System.IO;
3 
4 using Deveel.Data.Util;
5 
6 namespace Deveel.Data.Protocol {
16  public sealed class LengthMarkedBufferedInputStream : Stream {
20  private const int InitialBufferSize = 512;
21 
25  private readonly IInputStream input;
26 
30  private byte[] buf;
31 
35  private int count;
36 
41  private int markedLength;
42 
46  private int markedIndex;
47 
49  this.input = input;
50  buf = new byte[InitialBufferSize];
51  count = 0;
52  markedLength = -1;
53  markedIndex = -1;
54  }
55 
63  private void EnsureCapacity(int newSize) {
64  int old_size = buf.Length;
65  if (newSize > old_size) {
66  int cap = (old_size * 3) / 2 + 1;
67  if (cap < newSize)
68  cap = newSize;
69  byte[] oldBuf = buf;
70  buf = new byte[cap];
71  // // Copy all the contents except the first 4 bytes (the size marker)
72  Array.Copy(oldBuf, 0, buf, 0, count);
73  }
74  }
75 
86  private void HandleEndReached() {
87  // Move anything from the end of the buffer to the start.
88  Array.Copy(buf, markedIndex, buf, 0, count - markedLength);
89  count -= markedLength;
90 
91  // Reset the state
92  markedLength = -1;
93  markedIndex = -1;
94  }
95 
96  // ---------- Overwritten from Stream ----------
97 
98  public override int ReadByte() {
99  lock (this) {
100  if (markedIndex == -1)
101  throw new IOException("No mark has been read yet.");
102 
103  if (markedIndex >= markedLength) {
104  string debugMsg = "Read over end of length marked buffer. ";
105  debugMsg += "(marked_index=" + markedIndex;
106  debugMsg += ",marked_length=" + markedLength + ")";
107  debugMsg += ")";
108  throw new IOException(debugMsg);
109  }
110  int n = buf[markedIndex++] & 0x0FF;
111  if (markedIndex >= markedLength) {
112  HandleEndReached();
113  }
114  return n;
115  }
116  }
117 
118  public override void Write(byte[] buffer, int offset, int count) {
119  throw new NotSupportedException();
120  }
121 
122  public override bool CanRead {
123  get { return true; }
124  }
125 
126  public override bool CanSeek {
127  get { return false; }
128  }
129 
130  public override bool CanWrite {
131  get { return false; }
132  }
133 
134  public override long Length {
135  get { throw new NotImplementedException(); }
136  }
137 
138  public override long Position {
139  get { throw new NotImplementedException(); }
140  set { throw new NotImplementedException(); }
141  }
142 
143  public override void Flush() {
144  }
145 
146  public override long Seek(long offset, SeekOrigin origin) {
147  throw new NotSupportedException();
148  }
149 
150  public override void SetLength(long value) {
151  throw new NotSupportedException();
152  }
153 
154  public override int Read(byte[] b, int off, int len) {
155  lock (this) {
156  if (markedIndex == -1)
157  throw new IOException("No mark has been read yet.");
158 
159  int readUpto = markedIndex + len;
160  if (readUpto > markedLength) {
161  String debug_msg = "Read over end of length marked buffer. ";
162  debug_msg += "(marked_index=" + markedIndex;
163  debug_msg += ",len=" + len;
164  debug_msg += ",marked_length=" + markedLength + ")";
165  throw new IOException(debug_msg);
166  }
167  Array.Copy(buf, markedIndex, b, off, len);
168  markedIndex = readUpto;
169  if (markedIndex >= markedLength) {
170  HandleEndReached();
171  }
172  return len;
173  }
174  }
175 
176  public int Available {
177  get {
178  lock (this) {
179  // This method only returns a non 0 value if there is a complete command
180  // waiting on the stream.
181  if (markedLength >= 0) {
182  return (markedLength - markedIndex);
183  }
184  return 0;
185  }
186  }
187  }
188 
189  // ---------- These methods aid in reading state from the stream ----------
190 
204  public bool PollForCommand(int maxSize) {
205  lock (this) {
206  if (markedLength == -1) {
207  int available = input.Available;
208  if (count > 0 || available > 0) {
209  if ((count + available) > maxSize) {
210  throw new IOException("Marked length is greater than max size ( " +
211  (count + available) + " > " + maxSize + " )");
212  }
213 
214  EnsureCapacity(count + available);
215  int readIn = input.Read(buf, count, available);
216 
217  if (readIn == 0) {
218  //TODO: Check this format...
219  // throw new EndOfStreamException();
220 
221  // zero bytes read means that the stream is finished...
222  return false;
223  }
224  count = count + readIn;
225 
226  // Check: Is a complete command available?
227  if (count >= 4) {
228  int lengthMarker = ByteBuffer.ReadInt4(buf, 0);
229 
230  if (count >= lengthMarker + 4) {
231  // Yes, complete command available.
232  // mark this area up.
233  markedLength = lengthMarker + 4;
234  markedIndex = 4;
235  return true;
236  }
237  }
238  }
239  }
240  return false;
241  }
242  }
243 
247  public void BlockForCommand() {
248  lock (this) {
249  while (true) {
250  // Is there a command available?
251  if (count >= 4) {
252  int lengthMarker = ByteBuffer.ReadInt4(buf, 0);
253  if (count >= lengthMarker + 4) {
254  // Yes, complete command available.
255  // mark this area up.
256  markedLength = lengthMarker + 4;
257  markedIndex = 4;
258  return;
259  }
260  }
261 
262  // If the buffer is full grow it larger.
263  if (count >= buf.Length) {
264  EnsureCapacity(count + InitialBufferSize);
265  }
266  // Read in a block of data, block if nothing there
267  int read_in = input.Read(buf, count, buf.Length - count);
268  if (read_in == 0) {
269  //TODO: Check this format...
270  // throw new EndOfStreamException();
271 
272  // zero bytes read means that the stream is finished...
273  return;
274  }
275  count += read_in;
276  }
277  }
278  }
279  }
280 }
Reads a command block on the underlying stream that is constrained by a length marker preceeding the ...
Represents a stream that supports required functionalities for a LengthMarkedBufferedInputStream ...
Definition: IInputStream.cs:10
int ReadInt4()
Reads an integer from the buffer at the current position.
Definition: ByteBuffer.cs:128
readonly IInputStream input
The chained InputStream that is underneath this object.
void EnsureCapacity(int newSize)
Ensures that the buffer is large enough to store the given value.
void BlockForCommand()
Blocks until a complete command has been read in.
int markedLength
The area of the buffer that is marked as being an available command. If it's -1 then there is no area...
A wrapper for an array of byte.
Definition: ByteBuffer.cs:27
bool PollForCommand(int maxSize)
Checks to see if there is a complete command waiting on the input stream.
int markedIndex
The current index of the marked area that is being read.
byte[] buf
The buffer that is used to read in whatever is on the stream.
override void Write(byte[] buffer, int offset, int count)
void HandleEndReached()
Called when the end of the marked length is reached.