BioSender public class BioSender extends org.apache.catalina.tribes.transport.AbstractSender implements org.apache.catalina.tribes.transport.DataSender Send cluster messages with only one socket. Ack and keep-alive handling is
supported |
Fields Summary |
---|
private static org.apache.juli.logging.Log | log | protected static org.apache.catalina.tribes.util.StringManager | smThe string manager for this package. | private static final String | infoThe descriptive information about this implementation. | private Socket | socketcurrent sender socket | private OutputStream | soOut | private InputStream | soIn | protected org.apache.catalina.tribes.io.XByteBuffer | ackbuf |
Constructors Summary |
---|
public BioSender()
// ------------------------------------------------------------- Constructor
|
Methods Summary |
---|
protected void | closeSocket()close socket
// Nothing to tear down when the sender is not marked as connected.
if (!isConnected()) {
    return;
}
if (socket != null) {
    try {
        socket.close();
    } catch (IOException ignored) {
        // Best-effort close: a failure here is deliberately not propagated.
    } finally {
        // Always drop the socket and both of its streams together.
        socket = null;
        soOut = null;
        soIn = null;
    }
}
// Reset the per-connection bookkeeping.
setRequestCount(0);
setConnected(false);
if (log.isDebugEnabled()) {
    log.debug(sm.getString("IDataSender.closeSocket", getAddress().getHostAddress(), new Integer(getPort()), new Long(0)));
}
| public void | connect()Connect other cluster member receiver
// Connecting is delegated entirely to openSocket().
openSocket();
| public void | disconnect()disconnect and close socket
// Capture the state first: closeSocket() resets the connected flag.
final boolean wasConnected = isConnected();
closeSocket();
// Only report a disconnect when there actually was a connection to drop.
if (wasConnected && log.isDebugEnabled()) {
    log.debug(sm.getString("IDataSender.disconnect", getAddress().getHostAddress(), new Integer(getPort()), new Long(0)));
}
| public java.lang.String | getInfo()Return descriptive information about this implementation and the
corresponding version number, in the format
<description>/<version> .
// Hand back the descriptive information string held in the info field.
return info;
| protected void | openSocket()Open the real socket and set the timeout when waitForAck is enabled.
If the socket is already open, return directly.
// Already connected: nothing to do.
if (isConnected()) {
    return;
}
try {
    socket = new Socket();
    InetSocketAddress sockaddr = new InetSocketAddress(getAddress(), getPort());
    // The configured timeout is used both as connect timeout and as read timeout.
    socket.connect(sockaddr, (int) getTimeout());
    socket.setSendBufferSize(getTxBufSize());
    socket.setReceiveBufferSize(getRxBufSize());
    socket.setSoTimeout((int) getTimeout());
    socket.setTcpNoDelay(getTcpNoDelay());
    socket.setKeepAlive(getSoKeepAlive());
    socket.setReuseAddress(getSoReuseAddress());
    socket.setOOBInline(getOoBInline());
    socket.setSoLinger(getSoLingerOn(), getSoLingerTime());
    socket.setTrafficClass(getSoTrafficClass());
    soOut = socket.getOutputStream();
    soIn = socket.getInputStream();
    // Mark the sender as connected only once both streams are available, so the
    // flag is never true while soOut/soIn are still null.
    setConnected(true);
    setRequestCount(0);
    setConnectTime(System.currentTimeMillis());
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("IDataSender.openSocket", getAddress().getHostAddress(), new Integer(getPort()), new Long(0)));
    }
} catch (IOException ex1) {
    // Release a partially opened socket. closeSocket() only acts when the
    // connected flag is set, so without this a socket that failed after
    // connect() would never be closed.
    if (socket != null) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // Best-effort cleanup; the original failure is the one reported below.
        }
    }
    socket = null;
    soOut = null;
    soIn = null;
    SenderState.getSenderState(getDestination()).setSuspect();
    if (log.isDebugEnabled()) {
        log.debug(sm.getString("IDataSender.openSocket.failure", getAddress().getHostAddress(), new Integer(getPort()), new Long(0)), ex1);
    }
    throw ex1;
}
| protected void | pushMessage(byte[] data, boolean reconnect, boolean waitForAck)Push messages with only one socket at a time.
Waits for the ack if needed and retries automatically when writing the message fails.
After a send error, the socket is closed and reopened.
After a successful send, the stats are updated.
WARNING: Subclasses must be very careful that only one thread calls this pushMessage at once!!!
// Run the keep-alive check before touching the connection.
keepalive();
// A forced reconnect drops the current socket so a new one is opened below.
if (reconnect) {
    closeSocket();
}
if (!isConnected()) {
    openSocket();
}
// Write the whole payload and flush it out before optionally waiting for the ack.
soOut.write(data);
soOut.flush();
if (waitForAck) {
    waitForAck();
}
// Reaching this point means the send went through: flag the destination as ready.
SenderState.getSenderState(getDestination()).setReady();
| public void | sendMessage(byte[] data, boolean waitForAck)Send message
// Holds the most recent failure; null once a send attempt has succeeded.
IOException exception = null;
setAttempt(0);
try {
    // first try with existing connection
    pushMessage(data, false, waitForAck);
} catch (IOException x) {
    SenderState.getSenderState(getDestination()).setSuspect();
    exception = x;
    if (log.isTraceEnabled()) {
        log.trace(sm.getString("IDataSender.send.again", getAddress().getHostAddress(), new Integer(getPort())), x);
    }
    while (getAttempt() < getMaxRetryAttempts()) {
        try {
            setAttempt(getAttempt() + 1);
            // retry with a fresh connection
            pushMessage(data, true, waitForAck);
            exception = null;
            // Stop retrying once the message has been delivered; without this
            // the loop would push the same message again on every remaining
            // attempt.
            break;
        } catch (IOException xx) {
            exception = xx;
            closeSocket();
        }
    }
} finally {
    // Bookkeeping runs whether the send succeeded or failed.
    setRequestCount(getRequestCount() + 1);
    keepalive();
    // Report the last failure only if no attempt succeeded.
    if (exception != null) {
        throw exception;
    }
}
| public java.lang.String | toString()Name of this SocketSender
StringBuffer buf = new StringBuffer("DataSender[(");
buf.append(super.toString()).append(")");
buf.append(getAddress()).append(":").append(getPort()).append("]");
return buf.toString();
| protected void | waitForAck()Wait for Acknowledgement from other server
FIXME Please, do not wait for only three characters; better to verify that the received ack message is correct.
// Reads the acknowledgement from soIn one byte at a time, feeding each byte into
// ackbuf until ackbuf reports a complete package, the stream ends, or
// Constants.ACK_COMMAND.length bytes have been consumed.
try {
boolean ackReceived = false;
boolean failAckReceived = false;
// Start from an empty buffer so earlier data cannot be mistaken for this ack.
ackbuf.clear();
int bytesRead = 0;
int i = soIn.read();
// i == -1 signals end of stream; the byte counter bounds how much is read.
while ((i != -1) && (bytesRead < Constants.ACK_COMMAND.length)) {
bytesRead++;
byte d = (byte)i;
ackbuf.append(d);
if (ackbuf.doesPackageExist() ) {
// A complete package is available: compare it with the two known ack payloads.
byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
ackReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
failAckReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
// A failed ack still counts as "an ack was received"; it is handled separately below.
ackReceived = ackReceived || failAckReceived;
break;
}
i = soIn.read();
}
if (!ackReceived) {
// Distinguish a stream that ended (eof) from data that was not a valid ack (wrong).
if (i == -1) throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(), new Integer(socket.getLocalPort())));
else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
} else if ( failAckReceived && getThrowOnFailedAck()) {
// NOTE(review): the constant's qualified name is part of the string literal rather
// than being concatenated - confirm whether that is intended.
throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
}
} catch (IOException x) {
String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(getTimeout()));
// Warn only on the transition from ready to suspect; if the destination was not
// ready, log at debug level instead.
if ( SenderState.getSenderState(getDestination()).isReady() ) {
SenderState.getSenderState(getDestination()).setSuspect();
if ( log.isWarnEnabled() ) log.warn(errmsg, x);
} else {
if ( log.isDebugEnabled() )log.debug(errmsg, x);
}
// The failure is always propagated to the caller after logging.
throw x;
} finally {
// Leave the shared buffer empty regardless of the outcome.
ackbuf.clear();
}
|
|