File | Doc | Category | Size | Date | Package
SnmpQManager.java | API Doc | Java SE 5 API | 9978 | Fri Aug 26 14:55:06 BST 2005 | com.sun.jmx.snmp.daemon

SnmpQManager.java

/*
 * @(#)file      SnmpQManager.java
 * @(#)author    Sun Microsystems, Inc.
 * @(#)version   1.6
 * @(#)date      05/08/26
 *
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 */

package com.sun.jmx.snmp.daemon;

import java.util.Vector;
import java.io.Serializable;

// import debug stuff
//
import com.sun.jmx.trace.Trace;

/**
 * Queue manager for the SNMP inform-request server. For internal use only.
 *
 * <p>The manager owns two queues -- a {@link SendQ} holding the requests to
 * be sent and a {@link WaitQ} holding the requests to be retried -- together
 * with the two threads that are created around it (an {@code SnmpSendServer}
 * and an {@code SnmpTimerServer}), both placed in a dedicated thread group.
 *
 * <p>After {@link #stopQThreads()} the queue references are cleared, so the
 * queue accessors throw {@code NullPointerException} until
 * {@link #startQThreads()} is called again.
 */

final class SnmpQManager implements Serializable {

    // CONSTANTS
    //----------

    // Initial capacity and growth increment shared by both queues.
    private static final int QUEUE_INITIAL_CAPACITY = 20 ;
    private static final int QUEUE_CAPACITY_INCREMENT = 5 ;

    // VARIABLES
    //----------

    // Requests to be sent.
    private SendQ  newq ;
    // Requests to be retried.
    private WaitQ  waitq ;

    private ThreadGroup queueThreadGroup = null ;
    private Thread requestQThread = null ;
    private Thread timerQThread = null ;

    static String dbgTag = "SnmpQManager";


    // CONSTRUCTORS
    //-------------

    /**
     * Creates both queues and the thread group, then creates the two server
     * threads through {@link #startQThreads()}.
     */
    SnmpQManager() {
        newq = new SendQ(QUEUE_INITIAL_CAPACITY, QUEUE_CAPACITY_INCREMENT) ;
        waitq = new WaitQ(QUEUE_INITIAL_CAPACITY, QUEUE_CAPACITY_INCREMENT) ;

        queueThreadGroup = new ThreadGroup("Qmanager Thread Group") ;

        // NOTE(review): "this" is handed to the server threads before the
        // constructor has returned (the original comment called this a
        // "time bomb").
        startQThreads() ;
    }

    /**
     * Creates the timer and send server threads if they do not exist or are
     * no longer alive.
     *
     * <p>The queues are recreated first when a previous
     * {@link #stopQThreads()} discarded them; otherwise the new servers would
     * be attached to a manager whose every queue access throws
     * {@code NullPointerException}.
     *
     * <p>This class never calls {@code start()} itself; presumably the server
     * constructors start their own thread -- TODO confirm.
     */
    public void startQThreads() {
        if (newq == null) {
            newq = new SendQ(QUEUE_INITIAL_CAPACITY, QUEUE_CAPACITY_INCREMENT) ;
        }
        if (waitq == null) {
            waitq = new WaitQ(QUEUE_INITIAL_CAPACITY, QUEUE_CAPACITY_INCREMENT) ;
        }

        if (timerQThread == null || !timerQThread.isAlive()) {
            timerQThread   = new SnmpTimerServer(queueThreadGroup, this) ;
        }
        if (requestQThread == null || !requestQThread.isAlive()) {
            requestQThread = new SnmpSendServer(queueThreadGroup, this) ;
        }
    }

    /**
     * Flags both servers and both queues as being destroyed, stops the
     * servers that are still alive, and drops every reference.
     *
     * <p>Calling this method more than once is harmless: each reference is
     * checked for {@code null} before it is used, because this method itself
     * clears them.
     */
    public void stopQThreads() {

        // Raise every "being destroyed" flag before stopping anything, so a
        // server that wakes up sees a consistent shutdown state.
        if (timerQThread != null) {
            ((SnmpTimerServer)timerQThread).isBeingDestroyed = true;
        }
        if (waitq != null) {
            waitq.isBeingDestroyed = true;
        }
        if (requestQThread != null) {
            ((SnmpSendServer)requestQThread).isBeingDestroyed = true;
        }
        if (newq != null) {
            newq.isBeingDestroyed = true;
        }

        if (timerQThread != null && timerQThread.isAlive()) {
            ((SnmpTimerServer)timerQThread).stopTimerServer();
        }
        waitq = null;
        timerQThread = null;

        if (requestQThread != null && requestQThread.isAlive()) {
            ((SnmpSendServer)requestQThread).stopSendServer();
        }
        newq = null;
        requestQThread = null;
    }

