FileDocCategorySizeDatePackage
HttpDataChannel.javaAPI DocAndroid 1.5 API17420Wed May 06 22:42:46 BST 2009com.android.im.imps

HttpDataChannel

public class HttpDataChannel extends DataChannel implements Runnable, HeartbeatService.Callback
The HttpDataChannel is an implementation of IMPS data channel in which the protocol binding is HTTP.

Fields Summary
private static final int
MAX_RETRY_COUNT
private static final int
INIT_RETRY_DELAY_MS
private static final int
MAX_RETRY_DELAY_MS
private Thread
mSendThread
private boolean
mStopped
private boolean
mSuspended
private boolean
mConnected
private boolean
mStopRetry
private Object
mRetryLock
private LinkedBlockingQueue
mSendQueue
private LinkedBlockingQueue
mReceiveQueue
private long
mLastActive
private long
mKeepAliveMillis
private Primitive
mKeepAlivePrimitive
private AtomicBoolean
mHasPendingPolling
private final android.net.http.AndroidHttpClient
mHttpClient
private final Header
mContentTypeHeader
private final Header
mMsisdnHeader
private URI
mPostUri
private ImpsTransactionManager
mTxManager
Constructors Summary
public HttpDataChannel(ImpsConnection connection)
Constructs a new HttpDataChannel for a connection.

param
connection the connection which uses the data channel.


                         
         
        super(connection);
        mTxManager = connection.getTransactionManager();
        ImpsConnectionConfig cfg = connection.getConfig();
        try {
            String host = cfg.getHost();
            if (host == null || host.length() == 0) {
                throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
                       "Empty host name.");
            }
            mPostUri = new URI(cfg.getHost());
            if (mPostUri.getPath() == null || "".equals(mPostUri.getPath())) {
                mPostUri = new URI(cfg.getHost() + "/");
            }
            if (!"http".equalsIgnoreCase(mPostUri.getScheme())
                    && !"https".equalsIgnoreCase(mPostUri.getScheme())) {
                throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
                        "Non HTTP/HTTPS host name.");
            }

            mHttpClient = AndroidHttpClient.newInstance("Android-Imps/0.1");

            HttpParams params = mHttpClient.getParams();
            HttpConnectionParams.setConnectionTimeout(params, cfg.getReplyTimeout());
            HttpConnectionParams.setSoTimeout(params, cfg.getReplyTimeout());
        } catch (URISyntaxException e) {
            throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
                    e.getLocalizedMessage());
        }

        mContentTypeHeader = new BasicHeader("Content-Type", cfg.getTransportContentType());
        String msisdn = cfg.getMsisdn();
        mMsisdnHeader = (msisdn != null) ? new BasicHeader("MSISDN", msisdn) : null;

        mParser = cfg.createPrimitiveParser();
        mSerializer = cfg.createPrimitiveSerializer();
    
Methods Summary
public voidconnect()

        if (mConnected) {
            throw new ImException("Already connected");
        }
        mStopped = false;
        mStopRetry = false;

        mSendQueue = new LinkedBlockingQueue<Primitive>();
        mReceiveQueue = new LinkedBlockingQueue<Primitive>();

        mSendThread = new Thread(this, "HttpDataChannel");
        mSendThread.setDaemon(true);
        mSendThread.start();

        mConnected = true;
    
private voiddoSendPrimitive(Primitive p)
Sends a primitive to the IMPS server through HTTP.

param
p The primitive to send.

        String errorInfo = null;
        int retryCount = 0;
        long retryDelay = INIT_RETRY_DELAY_MS;
        while (retryCount < MAX_RETRY_COUNT) {
            try {
                trySend(p);
                return;
            } catch (IOException e) {
                errorInfo = e.getLocalizedMessage();
                String type = p.getType();
                if (ImpsTags.Login_Request.equals(type)
                        || ImpsTags.Logout_Request.equals(type)) {
                    // we don't retry to send login/logout request. The request
                    // might be sent to the server successfully but we failed to
                    // get the response from the server. Retry in this case might
                    // cause multiple login which is not allowed by some server.
                    break;
                }
                if (p.getTransactionMode() == TransactionMode.Response) {
                    // Ignore the failure of sending response to the server since
                    // it's only an acknowledgment. When we get here, the
                    // primitive might have been sent successfully but failed to
                    // get the http response. The server might or might not send
                    // the request again if it does not receive the acknowledgment,
                    // the client is ok to either case.
                    return;
                }
                retryCount++;
                // sleep for a while and retry to send the primitive in a new
                // transaction if we havn't met the max retry count.
                if (retryCount < MAX_RETRY_COUNT) {
                   mTxManager.reassignTransactionId(p);
                    Log.w(ImpsLog.TAG, "Send primitive failed, retry after " + retryDelay + "ms");
                    synchronized (mRetryLock) {
                        try {
                            mRetryLock.wait(retryDelay);
                        } catch (InterruptedException ignore) {
                        }
                        if (mStopRetry) {
                            break;
                        }
                    }
                    retryDelay = retryDelay * 2;
                    if (retryDelay > MAX_RETRY_DELAY_MS) {
                        retryDelay = MAX_RETRY_DELAY_MS;
                    }
                }
            }
        }
        Log.w(ImpsLog.TAG, "Failed to send primitive after " + MAX_RETRY_COUNT + " retries");
        mTxManager.notifyErrorResponse(p.getTransactionID(),
                ImErrorInfo.NETWORK_ERROR, errorInfo);
    
