FileDocCategorySizeDatePackage
BioSender.javaAPI DocApache Tomcat 6.0.1411184Fri Jul 20 04:20:30 BST 2007org.apache.catalina.tribes.transport.bio

BioSender

public class BioSender extends org.apache.catalina.tribes.transport.AbstractSender implements org.apache.catalina.tribes.transport.DataSender
Sends cluster messages using only one socket. Ack and keep-alive handling are supported.
author
Peter Rossbach
author
Filip Hanik
version
$Revision: 532608 $ $Date: 2007-04-26 06:58:20 +0200 (jeu., 26 avr. 2007) $
since
5.5.16

Fields Summary
private static org.apache.juli.logging.Log
log
protected static org.apache.catalina.tribes.util.StringManager
sm
The string manager for this package.
private static final String
info
The descriptive information about this implementation.
private Socket
socket
current sender socket
private OutputStream
soOut
private InputStream
soIn
protected org.apache.catalina.tribes.io.XByteBuffer
ackbuf
Constructors Summary
public BioSender()



    // ------------------------------------------------------------- Constructor
    
       
    
Methods Summary
protected voidcloseSocket()
close socket

see
DataSender#disconnect()
see
DataSender#closeSocket()

        if(isConnected()) {
             if (socket != null) {
                try {
                    socket.close();
                } catch (IOException x) {
                } finally {
                    socket = null;
                    soOut = null;
                    soIn = null;
                }
            }
            setRequestCount(0);
            setConnected(false);
            if (log.isDebugEnabled())
                log.debug(sm.getString("IDataSender.closeSocket",getAddress().getHostAddress(), new Integer(getPort()),new Long(0)));
       }
    
public voidconnect()
Connect to the receiver of the other cluster member.

see
org.apache.catalina.tribes.transport.IDataSender#connect()

        // Connecting is nothing more than opening the sender socket.
        this.openSocket();
   
public voiddisconnect()
disconnect and close socket

see
IDataSender#disconnect()

        // Capture the connection state first: closeSocket() clears the flag, and
        // the disconnect message is only logged when a connection actually existed.
        final boolean wasConnected = isConnected();
        closeSocket();
        if (wasConnected && log.isDebugEnabled()) {
            log.debug(sm.getString("IDataSender.disconnect", getAddress().getHostAddress(), new Integer(getPort()), new Long(0)));
        }
        
    
public java.lang.StringgetInfo()
Return descriptive information about this implementation and the corresponding version number, in the format <description>/<version>.

        // Hand back the static descriptive string of this implementation.
        return info;
    
protected voidopenSocket()
Open the real socket and set its timeout (used when waitForAck is enabled). If the socket is already open, return directly.

       // Opens and configures the sender socket, then publishes its streams.
       // Returns immediately when the sender is already connected.
       // On any IOException the partially opened socket is closed, the
       // destination is marked suspect, and the exception is rethrown.
       if (isConnected()) return;
       try {
           socket = new Socket();
           InetSocketAddress sockaddr = new InetSocketAddress(getAddress(), getPort());
           socket.connect(sockaddr, (int) getTimeout());
           // NOTE(review): buffer sizes and reuse-address are applied after
           // connect(), exactly as before; confirm this ordering is intended.
           socket.setSendBufferSize(getTxBufSize());
           socket.setReceiveBufferSize(getRxBufSize());
           socket.setSoTimeout((int) getTimeout());
           socket.setTcpNoDelay(getTcpNoDelay());
           socket.setKeepAlive(getSoKeepAlive());
           socket.setReuseAddress(getSoReuseAddress());
           socket.setOOBInline(getOoBInline());
           socket.setSoLinger(getSoLingerOn(), getSoLingerTime());
           socket.setTrafficClass(getSoTrafficClass());
           soOut = socket.getOutputStream();
           soIn  = socket.getInputStream();
           // Flag the sender as connected only once both streams are available,
           // so a failure above can never leave "connected" set with null streams.
           setConnected(true);
           setRequestCount(0);
           setConnectTime(System.currentTimeMillis());
           if (log.isDebugEnabled())
               log.debug(sm.getString("IDataSender.openSocket", getAddress().getHostAddress(), Integer.valueOf(getPort()), Long.valueOf(0)));
       } catch (IOException ex1) {
           // Release the half-open socket; otherwise its descriptor would leak
           // because the next attempt simply overwrites the field.
           if (socket != null) {
               try {
                   socket.close();
               } catch (IOException ignored) {
                   // The original connect/configure failure is the one worth reporting.
               }
           }
           socket = null;
           soOut = null;
           soIn = null;
           SenderState.getSenderState(getDestination()).setSuspect();
           if (log.isDebugEnabled())
               log.debug(sm.getString("IDataSender.openSocket.failure",getAddress().getHostAddress(), Integer.valueOf(getPort()),Long.valueOf(0)), ex1);
           throw ex1;
       }
        
     
