FileDocCategorySizeDatePackage
ChannelSocket.javaAPI DocApache Tomcat 6.0.1428576Fri Jul 20 04:20:32 BST 2007org.apache.jk.common

ChannelSocket

public class ChannelSocket extends org.apache.jk.core.JkHandler implements NotificationBroadcaster, org.apache.jk.core.JkChannel
Accept ( and send ) TCP messages.
author
Costin Manolache
author
Bill Barker jmx:mbean name="jk:service=ChannelNioSocket" description="Accept socket connections" jmx:notification name="org.apache.coyote.INVOKE jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET jmx:notification-handler name="org.apache.jk.JK_FLUSH Jk can use multiple protocols/transports. Various container adapters should load this object ( as a bean ), set configurations and use it. Note that the connector will handle all incoming protocols - it's not specific to ajp1x. The protocol is abstracted by MsgContext/Message/Channel. A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol, TCP, Ajp14 API etc. As we add other protocols/transports/APIs this will change, the current goal is to get the same level of functionality as in the original jk connector. XXX Make the 'message type' pluggable

Fields Summary
private static org.apache.juli.logging.Log
log
private int
startPort
private int
maxPort
private int
port
private InetAddress
inet
private int
serverTimeout
private boolean
tcpNoDelay
private int
linger
private int
socketTimeout
private int
bufferSize
private int
packetSize
private long
requestCount
org.apache.tomcat.util.threads.ThreadPool
tp
ServerSocket
sSocket
final int
socketNote
final int
isNote
final int
osNote
final int
notifNote
boolean
paused
ObjectName
tpOName
ObjectName
rgOName
org.apache.coyote.RequestGroupInfo
global
int
JMXRequestNote
protected boolean
running
private NotificationBroadcasterSupport
nSupport
MBeanNotificationInfo[]
notifInfo
Constructors Summary
public ChannelSocket()
jmx:managed-constructor description="default constructor"


    /* ==================== Tcp socket options ==================== */

            
      
        // This should be integrated with the  domain setup
    
Methods Summary
public voidaccept(org.apache.jk.core.MsgContext ep)

        if( sSocket==null ) return;
        synchronized(this) {
            while(paused) {
                try{ 
                    wait();
                } catch(InterruptedException ie) {
                    //Ignore, since can't happen
                }
            }
        }
        Socket s=sSocket.accept();
        ep.setNote( socketNote, s );
        if(log.isDebugEnabled() )
            log.debug("Accepted socket " + s );

        try {
            setSocketOptions(s);
        } catch(SocketException sex) {
            log.debug("Error initializing Socket Options", sex);
        }
        
        requestCount++;

        InputStream is=new BufferedInputStream(s.getInputStream());
        OutputStream os;
        if( bufferSize > 0 )
            os = new BufferedOutputStream( s.getOutputStream(), bufferSize);
        else
            os = s.getOutputStream();
        ep.setNote( isNote, is );
        ep.setNote( osNote, os );
        ep.setControl( tp );
    
voidacceptConnections()
Accept incoming connections, dispatch to the thread pool

    
                 
      
        if( log.isDebugEnabled() )
            log.debug("Accepting ajp connections on " + port);
        while( running ) {
	    try{
                MsgContext ep=createMsgContext(packetSize);
                ep.setSource(this);
                ep.setWorkerEnv( wEnv );
                this.accept(ep);

                if( !running ) break;
                
                // Since this is a long-running connection, we don't care
                // about the small GC
                SocketConnection ajpConn=
                    new SocketConnection(this, ep);
                tp.runIt( ajpConn );
	    }catch(Exception ex) {
                if (running)
                    log.warn("Exception executing accept" ,ex);
	    }
        }
    
public voidaddNotificationListener(javax.management.NotificationListener listener, javax.management.NotificationFilter filter, java.lang.Object handback)


       
                                         
                                         
             
    
        if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
        nSupport.addNotificationListener(listener, filter, handback);
    
public voidclose(org.apache.jk.core.MsgContext ep)

        Socket s=(Socket)ep.getNote( socketNote );
        s.close();
    