public longgetLastActiveTime()

        return mLastActive;
    
public booleanisSendingQueueEmpty()

        if (!mConnected || mStopped) {
            throw new IllegalStateException();
        }
        return mSendQueue.isEmpty();
    
private booleanneedSendKeepAlive(long inactiveTime)

        return mKeepAliveMillis - inactiveTime <= 500;
    
public PrimitivereceivePrimitive()

        if (!mConnected || mStopped) {
            throw new IllegalStateException();
        }

        return mReceiveQueue.take();
    
public booleanresume()

        long now = SystemClock.elapsedRealtime();
        if (now - mLastActive > mKeepAliveMillis) {
            shutdown();
            return false;
        } else {
            mSuspended = false;

            // Send a polling request after resume in case we missed some
            // updates while we are suspended.
            Primitive polling = new Primitive(ImpsTags.Polling_Request);
            polling.setSession(mConnection.getSession().getID());
            sendPrimitive(polling);
            startHeartbeat();

            return true;
        }
    
public voidrun()

        while (!mStopped) {
            try {
                Primitive primitive = mSendQueue.take();
                if (primitive.getType().equals(ImpsTags.Polling_Request)) {
                    mHasPendingPolling.set(false);
                }
                doSendPrimitive(primitive);
            } catch (InterruptedException e) {
            }
        }
        mHttpClient.close();
    
public longsendHeartbeat()

        if (mSuspended) {
            return 0;
        }

        long inactiveTime = SystemClock.elapsedRealtime() - mLastActive;
        if (needSendKeepAlive(inactiveTime)) {
            sendKeepAlive();
            return mKeepAliveMillis;
        } else {
            return mKeepAliveMillis - inactiveTime;
        }
    
private voidsendKeepAlive()

        ImpsTransactionManager tm = mConnection.getTransactionManager();
        AsyncTransaction tx = new AsyncTransaction(tm) {
            @Override
            public void onResponseError(ImpsErrorInfo error) {
            }

            @Override
            public void onResponseOk(Primitive response) {
                // Since we never request a new timeout value, the response
                // can be ignored
            }
        };
        tx.sendRequest(mKeepAlivePrimitive);
    
public voidsendPrimitive(Primitive p)

        if (!mConnected || mStopped) {
            ImpsLog.log("DataChannel not connected, ignore primitive " + p.getType());
            return;
        }

        if (ImpsTags.Polling_Request.equals(p.getType())) {
            if (!mHasPendingPolling.compareAndSet(false, true)) {
                ImpsLog.log("HttpDataChannel: Ignoring Polling-Request");
                return;
            }
        } else if (ImpsTags.Logout_Request.equals(p.getType())) {
            mStopRetry = true;
            synchronized (mRetryLock) {
                mRetryLock.notify();
            }
        }
        if (!mSendQueue.offer(p)) {
            // This is almost impossible for a LinkedBlockingQueue. We don't
            // even bother to assign an error code for this. ;)
            mTxManager.notifyErrorResponse(p.getTransactionID(),
                    ImErrorInfo.UNKNOWN_ERROR, "sending queue full");
        }
    
public voidshutdown()

        HeartbeatService heartbeatService
            = SystemService.getDefault().getHeartbeatService();
        if (heartbeatService != null) {
            heartbeatService.stopHeartbeat(this);
        }
        // Stop the sending thread
        mStopped = true;
        mSendThread.interrupt();
        mConnected = false;
    
private voidstartHeartbeat()

        HeartbeatService heartbeatService
            = SystemService.getDefault().getHeartbeatService();
        if (heartbeatService != null) {
            heartbeatService.startHeartbeat(this, mKeepAliveMillis);
        }
    