    /**
     * Adds a request to the send queue.
     *
     * @param reqc the request to queue for sending.
     */
    public void addRequest(SnmpInformRequest reqc) {
        newq.addRequest(reqc) ;
    }

    /**
     * Adds a request to the wait (retry) queue.
     *
     * @param reqc the request to queue.
     */
    public void addWaiting(SnmpInformRequest reqc) {
        waitq.addWaiting(reqc) ;
    }

    /**
     * Delegates to {@link SendQ#getAllOutstandingRequest(long)}, which blocks
     * until at least one request is due.
     *
     * @param range margin in milliseconds added to the current time when
     *        selecting the requests that are due.
     * @return the due requests, or {@code null} when the send queue is being
     *         destroyed.
     */
    public Vector getAllOutstandingRequest(long range) {
        return newq.getAllOutstandingRequest(range) ;
    }

    /**
     * Delegates to {@link WaitQ#getTimeoutRequests()}, which blocks until a
     * waiting request has reached its maximum wait time.
     *
     * @return the timed-out request, or {@code null} when the wait queue is
     *         being destroyed.
     */
    public SnmpInformRequest getTimeoutRequests() {
        return waitq.getTimeoutRequests() ;
    }

    /**
     * Removes the given request from both queues.
     *
     * @param reqc the request to remove.
     */
    public void removeRequest(SnmpInformRequest reqc) {
        newq.removeElement(reqc) ;
        waitq.removeElement(reqc) ;
    }

    /**
     * Removes the request with the given identifier, looking in the send
     * queue first and in the wait queue only if it was not found there.
     *
     * @param reqid the request identifier.
     * @return the removed request, or {@code null} if neither queue holds it.
     */
    public SnmpInformRequest removeRequest(long reqid) {
        SnmpInformRequest reqc = newq.removeRequest(reqid) ;
        if (reqc == null) {
            reqc = waitq.removeRequest(reqid) ;
        }
        return reqc ;
    }

    // TRACES & DEBUG
    //---------------

    /** Returns whether TRACE level is selected for the SNMP adaptor. */
    static boolean isTraceOn() {
        return Trace.isSelected(Trace.LEVEL_TRACE, Trace.INFO_ADAPTOR_SNMP);
    }

    /** Sends a TRACE level message on behalf of class {@code clz}. */
    static void trace(String clz, String func, String info) {
        Trace.send(Trace.LEVEL_TRACE, Trace.INFO_ADAPTOR_SNMP, clz, func, info);
    }

    /** Sends a TRACE level message tagged with {@link #dbgTag}. */
    static void trace(String func, String info) {
        SnmpQManager.trace(dbgTag, func, info);
    }

    /** Returns whether DEBUG level is selected for the SNMP adaptor. */
    static boolean isDebugOn() {
        return Trace.isSelected(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP);
    }

    /** Sends a DEBUG level message on behalf of class {@code clz}. */
    static void debug(String clz, String func, String info) {
        Trace.send(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP, clz, func, info);
    }

    /** Sends a DEBUG level exception report on behalf of class {@code clz}. */
    static void debug(String clz, String func, Throwable exception) {
        Trace.send(Trace.LEVEL_DEBUG, Trace.INFO_ADAPTOR_SNMP, clz, func, exception);
    }

    /** Sends a DEBUG level message tagged with {@link #dbgTag}. */
    static void debug(String func, String info) {
        SnmpQManager.debug(dbgTag, func, info);
    }

    /** Sends a DEBUG level exception report tagged with {@link #dbgTag}. */
    static void debug(String func, Throwable exception) {
        SnmpQManager.debug(dbgTag, func, exception);
    }

}

/**
 * This vector manages the inform requests to be sent to the manager.
 *
 * <p>Elements are kept ordered by decreasing absolute next-poll time, so the
 * request that is due first is always the last element.
 */
class SendQ extends Vector {

    /**
     * @param initialCapacity initial capacity of the underlying vector.
     * @param capacityIncr    amount by which the capacity grows when needed.
     */
    SendQ(int initialCapacity, int capacityIncr) {
        super(initialCapacity , capacityIncr) ;
    }

    // Wakes up every thread waiting on this queue.
    private synchronized void notifyClients() {
        this.notifyAll() ;
    }