public voiddestroy()

        running = false;
        try {
            /* If we disabled the channel return */
            if (port == 0)
                return;
            tp.shutdown();

	    if(!paused) {
		unLockSocket();
	    }

            sSocket.close(); // XXX?
            
            if( tpOName != null )  {
                Registry.getRegistry(null, null).unregisterComponent(tpOName);
            }
            if( rgOName != null ) {
                Registry.getRegistry(null, null).unregisterComponent(rgOName);
            }
        } catch(Exception e) {
            log.info("Error shutting down the channel " + port + " " +
                    e.toString());
            if( log.isDebugEnabled() ) log.debug("Trace", e);
        }
    
public intflush(org.apache.jk.core.Msg msg, org.apache.jk.core.MsgContext ep)

        if( bufferSize > 0 ) {
            OutputStream os=(OutputStream)ep.getNote( osNote );
            os.flush();
        }
        return 0;
    
public java.lang.StringgetAddress()

        if( inet!=null)
            return inet.toString();
        return "/0.0.0.0";
    
public intgetBufferSize()

        return bufferSize;
    
public java.lang.StringgetChannelName()

        String encodedAddr = "";
        if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
            encodedAddr = getAddress();
            if (encodedAddr.startsWith("/"))
                encodedAddr = encodedAddr.substring(1);
	    encodedAddr = URLEncoder.encode(encodedAddr) + "-";
        }
        return ("jk-" + encodedAddr + port);
    
public booleangetDaemon()

        return tp.getDaemon();
    
public intgetInstanceId()
At startup we'll look for the first free port in the range. The difference between this port and the beggining of the range is the 'id'. This is usefull for lb cases ( less config ).

        return port-startPort;
    
public intgetMaxPort()

        return maxPort;
    
public intgetMaxSpareThreads()

        return tp.getMaxSpareThreads();   
    
public intgetMaxThreads()

        return tp.getMaxThreads();   
    
public intgetMinSpareThreads()

        return tp.getMinSpareThreads();   
    
public javax.management.MBeanNotificationInfo[]getNotificationInfo()

        return notifInfo;
    
public intgetPacketSize()

        return packetSize;
    
public intgetPort()

        return port;
    
public longgetRequestCount()

        return requestCount;
    
public intgetServerTimeout()

        return serverTimeout;
    
public intgetSoLinger()

        return linger;
    
public intgetSoTimeout()

	return socketTimeout;
    
public booleangetTcpNoDelay()

        return tcpNoDelay;
    
public org.apache.tomcat.util.threads.ThreadPoolgetThreadPool()

        return tp;
    
