Methods Summary |
---|
public boolean | accept(java.io.Serializable msg, org.apache.catalina.tribes.Member mbr)
return (msg instanceof LoadMessage) || (msg instanceof ByteMessage);
|
public static synchronized long | addSendStats(long count)
messagesSent+=count;
return 0l;
|
public static synchronized void | endTest()
threadCount--;
if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime = System.currentTimeMillis();
|
public static void | main(java.lang.String[] args)
    // Command-line entry point. Parses the load-test options, creates and starts the
    // channel, runs the requested number of LoadTest workers and then sleeps.
    // With no arguments at all it behaves as if "-help" had been given.
    boolean send = true;
    boolean debug = false;
    long pause = 0;
    int count = 1000000;
    int stats = 10000;
    boolean breakOnEx = false;
    int threads = 1;
    boolean shutdown = false;
    int startoptions = Channel.DEFAULT;
    int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
    if ( args.length == 0 ) {
        args = new String[] {"-help"};
    }
    // Options that take a value consume the following argument via args[++i].
    // Arguments not matched here are skipped; the whole array is also handed to
    // ChannelCreator.createChannel(args) below.
    for (int i = 0; i < args.length; i++) {
        if ("-threads".equals(args[i])) {
            threads = Integer.parseInt(args[++i]);
        } else if ("-count".equals(args[i])) {
            count = Integer.parseInt(args[++i]);
            System.out.println("Sending "+count+" messages.");
        } else if ("-pause".equals(args[i])) {
            // Given in seconds on the command line; stored in milliseconds for Thread.sleep().
            pause = Long.parseLong(args[++i])*1000;
        } else if ("-break".equals(args[i])) {
            breakOnEx = true;
        } else if ("-shutdown".equals(args[i])) {
            shutdown = true;
        } else if ("-stats".equals(args[i])) {
            stats = Integer.parseInt(args[++i]);
            System.out.println("Stats every "+stats+" message");
        } else if ("-sendoptions".equals(args[i])) {
            channelOptions = Integer.parseInt(args[++i]);
            System.out.println("Setting send options to "+channelOptions);
        } else if ("-startoptions".equals(args[i])) {
            startoptions = Integer.parseInt(args[++i]);
            System.out.println("Setting start options to "+startoptions);
        } else if ("-size".equals(args[i])) {
            // The stored size is 4 less than the requested size; the message printed
            // below adds the 4 back so it reports the value the user asked for.
            size = Integer.parseInt(args[++i])-4;
            System.out.println("Message size will be:"+(size+4)+" bytes");
        } else if ("-mode".equals(args[i])) {
            // Only "receive" turns sending off; any other mode value leaves send == true.
            if ( "receive".equals(args[++i]) ) {
                send = false;
            }
        } else if ("-debug".equals(args[i])) {
            debug = true;
        } else if ("-help".equals(args[i])) {
            usage();
            System.exit(1);
        }
    }
    ManagedChannel channel = (ManagedChannel)ChannelCreator.createChannel(args);
    LoadTest test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
    test.channelOptions = channelOptions;
    LoadMessage msg = new LoadMessage();
    messageSize = LoadMessage.getMessageSize(msg);
    channel.addChannelListener(test);
    channel.addMembershipListener(test);
    channel.start(startoptions);
    Runtime.getRuntime().addShutdownHook(new Shutdown(channel));
    // Start (threads - 1) daemon workers; the last LoadTest instance runs on this thread.
    while ( threads > 1 ) {
        Thread t = new Thread(test);
        t.setDaemon(true);
        t.start();
        threads--;
        test = new LoadTest(channel,send,count,debug,pause,stats,breakOnEx);
        test.channelOptions = channelOptions;
    }
    test.run();
    // DEFAULT is a static constant: reference it through the type (as done for
    // startoptions above) rather than through the channel instance.
    if ( shutdown && send ) {
        channel.stop(Channel.DEFAULT);
    }
    System.out.println("System test complete, sleeping to let threads finish.");
    // 60 * 1000 * 60 ms = one hour.
    Thread.sleep(60*1000*60);
|
public void | memberAdded(org.apache.catalina.tribes.Member member)
    // Logs the new member, then wakes every thread waiting on mutex. run() waits on
    // that monitor while the channel has no members, so this lets the workers
    // re-check membership.
    log.info("Member added:"+member);
    synchronized (mutex) {
        mutex.notifyAll();
    }
|
public void | memberDisappeared(org.apache.catalina.tribes.Member member)
    // Only logs the departure; no waiting thread is notified and no state is changed.
    log.info("Member disappeared:"+member);
|
public void | messageReceived(java.io.Serializable msg, org.apache.catalina.tribes.Member mbr)
    // Accounts for one received message and logs receive statistics every
    // statsInterval messages and again when msgCount messages have arrived.
    // The sending member (mbr) is not used.

    // The clock for the receive statistics starts at the first message seen.
    if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
    if ( debug && msg instanceof LoadMessage ) {
        printArray(((LoadMessage)msg).getMessage());
    }
    // A plain ByteMessage is re-wrapped so the accounting below can treat every
    // message as a LoadMessage.
    if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) {
        LoadMessage wrapped = new LoadMessage();
        wrapped.setMessage(((ByteMessage)msg).getMessage());
        msg = wrapped;
    }
    bytesReceived+=((LoadMessage)msg).getMessage().length;
    mBytesReceived+=((float)((LoadMessage)msg).getMessage().length)/1024f/1024f;
    messagesReceived++;
    if ( (messagesReceived%statsInterval)==0 || (messagesReceived==msgCount)) {
        // Widen to long before multiplying: length * count can exceed the int range
        // for large payloads or long runs, which would corrupt the reported totals.
        long payloadLength = ((LoadMessage)msg).getMessage().length;
        float bytes = (float)(payloadLength*messagesReceived);
        float seconds = ((float)(System.currentTimeMillis()-receiveStart)) / 1000f;
        log.info("****RECEIVE STATS-"+Thread.currentThread().getName()+"*****"+
                 "\n\tMessage count :"+(long)messagesReceived+
                 "\n\tMessage/sec :"+messagesReceived/seconds+
                 "\n\tTotal bytes :"+(long)bytes+
                 "\n\tTotal mbytes :"+(long)mBytesReceived+
                 "\n\tTime since 1st:"+seconds+" seconds"+
                 "\n\tBytes/second :"+(bytes/seconds)+
                 "\n\tMBytes/second :"+(mBytesReceived/seconds)+"\n");
    }