    /**
     * Inserts a request at the position that keeps the queue ordered by
     * decreasing next-poll time.
     *
     * <p>Waiting threads are notified only when the request lands at the
     * tail, i.e. when it becomes the next one due; an insertion elsewhere
     * does not change the earliest deadline.
     *
     * @param req the request to queue.
     */
    public synchronized void addRequest(SnmpInformRequest req) {

        long nextPoll = req.getAbsNextPollTime() ;

        // Walk back from the tail until an element due strictly later than
        // the new request is found; the new request goes right after it.
        int i ;
        for (i = size() ; i > 0 ; i--) {
            if (nextPoll < getRequestAt(i-1).getAbsNextPollTime())
                break ;
        }
        if (i == size()) {
            addElement(req) ;
            notifyClients() ;
        } else {
            insertElementAt(req, i) ;
        }
    }

    /**
     * Blocks until the request at the tail of the queue is due.
     *
     * @return {@code true} when the tail request is due, {@code false} when
     *         the queue is being destroyed.
     */
    public synchronized boolean waitUntilReady() {
        while (true) {
            if (isBeingDestroyed)
                return false;
            // 0 means "wait until notified" and is used for an empty queue.
            long tmp = 0 ;
            if (!isEmpty()) {
                long currTime = System.currentTimeMillis() ;
                SnmpInformRequest req = (SnmpInformRequest) lastElement() ;
                tmp = req.getAbsNextPollTime() - currTime ;
                if (tmp <= 0) {
                    return true ;
                }
            }
            waitOnThisQueue(tmp) ;
        }
    }

    /**
     * Blocks until at least one request is due, then removes and returns
     * every request whose next-poll time is not later than now plus
     * {@code margin}.
     *
     * @param margin tolerance in milliseconds added to the current time.
     * @return the removed requests, earliest first, or {@code null} when the
     *         queue is being destroyed.
     */
    public synchronized Vector getAllOutstandingRequest(long margin) {
        Vector outreq = new Vector() ;
        while (true) {
            if (!waitUntilReady())
                return null;

            long refTime = System.currentTimeMillis() + margin ;

            // Collect from the tail (earliest) towards the head.
            for (int i = size() ; i > 0 ; i--) {
                SnmpInformRequest req = getRequestAt(i-1) ;
                if (req.getAbsNextPollTime() > refTime)
                    break ;
                outreq.addElement(req) ;
            }

            if (! outreq.isEmpty()) {
                // Truncate through setSize() rather than decrementing
                // elementCount directly: setSize() also clears the vacated
                // slots, so the returned requests are no longer referenced
                // (or serialized) by this queue, and it updates modCount.
                setSize(size() - outreq.size()) ;
                return outreq ;
            }
        }
    }

    /**
     * Waits on this queue's monitor.
     *
     * @param time maximum time to wait in milliseconds; 0 waits until
     *        notified.
     */
    public synchronized void waitOnThisQueue(long time) {
        // An unbounded wait on a non-empty queue is not expected; report it.
        if (time == 0 && !isEmpty()) {
            if (SnmpQManager.isDebugOn()) {
                SnmpQManager.debug("waitOnThisQueue", "[" + Thread.currentThread().toString() + "]:" +
                                   "Fatal BUG :: Blocking on newq permenantly. But size = " + size());
            }
        }

        try {
            this.wait(time) ;
        } catch (InterruptedException ignored) {
            // Deliberately ignored: waitUntilReady() re-checks
            // isBeingDestroyed and the deadline after every wake-up.
            // Re-asserting the interrupt here would make the next wait()
            // throw immediately and turn that loop into a busy spin.
        }
    }

    /**
     * @param idx index of the wanted element.
     * @return the element at {@code idx}, cast to a request.
     */
    public SnmpInformRequest getRequestAt(int idx) {
        return (SnmpInformRequest)elementAt(idx) ;
    }

    /**
     * Removes the first request whose identifier equals {@code reqid}.
     *
     * @param reqid the request identifier.
     * @return the removed request, or {@code null} if none matches.
     */
    public synchronized SnmpInformRequest removeRequest(long reqid) {
        int max= size() ;
        for (int i = 0 ; i < max ; i++) {
            SnmpInformRequest reqc = getRequestAt(i) ;
            if (reqid == reqc.getRequestId()) {
                removeElementAt(i) ;
                return reqc ;
            }
        }
        return null ;
    }

    // This boolean is used to stop handling requests while the corresponding SnmpQManager
    // is being destroyed.
    // NOTE(review): it is written without holding this queue's monitor;
    // the modifiers are left untouched so the default serialized form of
    // this class does not change.
    //
    boolean isBeingDestroyed = false;
}

/**
 * This vector manages the inform requests to be retried to the manager.
 *
 * <p>Elements are kept ordered by decreasing absolute maximum wait time, so
 * the request that times out first is always the last element.
 */