public voidinit()
jmx:managed-operation

        // Find a port.
        if (startPort == 0) {
            port = 0;
            if(log.isInfoEnabled())
                log.info("JK: ajp13 disabling channelSocket");
            running = true;
            return;
        }
        if (maxPort < startPort)
            maxPort = startPort;
        for( int i=startPort; i<=maxPort; i++ ) {
            try {
                if( inet == null ) {
                    sSocket = new ServerSocket( i, 0 );
                } else {
                    sSocket=new ServerSocket( i, 0, inet );
                }
                port=i;
                break;
            } catch( IOException ex ) {
                if(log.isInfoEnabled())
                    log.info("Port busy " + i + " " + ex.toString());
                continue;
            }
        }

        if( sSocket==null ) {
            log.error("Can't find free port " + startPort + " " + maxPort );
            return;
        }
        if(log.isInfoEnabled())
            log.info("JK: ajp13 listening on " + getAddress() + ":" + port );

        // If this is not the base port and we are the 'main' channleSocket and
        // SHM didn't already set the localId - we'll set the instance id
        if( "channelSocket".equals( name ) &&
            port != startPort &&
            (wEnv.getLocalId()==0) ) {
            wEnv.setLocalId(  port - startPort );
        }
        if( serverTimeout > 0 )
            sSocket.setSoTimeout( serverTimeout );

        // XXX Reverse it -> this is a notification generator !!
        if( next==null && wEnv!=null ) {
            if( nextName!=null )
                setNext( wEnv.getHandler( nextName ) );
            if( next==null )
                next=wEnv.getHandler( "dispatch" );
            if( next==null )
                next=wEnv.getHandler( "request" );
        }
        JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
        running = true;

        // Run a thread that will accept connections.
        // XXX Try to find a thread first - not sure how...
        if( this.domain != null ) {
            try {
                tpOName=new ObjectName(domain + ":type=ThreadPool,name=" + 
                                       getChannelName());

                Registry.getRegistry(null, null)
                    .registerComponent(tp, tpOName, null);

                rgOName = new ObjectName
                    (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
                Registry.getRegistry(null, null)
                    .registerComponent(global, rgOName, null);
            } catch (Exception e) {
                log.error("Can't register threadpool" );
            }
        }

        tp.start();
        SocketAcceptor acceptAjp=new SocketAcceptor(  this );
        tp.runIt( acceptAjp);

    
public intinvoke(org.apache.jk.core.Msg msg, org.apache.jk.core.MsgContext ep)

        int type=ep.getType();

        switch( type ) {
        case JkHandler.HANDLE_RECEIVE_PACKET:
            if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
            return receive( msg, ep );
        case JkHandler.HANDLE_SEND_PACKET:
            return send( msg, ep );
        case JkHandler.HANDLE_FLUSH:
            return flush( msg, ep );
        }

        if( log.isDebugEnabled() )
            log.debug("Call next " + type + " " + next);

        // Send notification
        if( nSupport!=null ) {
            Notification notif=(Notification)ep.getNote(notifNote);
            if( notif==null ) {
                notif=new Notification("channelSocket.message", ep, requestCount );
                ep.setNote( notifNote, notif);
            }
            nSupport.sendNotification(notif);
        }

        if( next != null ) {
            return next.invoke( msg, ep );
        } else {
            log.info("No next ");
        }

        return OK;
    
public booleanisSameAddress(org.apache.jk.core.MsgContext ep)

        Socket s=(Socket)ep.getNote( socketNote );
        return isSameAddress( s.getLocalAddress(), s.getInetAddress());
    
public static booleanisSameAddress(java.net.InetAddress server, java.net.InetAddress client)
Return true if the specified client and server addresses are the same. This method works around a bug in the IBM 1.1.8 JVM on Linux, where the address bytes are returned reversed in some circumstances.

param
server The server's InetAddress
param
client The client's InetAddress

	// Compare the byte array versions of the two addresses
	byte serverAddr[] = server.getAddress();
	byte clientAddr[] = client.getAddress();
	if (serverAddr.length != clientAddr.length)
	    return (false);
	boolean match = true;
	for (int i = 0; i < serverAddr.length; i++) {
	    if (serverAddr[i] != clientAddr[i]) {
		match = false;
		break;
	    }
	}
	if (match)
	    return (true);

	// Compare the reversed form of the two addresses
	for (int i = 0; i < serverAddr.length; i++) {
	    if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
		return (false);
	}
	return (true);
    
public voidopen(org.apache.jk.core.MsgContext ep)

    
public voidpause()


         
        synchronized(this) {
            paused = true;
            unLockSocket();
        }
    
voidprocessConnection(org.apache.jk.core.MsgContext ep)
Process a single ajp connection.

        try {
            MsgAjp recv=new MsgAjp(packetSize);
            while( running ) {
                if(paused) { // Drop the connection on pause
                    break;
                }
                int status= this.receive( recv, ep );
                if( status <= 0 ) {
                    if( status==-3)
                        log.debug( "server has been restarted or reset this connection" );
                    else 
                        log.warn("Closing ajp connection " + status );
                    break;
                }
                ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
                
                ep.setType( 0 );
                // Will call next
                status= this.invoke( recv, ep );
                if( status!= JkHandler.OK ) {
                    log.warn("processCallbacks status " + status );
                    break;
                }
            }
        } catch( Exception ex ) {
            String msg = ex.getMessage();
            if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
                log.debug( "Server has been restarted or reset this connection");
            else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
                log.debug( "connection timeout reached");            
            else
                log.error( "Error, processing connection", ex);
        } finally {
	    	/*
	    	 * Whatever happened to this connection (remote closed it, timeout, read error)
	    	 * the socket SHOULD be closed, or we may be in situation where the webserver
	    	 * will continue to think the socket is still open and will forward request
	    	 * to tomcat without receiving ever a reply
	    	 */
            try {
                this.close( ep );
            }
            catch( Exception e) {
                log.error( "Error, closing connection", e);
            }
            try{
                Request req = (Request)ep.getRequest();
                if( req != null ) {
                    ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
                    if( roname != null ) {
                        Registry.getRegistry(null, null).unregisterComponent(roname);
                    }
                    req.getRequestProcessor().setGlobalProcessor(null);
                }
            } catch( Exception ee) {
                log.error( "Error, releasing connection",ee);
            }
        }
    
public intread(org.apache.jk.core.MsgContext ep, byte[] b, int offset, int len)
Read N bytes from the InputStream, and ensure we got them all Under heavy load we could experience many fragmented packets just read Unix Network Programming to recall that a call to read didn't ensure you got all the data you want from read() Linux manual On success, the number of bytes read is returned (zero indicates end of file),and the file position is advanced by this number. It is not an error if this number is smaller than the number of bytes requested; this may happen for example because fewer bytes are actually available right now (maybe because we were close to end-of-file, or because we are reading from a pipe, or from a terminal), or because read() was interrupted by a signal. On error, -1 is returned, and errno is set appropriately. In this case it is left unspecified whether the file position (if any) changes.

        InputStream is=(InputStream)ep.getNote( isNote );
        int pos = 0;
        int got;

        while(pos < len) {
            try {
                got = is.read(b, pos + offset, len - pos);
            } catch(SocketException sex) {
                if(pos > 0) {
                    log.info("Error reading data after "+pos+"bytes",sex);
                } else {
                    log.debug("Error reading data", sex);
                }
                got = -1;
            }
            if (log.isTraceEnabled()) {
                log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
                          offset + " " + len + " = " + got );
            }

            // connection just closed by remote. 
            if (got <= 0) {
                // This happens periodically, as apache restarts
                // periodically.
                // It should be more gracefull ! - another feature for Ajp14
                // log.warn( "server has closed the current connection (-1)" );
                return -3;
            }

            pos += got;
        }
        return pos;
    