public voidstartKeepAlive(long interval)

        if (!mConnected || mStopped) {
            throw new IllegalStateException();
        }

        if (interval <= 0) {
            interval = mConnection.getConfig().getDefaultKeepAliveInterval();
        }

        mKeepAliveMillis = interval * 1000;
        if (mKeepAliveMillis < 0) {
            ImpsLog.log("Negative keep alive time. Won't send keep-alive");
        }
        mKeepAlivePrimitive = new Primitive(ImpsTags.KeepAlive_Request);
        startHeartbeat();
    
public voidsuspend()

        mSuspended = true;
    
private voidtrySend(Primitive p)

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            mSerializer.serialize(p, out);
        } catch (SerializerException e) {
            mTxManager.notifyErrorResponse(p.getTransactionID(),
                    ImErrorInfo.SERIALIZER_ERROR,
                    "Internal serializer error, primitive: " + p.getType());
            out.close();
            return;
        }

        HttpPost req = new HttpPost(mPostUri);
        req.addHeader(mContentTypeHeader);
        if (mMsisdnHeader != null) {
            req.addHeader(mMsisdnHeader);
        }
        ByteArrayEntity entity = new ByteArrayEntity(out.toByteArray());
        req.setEntity(entity);

        mLastActive = SystemClock.elapsedRealtime();
        if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
            long sendBytes = entity.getContentLength() + 176 /* approx. header length */;
            ImpsLog.log(mConnection.getLoginUserName() + " >> " + p.getType() + " HTTP payload approx. " + sendBytes + " bytes");
        }
        if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
            ImpsLog.dumpRawPacket(out.toByteArray());
            ImpsLog.dumpPrimitive(p);
        }

        HttpResponse res = mHttpClient.execute(req);
        StatusLine statusLine = res.getStatusLine();
        HttpEntity resEntity = res.getEntity();

        InputStream in = resEntity.getContent();

        if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
            Log.d(ImpsLog.PACKET_TAG, statusLine.toString());
            Header[] headers = res.getAllHeaders();
            for (Header h : headers) {
                Log.d(ImpsLog.PACKET_TAG, h.toString());
            }
            int len = (int) resEntity.getContentLength();
            if (len > 0) {
                byte[] content = new byte[len];
                int offset = 0;
                int bytesRead = 0;
                do {
                    bytesRead = in.read(content, offset, len);
                    offset += bytesRead;
                    len -= bytesRead;
                } while (bytesRead > 0);
                in.close();
                ImpsLog.dumpRawPacket(content);
                in = new ByteArrayInputStream(content);
            }
        }

        try {
            if (statusLine.getStatusCode() != HttpURLConnection.HTTP_OK) {
                mTxManager.notifyErrorResponse(p.getTransactionID(), statusLine.getStatusCode(),
                        statusLine.getReasonPhrase());
                return;
            }
            if (resEntity.getContentLength() == 0) {
                // empty responses are only valid for Polling-Request or
                // server initiated transactions
                if ((p.getTransactionMode() != TransactionMode.Response)
                        && !p.getType().equals(ImpsTags.Polling_Request)) {
                    mTxManager.notifyErrorResponse(p.getTransactionID(),
                            ImErrorInfo.ILLEGAL_SERVER_RESPONSE,
                            "bad response from server");
                }
                return;
            }

            Primitive response = mParser.parse(in);

            if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
                ImpsLog.dumpPrimitive(response);
            }

            if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
                long len = 2 + resEntity.getContentLength() + statusLine.toString().length() + 2;
                Header[] headers = res.getAllHeaders();
                for (Header header : headers) {
                    len += header.getName().length() + header.getValue().length() + 4;
                }
                ImpsLog.log(mConnection.getLoginUserName() + " << "
                        + response.getType() + " HTTP payload approx. " + len + "bytes");
            }

            if (!mReceiveQueue.offer(response)) {
                // This is almost impossible for a LinkedBlockingQueue.
                // We don't even bother to assign an error code for it.
                mTxManager.notifyErrorResponse(p.getTransactionID(),
                        ImErrorInfo.UNKNOWN_ERROR, "receiving queue full");
            }
        } catch (ParserException e) {
            ImpsLog.logError(e);
            mTxManager.notifyErrorResponse(p.getTransactionID(),
                    ImErrorInfo.PARSER_ERROR,
                    "Parser error, received a bad response from server");
        } finally {
            //consume all the content so that the connection can be re-used.
            resEntity.consumeContent();
        }