File | Doc | Category | Size | Date | Package
LoadTest.java | API Doc | Apache Tomcat 6.0.14 | 16581 | Fri Jul 20 04:20:36 BST 2007 | org.apache.catalina.tribes.demos

LoadTest

public class LoadTest extends Object implements org.apache.catalina.tribes.ChannelListener, Runnable, org.apache.catalina.tribes.MembershipListener

Title:

Description:

Company:

author
not attributable
version
1.0

Fields Summary
protected static org.apache.juli.logging.Log
log
public static int
size
public static Object
mutex
public boolean
doRun
public long
bytesReceived
public float
mBytesReceived
public int
messagesReceived
public boolean
send
public boolean
debug
public int
msgCount
org.apache.catalina.tribes.ManagedChannel
channel
public int
statsInterval
public long
pause
public boolean
breakonChannelException
public boolean
async
public long
receiveStart
public int
channelOptions
static int
messageSize
public static long
messagesSent
public static long
messageStartSendTime
public static long
messageEndSendTime
public static int
threadCount
Constructors Summary
/**
 * Creates a load tester that works against the given channel.
 *
 * @param channel   channel whose members are queried and messaged in {@code run()}
 * @param send      {@code true} to send messages; when {@code false}, {@code run()} only waits
 * @param msgCount  number of messages used as the loop bound in {@code run()} and as
 *                  a statistics trigger in {@code messageReceived}
 * @param debug     {@code true} to dump message bytes and extra diagnostics to the console
 * @param pause     milliseconds passed to {@code Thread.sleep} after each send; no sleep when 0
 * @param stats     number of messages between statistics reports
 * @param breakOnEx {@code true} to rethrow a {@code ChannelException} raised while sending
 */
public LoadTest(org.apache.catalina.tribes.ManagedChannel channel, boolean send, int msgCount, boolean debug, long pause, int stats, boolean breakOnEx) {
    // What to talk to and in which role.
    this.channel = channel;
    this.send = send;

    // How much to send and how fast.
    this.msgCount = msgCount;
    this.pause = pause;

    // Reporting and failure handling.
    this.statsInterval = stats;
    this.debug = debug;
    this.breakonChannelException = breakOnEx;
}
Methods Summary
public booleanaccept(java.io.Serializable msg, org.apache.catalina.tribes.Member mbr)

 
       return (msg instanceof LoadMessage) || (msg instanceof ByteMessage);
    
public static synchronized longaddSendStats(long count)

        messagesSent+=count;
        return 0l;
    
public static synchronized voidendTest()

        threadCount--;
        if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime = System.currentTimeMillis();
    
public static voidmain(java.lang.String[] args)

        boolean send = true;
        boolean debug = false;
        long pause = 0;
        int count = 1000000;
        int stats = 10000;
        boolean breakOnEx = false;
        int threads = 1;
        boolean shutdown = false;
        int startoptions = Channel.DEFAULT;
        int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
        if ( args.length == 0 ) {
            args = new String[] {"-help"};
        }
        for (int i = 0; i < args.length; i++) {
            if ("-threads".equals(args[i])) {
                threads = Integer.parseInt(args[++i]);
            } else if ("-count".equals(args[i])) {
                count = Integer.parseInt(args[++i]);
                System.out.println("Sending "+count+" messages.");
            } else if ("-pause".equals(args[i])) {
                pause = Long.parseLong(args[++i])*1000;
            } else if ("-break".equals(args[i])) {
                breakOnEx = true;
            } else if ("-shutdown".equals(args[i])) {
                shutdown = true;
            } else if ("-stats".equals(args[i])) {
                stats = Integer.parseInt(args[++i]);
                System.out.println("Stats every "+stats+" message");
            } else if ("-sendoptions".equals(args[i])) {
                channelOptions = Integer.parseInt(args[++i]);
                System.out.println("Setting send options to "+channelOptions);
            } else if ("-startoptions".equals(args[i])) {
                startoptions = Integer.parseInt(args[++i]);
                System.out.println("Setting start options to "+startoptions);
            } else if ("-size".equals(args[i])) {
                size = Integer.parseInt(args[++i])-4;
                System.out.println("Message size will be:"+(size+4)+" bytes");
            } else if ("-mode".equals(args[i])) {
                if ( "receive".equals(args[++i]) ) send = false;
            } else if ("-debug".equals(args[i])) {
                debug = true;
            } else if ("-help".equals(args[i])) 
            {
                usage();
                System.exit(1);
            }
        }
        
        ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
        
        LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
        test.channelOptions = channelOptions;
        LoadMessage msg = new LoadMessage();
        
        messageSize = LoadMessage.getMessageSize(msg);
        channel.addChannelListener(test);
        channel.addMembershipListener(test);
        channel.start(startoptions);
        Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
        while ( threads > 1 ) {
            Thread t = new Thread(test);
            t.setDaemon(true);
            t.start();
            threads--;
            test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
            test.channelOptions = channelOptions;
        }
        test.run();
        if ( shutdown && send ) channel.stop(channel.DEFAULT);
        System.out.println("System test complete, sleeping to let threads finish.");
        Thread.sleep(60*1000*60);
    
public voidmemberAdded(org.apache.catalina.tribes.Member member)
memberAdded