public intreceive(org.apache.jk.core.Msg msg, org.apache.jk.core.MsgContext ep)

        if (log.isDebugEnabled()) {
            log.debug("receive() ");
        }

        byte buf[]=msg.getBuffer();
        int hlen=msg.getHeaderLength();
        
	// XXX If the length in the packet header doesn't agree with the
	// actual number of bytes read, it should probably return an error
	// value.  Also, callers of this method never use the length
	// returned -- should probably return true/false instead.

        int rd = this.read(ep, buf, 0, hlen );
        
        if(rd < 0) {
            // Most likely normal apache restart.
            // log.warn("Wrong message " + rd );
            return rd;
        }

        msg.processHeader();

        /* After processing the header we know the body
           length
        */
        int blen=msg.getLen();
        
	// XXX check if enough space - it's assert()-ed !!!
        
 	int total_read = 0;
        
        total_read = this.read(ep, buf, hlen, blen);
        
        if ((total_read <= 0) && (blen > 0)) {
            log.warn("can't read body, waited #" + blen);
            return  -1;
        }
        
        if (total_read != blen) {
             log.warn( "incomplete read, waited #" + blen +
                        " got only " + total_read);
            return -2;
        }
        
	return total_read;
    
public voidregisterRequest(org.apache.coyote.Request req, org.apache.jk.core.MsgContext ep, int count)

        if(this.domain != null) {
            try {
                RequestInfo rp=req.getRequestProcessor();
                rp.setGlobalProcessor(global);
                ObjectName roname = new ObjectName
                    (getDomain() + ":type=RequestProcessor,worker="+
                     getChannelName()+",name=JkRequest" +count);
                ep.setNote(JMXRequestNote, roname);
                        
                Registry.getRegistry(null, null).registerComponent( rp, roname, null);
            } catch( Exception ex ) {
                log.warn("Error registering request");
            }
        }
    