protected voidpushMessage(byte[] data, boolean reconnect, boolean waitForAck)
Push messages with only one socket at a time. Waits for an ack if needed and supports automatic retry when writing the message fails: after a send error, the socket is closed and reopened. After successful sending, the stats are updated. WARNING: Subclasses must be very careful that only one thread calls this pushMessage at once!!!

see
#closeSocket()
see
#openSocket()
see
#writeData(ChannelMessage)
param
data data to send
since
5.5.10

        keepalive();
        if ( reconnect ) closeSocket();
        if (!isConnected()) openSocket();
        soOut.write(data);
        soOut.flush();
        if (waitForAck) waitForAck();
        SenderState.getSenderState(getDestination()).setReady();

    
public voidsendMessage(byte[] data, boolean waitForAck)
Send message

see
org.apache.catalina.tribes.transport.IDataSender#sendMessage(, ChannelMessage)

        // Sends the data, first over the existing connection and, if that fails
        // with an IOException, again over fresh connections for up to
        // getMaxRetryAttempts() retries. The last IOException is rethrown when
        // every attempt failed. The request counter and keep-alive handling are
        // always updated, whatever the outcome.
        IOException exception = null;
        setAttempt(0);
        try {
            // first try with existing connection
            pushMessage(data, false, waitForAck);
        } catch (IOException x) {
            SenderState.getSenderState(getDestination()).setSuspect();
            exception = x;
            if (log.isTraceEnabled()) log.trace(sm.getString("IDataSender.send.again", getAddress().getHostAddress(),Integer.valueOf(getPort())),x);
            // Stop retrying as soon as one attempt succeeds; without the
            // "exception != null" guard the message would be pushed again on
            // every remaining attempt and delivered multiple times.
            while ( exception != null && getAttempt()<getMaxRetryAttempts() ) {
                try {
                    setAttempt(getAttempt()+1);
                    // retry with fresh connection
                    pushMessage(data, true, waitForAck);
                    exception = null;
                } catch (IOException xx) {
                    exception = xx;
                    closeSocket();
                }
            }
        } finally {
            setRequestCount(getRequestCount()+1);
            keepalive();
            // Kept inside finally so callers observe the same exception as before.
            if ( exception != null ) throw exception;
        }
    
public java.lang.StringtoString()
Name of this SocketSender

        // Renders as DataSender[(<super.toString()>)<address>:<port>]
        final String parentText = super.toString();
        return "DataSender[(" + parentText + ")" + getAddress() + ":" + getPort() + "]";
    
protected voidwaitForAck()
Wait for an acknowledgement from the other server. FIXME: Please do not wait only for three characters; better verify that the received ack message is correct.

param
timeout
throws
java.io.IOException
throws
java.net.SocketTimeoutException

        // Reads the acknowledgement from soIn one byte at a time into ackbuf and
        // checks it against the known ACK / FAIL_ACK payloads.
        // Throws IOException on end of stream or when no valid ack was read, and
        // RemoteProcessException when a fail-ack arrives and getThrowOnFailedAck()
        // is true. ackbuf is cleared before reading and again on every exit path.
        try {
            boolean ackReceived = false;
            boolean failAckReceived = false;
            ackbuf.clear();
            int bytesRead = 0;
            int i = soIn.read();
            // Stop at end of stream (-1) or after ACK_COMMAND.length bytes were consumed.
            while ((i != -1) && (bytesRead < Constants.ACK_COMMAND.length)) {
                bytesRead++;
                byte d = (byte)i;
                ackbuf.append(d);
                // As soon as the buffer holds a complete package, classify it and stop reading.
                if (ackbuf.doesPackageExist() ) {
                    byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
                    ackReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.ACK_DATA);
                    failAckReceived = Arrays.equals(ackcmd,org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
                    // A fail-ack still counts as "an ack was received"; it is handled separately below.
                    ackReceived = ackReceived || failAckReceived;
                    break;
                }
                i = soIn.read();
            }
            if (!ackReceived) {
                // Distinguish a closed stream (eof) from unexpected ack content (wrong).
                if (i == -1) throw new IOException(sm.getString("IDataSender.ack.eof",getAddress(), new Integer(socket.getLocalPort())));
                else throw new IOException(sm.getString("IDataSender.ack.wrong",getAddress(), new Integer(socket.getLocalPort())));
            } else if ( failAckReceived && getThrowOnFailedAck()) {
                throw new RemoteProcessException("Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
            }
        } catch (IOException x) {
            String errmsg = sm.getString("IDataSender.ack.missing", getAddress(),new Integer(socket.getLocalPort()), new Long(getTimeout()));
            // Warn and mark the destination suspect only if it was ready until now;
            // otherwise the failure is logged at debug level only.
            if ( SenderState.getSenderState(getDestination()).isReady() ) {
                SenderState.getSenderState(getDestination()).setSuspect();
                if ( log.isWarnEnabled() ) log.warn(errmsg, x);
            } else {
                if ( log.isDebugEnabled() )log.debug(errmsg, x);
            }
            throw x;
        } finally {
            // Always leave the ack buffer empty for the next call.
            ackbuf.clear();
        }