|
public static void | printArray(byte[] data)
    // Dumps data to standard out in the form {b0,b1,...,} size:N -- every byte is
    // printed in decimal followed by a comma, then the closing brace and the length.
    System.out.print("{");
    for (byte value : data) {
        System.out.print(value);
        System.out.print(",");
    }
    System.out.println("} size:"+data.length);
|
private static void | printSendStats(long counter, int messageSize)
    // Logs send throughput: counter messages of messageSize bytes each, measured
    // against the time elapsed since messageStartSendTime.
    final float messageCount = (float)counter;
    final float bytesPerMessage = (float)messageSize;
    final float elapsedSeconds = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
    final float totalBytes = bytesPerMessage*messageCount;
    final float bytesPerSecond = totalBytes/elapsedSeconds;
    final float mBytesPerSecond = bytesPerSecond/1024f/1024f;
    log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
             "\n\tMessage count:"+counter+
             "\n\tTotal bytes :"+(long)totalBytes+
             "\n\tTotal seconds:"+elapsedSeconds+
             "\n\tBytes/second :"+bytesPerSecond+
             "\n\tMBytes/second:"+mBytesPerSecond);
|
public void | run()
    // Worker loop. In send mode it sends msgCount messages to the current channel
    // members, reporting statistics every statsInterval sends. While the channel has
    // no members, or when this instance is not sending, it blocks on mutex until
    // memberAdded() notifies it.
    long counter = 0;   // sends not yet handed over to the global counter
    long total = 0;     // successful sends made by this worker
    LoadMessage msg = new LoadMessage();
    try {
        startTest();
        while (total < msgCount) {
            if (channel.getMembers().length == 0 || (!send)) {
                synchronized (mutex) {
                    try {
                        mutex.wait();
                    } catch (InterruptedException x) {
                        log.info("Thread interrupted from wait");
                    }
                }
            } else {
                try {
                    counter++;
                    if (debug) {
                        printArray(msg.getMessage());
                    }
                    channel.send(channel.getMembers(), msg, channelOptions);
                    // Count only sends that did not throw. Without this the loop
                    // condition never changes and the worker would send forever,
                    // ignoring msgCount.
                    total++;
                    if ( pause > 0 ) {
                        if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
                        Thread.sleep(pause);
                    }
                } catch (ChannelException x) {
                    if ( debug ) log.error("Unable to send message:"+x.getMessage(),x);
                    log.error("Unable to send message:"+x.getMessage());
                    ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
                    for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
                    // The send failed, so undo the optimistic increment above.
                    --counter;
                    if ( this.breakonChannelException ) throw x;
                }
            }
            if ( (counter % statsInterval) == 0 && (counter > 0)) {
                // Hand the local count to the global counter (which resets it to 0),
                // then report from the global counter.
                counter = addSendStats(counter);
                printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
            }
        }
        // Flush whatever was sent since the last statistics interval so the global
        // total and the final report include every message.
        if ( counter > 0 ) {
            counter = addSendStats(counter);
            printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
        }
    } catch ( Exception x ) {
        log.error("Captured error while sending:"+x.getMessage());
        if ( debug ) log.error("",x);
        printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
    }
    endTest();
|
public static synchronized void | startTest()
threadCount++;
if ( messageStartSendTime == 0 ) messageStartSendTime = System.currentTimeMillis();
|
public static void | usage()
    // Prints the command-line help to standard out: the options parsed by main(),
    // followed by the channel options reported by ChannelCreator.usage() and a few
    // example invocations.
    System.out.println("Tribes Load tester.");
    System.out.println("The load tester can be used in sender or receiver mode or both");
    System.out.println("Usage:\n\t"+
                       "java LoadTest [options]\n\t"+
                       "Options:\n\t\t"+
                       "[-mode receive|send|both] \n\t\t"+
                       "[-startoptions startflags (default is Channel.DEFAULT) ] \n\t\t"+
                       "[-debug] \n\t\t"+
                       "[-count messagecount] \n\t\t"+
                       "[-stats statinterval] \n\t\t"+
                       "[-pause nrofsecondstopausebetweensends] \n\t\t"+
                       "[-threads numberofsenderthreads] \n\t\t"+
                       "[-size messagesize] \n\t\t"+
                       "[-sendoptions channeloptions] \n\t\t"+
                       // Ends with \n\t\t like the lines above so that -shutdown is
                       // indented with the rest of the option list.
                       "[-break (halts execution on exception)] \n\t\t"+
                       "[-shutdown (issues a channel.stop() command after send is completed)]\n"+
                       "\tChannel options:"+
                       ChannelCreator.usage()+"\n\n"+
                       "Example:\n\t"+
                       "java LoadTest -port 4004\n\t"+
                       "java LoadTest -bind 192.168.0.45 -port 4005\n\t"+
                       "java LoadTest -bind 192.168.0.45 -port 4005 -mbind 192.168.0.45 -count 100 -stats 10\n");
|