public voidreinit()
Called after you change some fields at runtime using jmx. Experimental for now.

        destroy();
        init();
    
public voidremoveNotificationListener(javax.management.NotificationListener listener)

        if( nSupport!=null)
            nSupport.removeNotificationListener(listener);
    
public voidresetCounters()

        requestCount=0;
    
public voidresume()

        synchronized(this) {
            paused = false;
            notify();
        }
    
public intsend(org.apache.jk.core.Msg msg, org.apache.jk.core.MsgContext ep)

        msg.end(); // Write the packet header
        byte buf[]=msg.getBuffer();
        int len=msg.getLen();
        
        if(log.isTraceEnabled() )
            log.trace("send() " + len + " " + buf[4] );

        OutputStream os=(OutputStream)ep.getNote( osNote );
        os.write( buf, 0, len );
        return len;
    
public voidsendNewMessageNotification(javax.management.Notification notification)

        if( nSupport!= null )
            nSupport.sendNotification(notification);
    
public voidsetAddress(java.net.InetAddress inet)

        this.inet=inet;
    
public voidsetAddress(java.lang.String inet)
jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"

        try {
            this.inet= InetAddress.getByName( inet );
        } catch( Exception ex ) {
            log.error("Error parsing "+inet,ex);
        }
    
public voidsetBacklog(int i)

    
public voidsetBufferSize(int bs)

        bufferSize = bs;
    
public voidsetDaemon(boolean b)
If set to false, the thread pool will be created in non-daemon mode, and will prevent main from exiting

        tp.setDaemon( b );
    
public voidsetMaxPort(int i)

        maxPort=i;
    
public voidsetMaxSpareThreads(int i)

        if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
        tp.setMaxSpareThreads(i);
    
public voidsetMaxThreads(int i)

        if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
        tp.setMaxThreads(i);
    
public voidsetMinSpareThreads(int i)

        if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
        tp.setMinSpareThreads(i);
    
public voidsetNotificationInfo(javax.management.MBeanNotificationInfo[] info)


         
        this.notifInfo=info;
    
public voidsetPacketSize(int ps)

        if(ps < 8*1024) {
            ps = 8*1024;
        }
        packetSize = ps;
    
public voidsetPort(int port)
Set the port for the ajp13 channel. To support seemless load balancing and jni, we treat this as the 'base' port - we'll try up until we find one that is not used. We'll also provide the 'difference' to the main coyote handler - that will be our 'sessionID' and the position in the scoreboard and the suffix for the unix domain socket. jmx:managed-attribute description="Port to listen" access="READ_WRITE"

        this.startPort=port;
        this.port=port;
        this.maxPort=port+10;
    
public voidsetServerTimeout(int timeout)
Sets the timeout in ms of the server sockets created by this server. This method allows the developer to make servers more or less responsive to having their server sockets shut down.

By default this value is 1000ms.

	this.serverTimeout = timeout;
    
public voidsetSoLinger(int i)

	linger=i;
    
public voidsetSoTimeout(int i)

	socketTimeout=i;
    
private voidsetSocketOptions(java.net.Socket s)

        if( socketTimeout > 0 ) 
            s.setSoTimeout( socketTimeout );
        
        s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state

        if( linger > 0 )
            s.setSoLinger( true, linger);
    
public voidsetTcpNoDelay(boolean b)

	tcpNoDelay=b;
    
public voidstart()


        
        if( sSocket==null )
            init();
    
public voidstop()

        destroy();
    
private voidunLockSocket()

        // Need to create a connection to unlock the accept();
        Socket s;
        InetAddress ladr = inet;

        if(port == 0)
            return;
        if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
            ladr = InetAddress.getLocalHost();
        }
        s=new Socket(ladr, port );
        // setting soLinger to a small value will help shutdown the
        // connection quicker
        s.setSoLinger(true, 0);

	s.close();