param
member Member
todo
Implement this org.apache.catalina.tribes.MembershipListener method

        log.info("Member added:"+member);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    
public voidmemberDisappeared(org.apache.catalina.tribes.Member member)
memberDisappeared

param
member Member
todo
Implement this org.apache.catalina.tribes.MembershipListener method

        log.info("Member disappeared:"+member);
    
public voidmessageReceived(java.io.Serializable msg, org.apache.catalina.tribes.Member mbr)

 
        if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
        if ( debug ) {
            if ( msg instanceof LoadMessage ) {
                printArray(((LoadMessage)msg).getMessage());
            }
        }
        
        if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) {
            LoadMessage tmp = new LoadMessage();
            tmp.setMessage(((ByteMessage)msg).getMessage());
            msg = tmp;
            tmp = null;
        }
        
        
        bytesReceived+=((LoadMessage)msg).getMessage().length;
        mBytesReceived+=((float)((LoadMessage)msg).getMessage().length)/1024f/1024f;
        messagesReceived++;
        if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) {
            float bytes = (float)(((LoadMessage)msg).getMessage().length*messagesReceived);
            float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f;
            log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+
                     "\n\tMessage count :"+(long)messagesReceived+
                     "\n\tMessage/sec   :"+messagesReceived/seconds+
                     "\n\tTotal bytes   :"+(long)bytes+
                     "\n\tTotal mbytes  :"+(long)mBytesReceived+
                     "\n\tTime since 1st:"+seconds+" seconds"+
                     "\n\tBytes/second  :"+(bytes/seconds)+
                     "\n\tMBytes/second :"+(mBytesReceived/seconds)+"\n");

        }
    
public static voidprintArray(byte[] data)

        System.out.print("{");
        for (int i=0; i<data.length; i++ ) {
            System.out.print(data[i]);
            System.out.print(",");
        }
        System.out.println("} size:"+data.length);
    
private static voidprintSendStats(long counter, int messageSize)

        float cnt = (float)counter;
        float size = (float)messageSize;
        float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
        log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
                 "\n\tMessage count:"+counter+
                 "\n\tTotal bytes  :"+(long)(size*cnt)+
                 "\n\tTotal seconds:"+(time)+
                 "\n\tBytes/second :"+(size*cnt/time)+
                 "\n\tMBytes/second:"+(size*cnt/time/1024f/1024f));
    
public voidrun()

        
        long counter = 0;
        long total = 0;
        LoadMessage msg = new LoadMessage();
        int messageSize = LoadTest.messageSize;
        
        try {
            startTest();
            while (total < msgCount) {
                if (channel.getMembers().length == 0 || (!send)) {
                    synchronized (mutex) {
                        try {
                            mutex.wait();
                        } catch (InterruptedException x) {
                            log.info("Thread interrupted from wait");
                        }
                    }
                } else {
                    try {
                        //msg.setMsgNr((int)++total);
                        counter++;
                        if (debug) {
                            printArray(msg.getMessage());
                        }
                        channel.send(channel.getMembers(), msg, channelOptions);
                        if ( pause > 0 ) {
                            if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
                            Thread.sleep(pause);
                        }
                    } catch (ChannelException x) {
                        if ( debug ) log.error("Unable to send message:"+x.getMessage(),x);
                        log.error("Unable to send message:"+x.getMessage());
                        ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
                        for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
                        --counter;
                        if ( this.breakonChannelException ) throw x;
                    }
                }
                if ( (counter % statsInterval) == 0 && (counter > 0)) {
                    //add to the global counter
                    counter = addSendStats(counter);
                    //print from the global counter
                    //printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime);
                    printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
                    
                }

            }
        }catch ( Exception x ) {
            log.error("Captured error while sending:"+x.getMessage());
            if ( debug ) log.error("",x);
            printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
        }
        endTest();
    
public static synchronized voidstartTest()

    
         
        threadCount++;
        if ( messageStartSendTime == 0 ) messageStartSendTime = System.currentTimeMillis();
    
public static voidusage()

        System.out.println("Tribes Load tester.");
        System.out.println("The load tester can be used in sender or received mode or both");
        System.out.println("Usage:\n\t"+
                           "java LoadTest [options]\n\t"+
                           "Options:\n\t\t"+
                           "[-mode receive|send|both]  \n\t\t"+
                           "[-startoptions startflags (default is Channel.DEFAULT) ]  \n\t\t"+
                           "[-debug]  \n\t\t"+
                           "[-count messagecount]  \n\t\t"+
                           "[-stats statinterval]  \n\t\t"+
                           "[-pause nrofsecondstopausebetweensends]  \n\t\t"+
                           "[-threads numberofsenderthreads]  \n\t\t"+
                           "[-size messagesize]  \n\t\t"+
                           "[-sendoptions channeloptions]  \n\t\t"+
                           "[-break (halts execution on exception)]\n"+
                           "[-shutdown (issues a channel.stop() command after send is completed)]\n"+
                           "\tChannel options:"+
                           ChannelCreator.usage()+"\n\n"+
                           "Example:\n\t"+
                           "java LoadTest -port 4004\n\t"+
                           "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
                           "java LoadTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");