class WaitQ extends Vector {

    /**
     * @param initialCapacity initial capacity of the underlying vector.
     * @param capacityIncr    amount by which the capacity grows when needed.
     */
    WaitQ(int initialCapacity, int capacityIncr) {
        super(initialCapacity , capacityIncr) ;
    }

    /**
     * Inserts a request at the position that keeps the queue ordered by
     * decreasing maximum wait time.
     *
     * <p>Waiting threads are notified only when the request lands at the
     * tail, i.e. when it becomes the next one to time out.
     *
     * @param req the request to queue.
     */
    public synchronized void addWaiting(SnmpInformRequest req) {

        long waitTime = req.getAbsMaxTimeToWait() ;

        // Walk back from the tail until an element timing out strictly later
        // than the new request is found; the new request goes right after it.
        int i ;
        for (i = size() ; i > 0 ; i--) {
            if (waitTime < getRequestAt(i-1).getAbsMaxTimeToWait())
                break ;
        }
        if (i == size()) {
            addElement(req) ;
            notifyClients() ;
        } else {
            insertElementAt(req, i) ;
        }
    }

    /**
     * Blocks until the request at the tail of the queue has reached its
     * maximum wait time.
     *
     * @return {@code true} when the tail request has timed out,
     *         {@code false} when the queue is being destroyed.
     */
    public synchronized boolean waitUntilReady() {
        while (true) {
            if (isBeingDestroyed)
                return false;
            // 0 means "wait until notified" and is used for an empty queue.
            long tmp = 0 ;
            if (!isEmpty()) {
                long currTime = System.currentTimeMillis() ;
                SnmpInformRequest req = (SnmpInformRequest) lastElement() ;
                tmp = req.getAbsMaxTimeToWait() - currTime ;
                if (tmp <= 0) {
                    return true ;
                }
            }
            waitOnThisQueue(tmp) ;
        }
    }

    /**
     * Blocks until a request has timed out, then removes and returns it.
     *
     * @return the timed-out request, or {@code null} when the queue is being
     *         destroyed.
     */
    public synchronized SnmpInformRequest getTimeoutRequests() {
        if (!waitUntilReady()) {
            return null;
        }
        SnmpInformRequest req = (SnmpInformRequest) lastElement() ;
        // Remove through the Vector API rather than decrementing
        // elementCount directly: removeElementAt() also clears the vacated
        // slot, so the returned request is no longer referenced (or
        // serialized) by this queue, and it updates modCount.
        removeElementAt(size() - 1) ;
        return req ;
    }

    // Wakes up every thread waiting on this queue.
    private synchronized void notifyClients() {
        this.notifyAll() ;
    }

    /**
     * Waits on this queue's monitor.
     *
     * @param time maximum time to wait in milliseconds; 0 waits until
     *        notified.
     */
    public synchronized void waitOnThisQueue(long time) {
        // An unbounded wait on a non-empty queue is not expected; report it.
        if (time == 0 && !isEmpty()) {
            if (SnmpQManager.isDebugOn()) {
                SnmpQManager.debug("waitOnThisQueue", "[" + Thread.currentThread().toString() + "]:" +
                                   "Fatal BUG :: Blocking on waitq permenantly. But size = " + size());
            }
        }

        try {
            this.wait(time) ;
        } catch (InterruptedException ignored) {
            // Deliberately ignored: waitUntilReady() re-checks
            // isBeingDestroyed and the deadline after every wake-up.
            // Re-asserting the interrupt here would make the next wait()
            // throw immediately and turn that loop into a busy spin.
        }
    }

    /**
     * @param idx index of the wanted element.
     * @return the element at {@code idx}, cast to a request.
     */
    public SnmpInformRequest getRequestAt(int idx) {
        return (SnmpInformRequest)elementAt(idx) ;
    }

    /**
     * Removes the first request whose identifier equals {@code reqid}.
     *
     * @param reqid the request identifier.
     * @return the removed request, or {@code null} if none matches.
     */
    public synchronized SnmpInformRequest removeRequest(long reqid) {
        int max= size();
        for (int i = 0 ; i < max ; i++) {
            SnmpInformRequest reqc = getRequestAt(i) ;
            if (reqid == reqc.getRequestId()) {
                removeElementAt(i) ;
                return reqc ;
            }
        }
        return null ;
    }

    // This boolean is used to stop handling requests while the corresponding SnmpQManager
    // is being destroyed.
    // NOTE(review): it is written without holding this queue's monitor;
    // the modifiers are left untouched so the default serialized form of
    // this class does not change.
    //
    boolean isBeingDestroyed = false;
}