XByteBufferpublic class XByteBuffer extends Object The XByteBuffer provides a dual functionality.
One, it stores message bytes and automatically extends the byte buffer if needed.
Two, it can encode and decode packages so that they can be defined and identified
as they come in on a socket.
THIS CLASS IS NOT THREAD SAFE
Transfer package:
- START_DATA/b> - 7 bytes - FLT2002
- SIZE - 4 bytes - size of the data package
- DATA - should be as many bytes as the prev SIZE
- END_DATA - 7 bytes - TLF2003
|
Fields Summary |
---|
public static org.apache.juli.logging.Log | log | public static final byte[] | START_DATAThis is a package header, 7 bytes (FLT2002) | public static final byte[] | END_DATAThis is the package footer, 7 bytes (TLF2003) | private static final int | DEF_SIZEDefault size on the initial byte buffer | private static final int | DEF_EXTDefault size to extend the buffer with | protected byte[] | bufVariable to hold the data | protected int | bufSizeCurrent length of data in the buffer | protected boolean | discardFlag for discarding invalid packages
If this flag is set to true, and append(byte[],...) is called,
the data added will be inspected, and if it doesn't start with
START_DATA it will be thrown away. | public static int | invokecount |
Constructors Summary |
---|
public XByteBuffer(int size, boolean discard)Constructs a new XByteBuffer
buf = new byte[size];
this.discard = discard;
| public XByteBuffer(byte[] data, boolean discard)
this(data,data.length+128,discard);
| public XByteBuffer(byte[] data, int size, boolean discard)
int length = Math.max(data.length,size);
buf = new byte[length];
System.arraycopy(data,0,buf,0,data.length);
bufSize = data.length;
this.discard = discard;
|
Methods Summary |
---|
public boolean | append(java.nio.ByteBuffer b, int len)Appends the data to the buffer. If the data is incorrectly formatted, ie, the data should always start with the
header, false will be returned and the data will be discarded.
int newcount = bufSize + len;
if (newcount > buf.length) {
expand(newcount);
}
b.get(buf,bufSize,len);
bufSize = newcount;
if ( discard ) {
if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
bufSize = 0;
log.error("Discarded the package, invalid header");
return false;
}
}
return true;
| public boolean | append(byte i)
int newcount = bufSize + 1;
if (newcount > buf.length) {
expand(newcount);
}
buf[bufSize] = i;
bufSize = newcount;
return true;
| public boolean | append(boolean i)
int newcount = bufSize + 1;
if (newcount > buf.length) {
expand(newcount);
}
XByteBuffer.toBytes(i,buf,bufSize);
bufSize = newcount;
return true;
| public boolean | append(long i)
int newcount = bufSize + 8;
if (newcount > buf.length) {
expand(newcount);
}
XByteBuffer.toBytes(i,buf,bufSize);
bufSize = newcount;
return true;
| public boolean | append(int i)
int newcount = bufSize + 4;
if (newcount > buf.length) {
expand(newcount);
}
XByteBuffer.toBytes(i,buf,bufSize);
bufSize = newcount;
return true;
| public boolean | append(byte[] b, int off, int len)
if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return false;
}
int newcount = bufSize + len;
if (newcount > buf.length) {
expand(newcount);
}
System.arraycopy(b, off, buf, bufSize, len);
bufSize = newcount;
if ( discard ) {
if (bufSize > START_DATA.length && (firstIndexOf(buf, 0, START_DATA) == -1)) {
bufSize = 0;
log.error("Discarded the package, invalid header");
return false;
}
}
return true;
| public void | clear()Resets the buffer
bufSize = 0;
| public int | countPackages()Internal mechanism to make a check if a complete package exists
within the buffer
return countPackages(false);
| public int | countPackages(boolean first)
int cnt = 0;
int pos = START_DATA.length;
int start = 0;
while ( start < bufSize ) {
//first check start header
int index = XByteBuffer.firstIndexOf(buf,start,START_DATA);
//if the header (START_DATA) isn't the first thing or
//the buffer isn't even 14 bytes
if ( index != start || ((bufSize-start)<14) ) break;
//next 4 bytes are compress flag not needed for count packages
//then get the size 4 bytes
int size = toInt(buf, pos);
//now the total buffer has to be long enough to hold
//START_DATA.length+4+size+END_DATA.length
pos = start + START_DATA.length + 4 + size;
if ( (pos + END_DATA.length) > bufSize) break;
//and finally check the footer of the package END_DATA
int newpos = firstIndexOf(buf, pos, END_DATA);
//mismatch, there is no package
if (newpos != pos) break;
//increase the packet count
cnt++;
//reset the values
start = pos + END_DATA.length;
pos = start + START_DATA.length;
//we only want to verify that we have at least one package
if ( first ) break;
}
return cnt;
| public static byte[] | createDataPackage(ChannelData cdata)Creates a complete data package
// return createDataPackage(cdata.getDataPackage());
//avoid one extra byte array creation
int dlength = cdata.getDataPackageLength();
int length = getDataPackageLength(dlength);
byte[] data = new byte[length];
int offset = 0;
System.arraycopy(START_DATA, 0, data, offset, START_DATA.length);
offset += START_DATA.length;
toBytes(dlength,data, START_DATA.length);
offset += 4;
cdata.getDataPackage(data,offset);
offset += dlength;
System.arraycopy(END_DATA, 0, data, offset, END_DATA.length);
offset += END_DATA.length;
return data;
| public static byte[] | createDataPackage(byte[] data, int doff, int dlength, byte[] buffer, int bufoff)
if ( (buffer.length-bufoff) > getDataPackageLength(dlength) ) {
throw new ArrayIndexOutOfBoundsException("Unable to create data package, buffer is too small.");
}
System.arraycopy(START_DATA, 0, buffer, bufoff, START_DATA.length);
toBytes(data.length,buffer, bufoff+START_DATA.length);
System.arraycopy(data, doff, buffer, bufoff+START_DATA.length + 4, dlength);
System.arraycopy(END_DATA, 0, buffer, bufoff+START_DATA.length + 4 + data.length, END_DATA.length);
return buffer;
| public static byte[] | createDataPackage(byte[] data)
int length = getDataPackageLength(data.length);
byte[] result = new byte[length];
return createDataPackage(data,0,data.length,result,0);
| public static java.io.Serializable | deserialize(byte[] data)
return deserialize(data,0,data.length);
| public static java.io.Serializable | deserialize(byte[] data, int offset, int length)
return deserialize(data,offset,length,null);
| public static java.io.Serializable | deserialize(byte[] data, int offset, int length, java.lang.ClassLoader[] cls)
synchronized (XByteBuffer.class) { invokecount++;}
Object message = null;
if ( cls == null ) cls = new ClassLoader[0];
if (data != null) {
InputStream instream = new ByteArrayInputStream(data,offset,length);
ObjectInputStream stream = null;
stream = (cls.length>0)? new ReplicationStream(instream,cls):new ObjectInputStream(instream);
message = stream.readObject();
instream.close();
stream.close();
}
if ( message == null ) {
return null;
} else if (message instanceof Serializable)
return (Serializable) message;
else {
throw new ClassCastException("Message has the wrong class. It should implement Serializable, instead it is:"+message.getClass().getName());
}
| public boolean | doesPackageExist()Method to check if a package exists in this byte buffer.
return (countPackages(true)>0);
| public void | expand(int newcount)
//don't change the allocation strategy
byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
System.arraycopy(buf, 0, newbuf, 0, bufSize);
buf = newbuf;
| public org.apache.catalina.tribes.io.XByteBuffer | extractDataPackage(boolean clearFromBuffer)Extracts the message bytes from a package.
If no package exists, a IllegalStateException will be thrown.
int psize = countPackages(true);
if (psize == 0) {
throw new java.lang.IllegalStateException("No package exists in XByteBuffer");
}
int size = toInt(buf, START_DATA.length);
XByteBuffer xbuf = BufferPool.getBufferPool().getBuffer(size,false);
xbuf.setLength(size);
System.arraycopy(buf, START_DATA.length + 4, xbuf.getBytesDirect(), 0, size);
if (clearFromBuffer) {
int totalsize = START_DATA.length + 4 + size + END_DATA.length;
bufSize = bufSize - totalsize;
System.arraycopy(buf, totalsize, buf, 0, bufSize);
}
return xbuf;
| public ChannelData | extractPackage(boolean clearFromBuffer)
XByteBuffer xbuf = extractDataPackage(clearFromBuffer);
ChannelData cdata = ChannelData.getDataFromPackage(xbuf);
return cdata;
| public static int | firstIndexOf(byte[] src, int srcOff, byte[] find)Similar to a String.IndexOf, but uses pure bytes
int result = -1;
if (find.length > src.length) return result;
if (find.length == 0 || src.length == 0) return result;
if (srcOff >= src.length ) throw new java.lang.ArrayIndexOutOfBoundsException();
boolean found = false;
int srclen = src.length;
int findlen = find.length;
byte first = find[0];
int pos = srcOff;
while (!found) {
//find the first byte
while (pos < srclen){
if (first == src[pos])
break;
pos++;
}
if (pos >= srclen)
return -1;
//we found the first character
//match the rest of the bytes - they have to match
if ( (srclen - pos) < findlen)
return -1;
//assume it does exist
found = true;
for (int i = 1; ( (i < findlen) && found); i++)
found = found && (find[i] == src[pos + i]);
if (found)
result = pos;
else if ( (srclen - pos) < findlen)
return -1; //no more matches possible
else
pos++;
}
return result;
| public byte[] | getBytes()Returns the bytes in the buffer, in its exact length
byte[] b = new byte[bufSize];
System.arraycopy(buf,0,b,0,bufSize);
return b;
| public byte[] | getBytesDirect()
return this.buf;
| public int | getCapacity()
return buf.length;
| public static int | getDataPackageLength(int datalength)
int length =
START_DATA.length + //header length
4 + //data length indicator
datalength + //actual data length
END_DATA.length; //footer length
return length;
| public boolean | getDiscard()
return discard;
| public int | getLength()
return bufSize;
| public void | reset()
bufSize = 0;
| public static byte[] | serialize(java.io.Serializable msg)Serializes a message into cluster data
ByteArrayOutputStream outs = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(outs);
out.writeObject(msg);
out.flush();
byte[] data = outs.toByteArray();
return data;
| public void | setDiscard(boolean discard)
this.discard = discard;
| public void | setLength(int size)
if ( size > buf.length ) throw new ArrayIndexOutOfBoundsException("Size is larger than existing buffer.");
bufSize = size;
| public static boolean | toBoolean(byte[] b, int offset)
return b[offset] != 0;
| public static byte[] | toBytes(boolean bool)Converts an integer to four bytes
byte[] b = new byte[1] ;
return toBytes(bool,b,0);
| public static byte[] | toBytes(boolean bool, byte[] data, int offset)
data[offset] = (byte)(bool?1:0);
return data;
| public static byte[] | toBytes(int n)Converts an integer to four bytes
return toBytes(n,new byte[4],0);
| public static byte[] | toBytes(int n, byte[] b, int offset)
b[offset+3] = (byte) (n);
n >>>= 8;
b[offset+2] = (byte) (n);
n >>>= 8;
b[offset+1] = (byte) (n);
n >>>= 8;
b[offset+0] = (byte) (n);
return b;
| public static byte[] | toBytes(long n)Converts an long to eight bytes
return toBytes(n,new byte[8],0);
| public static byte[] | toBytes(long n, byte[] b, int offset)
b[offset+7] = (byte) (n);
n >>>= 8;
b[offset+6] = (byte) (n);
n >>>= 8;
b[offset+5] = (byte) (n);
n >>>= 8;
b[offset+4] = (byte) (n);
n >>>= 8;
b[offset+3] = (byte) (n);
n >>>= 8;
b[offset+2] = (byte) (n);
n >>>= 8;
b[offset+1] = (byte) (n);
n >>>= 8;
b[offset+0] = (byte) (n);
return b;
| public static int | toInt(byte[] b, int off)Convert four bytes to an int
return ( ( (int) b[off+3]) & 0xFF) +
( ( ( (int) b[off+2]) & 0xFF) << 8) +
( ( ( (int) b[off+1]) & 0xFF) << 16) +
( ( ( (int) b[off+0]) & 0xFF) << 24);
| public static long | toLong(byte[] b, int off)Convert eight bytes to a long
return ( ( (long) b[off+7]) & 0xFF) +
( ( ( (long) b[off+6]) & 0xFF) << 8) +
( ( ( (long) b[off+5]) & 0xFF) << 16) +
( ( ( (long) b[off+4]) & 0xFF) << 24) +
( ( ( (long) b[off+3]) & 0xFF) << 32) +
( ( ( (long) b[off+2]) & 0xFF) << 40) +
( ( ( (long) b[off+1]) & 0xFF) << 48) +
( ( ( (long) b[off+0]) & 0xFF) << 56);
| public void | trim(int length)
if ( (bufSize - length) < 0 )
throw new ArrayIndexOutOfBoundsException("Can't trim more bytes than are available. length:"+bufSize+" trim:"+length);
bufSize -= length;
|
|