File: PipelinedMsgParser.java
Doc: API Doc
Category: phoneME MR2 API (J2ME)
Size: 12605
Date: Wed May 02 18:00:42 BST 2007
Package: gov.nist.siplite.parser

PipelinedMsgParser

public final class PipelinedMsgParser extends Object implements Runnable
This implements a pipelined message parser suitable for use with stream-oriented input such as TCP. The client uses this class by instantiating it with an input stream from which input is read and fed to a message parser. It keeps reading from the input stream and processes messages in a never-ending interpreter loop. The message listener interface gets called for processing messages or for processing errors. The payload specified by the Content-Length header is read directly from the input stream. It can be accessed from the Message using the getContent and getContentBytes methods provided by the Message class.
version
JAIN-SIP-1.1 This code is in the public domain. It was noticed that the parser was blocking so I threw out some cool pipelining which ran fast but only worked when the phase of the full moon matched its mood. Now things are serialized and life goes slower but more reliably.
see
SIPMessageListener

Fields Summary
protected SIPMessageListener
sipMessageListener
The message listener that is registered with this parser. (The message listener has methods that can process correct and erroneous messages.)
private Thread
mythread
Handle to the preprocessor thread.
private InputStream
rawInputStream
Raw data input to be processed.
Constructors Summary
protected PipelinedMsgParser()
Default constructor.

        super();
        
    
public PipelinedMsgParser(SIPMessageListener sipMessageListener, InputStream in, boolean debug)
Constructor when we are given a message listener and an input stream (could be a TCP connection or a file)

param
sipMessageListener Message listener which has methods that get called back from the parser when a parse is complete
param
in Input stream from which to read the input.
param
debug Enable/disable tracing or lexical analyser switch.

        this();
        this.sipMessageListener = sipMessageListener;
        rawInputStream = in;
        mythread = new Thread(this);
        
    
public PipelinedMsgParser(SIPMessageListener mhandler, InputStream in)
This is the constructor for the pipelined parser.

param
mhandler a MessageListener implementation that provides the message handlers to handle correctly and incorrectly parsed messages.
param
in An input stream to read messages from.

        this(mhandler, in, false);
    
public PipelinedMsgParser(InputStream in)
This is the constructor for the pipelined parser.

param
in - An input stream to read messages from.

        this(null, in, false);
    
Methods Summary
protected java.lang.Object clone()
Create a new pipelined parser from an existing one.

return
A new pipelined parser that reads from the same input stream.

        PipelinedMsgParser p = new PipelinedMsgParser();
        
        p.rawInputStream = this.rawInputStream;
        p.sipMessageListener = this.sipMessageListener;
        return p;
    
public void processInput()
Start reading and processing input.

        mythread.start();
    
private java.lang.String readLine(java.io.InputStream inputStream)
Reads a line of input. (A buffered reader cannot be used here because we may need to switch encodings mid-stream.)

param
inputStream source for data to be processed
return
the line of text retrieved
exception
IOException if an error occurs reading from input

        StringBuffer retval = new StringBuffer("");
        while (true) {
                char ch;
                int i = inputStream.read();
                if (i == -1) {
                    throw new IOException("End of stream");
                } else {
                    ch = (char) i;
                }
                if (ch != '\r') {
                    retval.append(ch);
                }
                if (ch == '\n') {
                    break;
                }
        }
        return retval.toString();
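
The line reader above can be tried on its own against an in-memory stream. The following is a minimal sketch, not part of the library: the class name `ReadLineSketch` and the sample request line are assumptions made for illustration. It keeps the same behaviour as the listing: every carriage return is dropped, the line feed is kept, and hitting the end of the stream raises an `IOException`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone class; it only mirrors the readLine logic listed above.
public class ReadLineSketch {

    // Reads bytes up to and including '\n', dropping every '\r' on the way.
    public static String readLine(InputStream in) throws IOException {
        StringBuffer line = new StringBuffer();
        while (true) {
            int i = in.read();
            if (i == -1) {
                // Same policy as the listing: end of stream is an error.
                throw new IOException("End of stream");
            }
            char ch = (char) i;
            if (ch != '\r') {
                line.append(ch);
            }
            if (ch == '\n') {
                break;
            }
        }
        return line.toString();
    }

    public static void main(String[] args) throws IOException {
        // Illustrative input: a request line followed by a blank line.
        byte[] data = "OPTIONS sip:bob@example.com SIP/2.0\r\n\r\n"
                .getBytes(StandardCharsets.ISO_8859_1);
        InputStream in = new ByteArrayInputStream(data);
        System.out.print(readLine(in));               // request line, CR removed
        System.out.println(readLine(in).equals("\n")); // the blank line is just "\n"
    }
}
```

Because each returned line still ends in a line feed, a blank line comes back as `"\n"`, which is what the `run` loop compares against when it discards leading blank lines.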
    
private java.lang.String readToBreak(java.io.InputStream inputStream)
Reads to the next break (CRLFCRLF sequence).

param
inputStream source for data to be processed
return
the text up to the break character sequence
exception
IOException if an error occurs reading from input

        StringBuffer retval = new StringBuffer("");
        boolean flag = false;
        while (true) {
                char ch;
                int i = inputStream.read();
                if (i == -1)
                    break;
                else
                    ch = (char) i;
                if (ch != '\r')
                    retval.append(ch);
                if (ch == '\n') {
                    if (flag)
                        break;
                    else
                        flag = true;
                }
        }
        return retval.toString();
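
As listed, the flag in this method is never cleared once set, so the loop appears to stop at the second line feed it sees, whether or not the two are adjacent. The sketch below is a variant, not the library's code, that stops only at an empty line, which is what the CRLFCRLF description suggests. The class name `ReadToBreakSketch` and the variable `atLineStart` are assumptions introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical variant of readToBreak that resets its state on ordinary characters.
public class ReadToBreakSketch {

    // Reads up to and including the first empty line, dropping every '\r'.
    // Returns whatever was read if the stream ends first.
    public static String readToBreak(InputStream in) throws IOException {
        StringBuffer text = new StringBuffer();
        boolean atLineStart = false; // true when the previous kept character was '\n'
        while (true) {
            int i = in.read();
            if (i == -1) {
                break;
            }
            char ch = (char) i;
            if (ch == '\r') {
                continue; // carriage returns are dropped and do not affect the state
            }
            text.append(ch);
            if (ch == '\n') {
                if (atLineStart) {
                    break; // two line feeds in a row: the break has been reached
                }
                atLineStart = true;
            } else {
                atLineStart = false; // an ordinary character: the line is not empty
            }
        }
        return text.toString();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "Via: x\r\nTo: y\r\n\r\nbody"
                .getBytes(StandardCharsets.ISO_8859_1);
        InputStream in = new ByteArrayInputStream(data);
        String head = readToBreak(in);
        System.out.println(head.equals("Via: x\nTo: y\n\n"));
        System.out.println(in.available()); // bytes left for the body
    }
}
```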
    
public void run()
This is the input-reading thread for the pipelined parser. You feed it input through the input stream (see the constructor) and it calls back an event listener interface for message processing or errors. It cleans up the input, dealing with things like line continuation.

        
        InputStream inputStream = this.rawInputStream;
        
        // I cannot use buffered reader here because we may need to switch
        // encodings to read the message body.
        try {
            while (true) {
                StringBuffer inputBuffer = new StringBuffer();
                
                if (Logging.REPORT_LEVEL <= Logging.INFORMATION) {
                    Logging.report(Logging.INFORMATION, LogChannels.LC_JSR180,
                        "Starting parse!");
                }
                
                String line1;
                String line2 = null;
                
                // ignore blank lines.
                while (true) {
                    line1 = readLine(inputStream);

                    if (line1.equals("\n")) {
                        if (Logging.REPORT_LEVEL <= Logging.INFORMATION) {
                            Logging.report(Logging.INFORMATION,
                                LogChannels.LC_JSR180,
                                "Discarding " + line1);
                        }
                        continue;
                    } else {
                        break;
                    }
                }
                if (Logging.REPORT_LEVEL <= Logging.INFORMATION) {
                    Logging.report(Logging.INFORMATION, LogChannels.LC_JSR180,
                        "line1 = " + line1);
                }
                
                inputBuffer.append(line1);
                
                while (true) {
                    line2 = readLine(inputStream);
                    inputBuffer.append(line2);
                    if (line2.trim().equals("")) break;
                }

                inputBuffer.append(line2);
                StringMsgParser smp =
                        new StringMsgParser(sipMessageListener);
                smp.readBody = false;
                Message sipMessage = null;

                try {
                    sipMessage =
                            smp.parseSIPMessage(inputBuffer.toString());
                    if (sipMessage == null) continue;
                } catch (ParseException ex) {
                    // Just ignore the parse exception.
                    continue;
                }
                
                if (Logging.REPORT_LEVEL <= Logging.INFORMATION) {
                    Logging.report(Logging.INFORMATION, LogChannels.LC_JSR180,
                        "Completed parsing message");
                }
                
                ContentLengthHeader cl =
                        sipMessage.getContentLengthHeader();
                int contentLength = 0;
                if (cl != null) {
                    contentLength = cl.getContentLength();
                } else {
                    contentLength = 0;
                }
                
                if (contentLength == 0) {
                    if (Logging.REPORT_LEVEL <= Logging.INFORMATION) {
                        Logging.report(Logging.INFORMATION,
                            LogChannels.LC_JSR180,
                            "content length " + contentLength);
                    }
                    sipMessage.removeContent();
                } else { // deal with the message body.
                    contentLength = cl.getContentLength();
                    
                    if (Logging.REPORT_LEVEL <= Logging.INFORMATION) {
                        Logging.report(Logging.INFORMATION,
                            LogChannels.LC_JSR180,
                            "content length " + contentLength);
                    }
                    
                    byte[] message_body =  new byte[contentLength];
                    int nread = 0;
                    
                    while (nread < contentLength) {
                        int readlength =
                                inputStream.read
                                (message_body, nread,
                                contentLength -
                                nread);
                                
                        if (readlength > 0) {
                            nread += readlength;
                            
                            if (Logging.REPORT_LEVEL <=
                                    Logging.INFORMATION) {
                                Logging.report(Logging.INFORMATION,
                                    LogChannels.LC_JSR180,
                                    "read " + nread);
                            }
                        } else {
                            break;
                        }
                    }
                    
                    sipMessage.setMessageContent(message_body);
                }
                
                if (sipMessageListener != null) {
                    sipMessageListener.processMessage(sipMessage);
                }
            }
        } catch (IOException ex) {
            if (sipMessageListener != null) {
                sipMessageListener.handleIOException();
            }
        } finally {
        }
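
The framing performed by the loop above (skip leading blank lines, collect header lines up to the empty line, then read exactly Content-Length bytes of body) can be sketched without the SIP classes. This is an illustration under stated assumptions, not the library's implementation: `FramingSketch`, `contentLength` and `readOneMessageBody` are hypothetical names, and the naive header scan stands in for `StringMsgParser` and `ContentLengthHeader`.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-alone sketch of the header/body framing used by run().
public class FramingSketch {

    // Same line reader as in the listing: drops '\r', keeps '\n'.
    public static String readLine(InputStream in) throws IOException {
        StringBuffer line = new StringBuffer();
        while (true) {
            int i = in.read();
            if (i == -1) throw new IOException("End of stream");
            char ch = (char) i;
            if (ch != '\r') line.append(ch);
            if (ch == '\n') break;
        }
        return line.toString();
    }

    // Naive stand-in for the real header parser: returns 0 when the header is absent.
    public static int contentLength(String headers) {
        for (String l : headers.split("\n")) {
            int colon = l.indexOf(':');
            if (colon > 0
                    && l.substring(0, colon).trim().equalsIgnoreCase("Content-Length")) {
                return Integer.parseInt(l.substring(colon + 1).trim());
            }
        }
        return 0;
    }

    // Loops because a single read() may return fewer bytes than requested.
    public static byte[] readBody(InputStream in, int length) throws IOException {
        byte[] body = new byte[length];
        int nread = 0;
        while (nread < length) {
            int n = in.read(body, nread, length - nread);
            if (n <= 0) break; // stream ended early; the rest of the array stays zeroed
            nread += n;
        }
        return body;
    }

    // Consumes one message from the stream and returns its body bytes.
    public static byte[] readOneMessageBody(InputStream in) throws IOException {
        String line;
        do {
            line = readLine(in);            // skip blank lines before the start line
        } while (line.equals("\n"));
        StringBuffer headers = new StringBuffer(line);
        do {
            line = readLine(in);
            headers.append(line);
        } while (!line.trim().equals(""));  // the empty line ends the headers
        return readBody(in, contentLength(headers.toString()));
    }
}
```

Calling `readOneMessageBody` repeatedly on the same stream walks through back-to-back messages, since each call leaves the stream positioned at the first byte after the body it just read.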
    
public void setMessageListener(SIPMessageListener mlistener)
Add a class that implements a MessageListener interface whose methods get called on successful parse and error conditions.

param
mlistener a MessageListener implementation that can react to correct and incorrect parses.

        sipMessageListener = mlistener;