// Java source  |  1537 lines  |  48.95 KB

/*
 * Copyright 2009 Mike Cumings
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.kenai.jbosh;

import com.kenai.jbosh.ComposableBody.Builder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * BOSH Client session instance.  Each communication session with a remote
 * connection manager is represented and handled by an instance of this
 * class.  This is the main entry point for client-side communications.
 * To create a new session, a client configuration must first be created
 * and then used to create a client instance:
 * <pre>
 * BOSHClientConfig cfg = BOSHClientConfig.Builder.create(
 *         "http://server:1234/httpbind", "jabber.org")
 *     .setFrom("user@jabber.org")
 *     .build();
 * BOSHClient client = BOSHClient.create(cfg);
 * </pre>
 * Additional client configuration options are available.  See the
 * {@code BOSHClientConfig.Builder} class for more information.
 * <p/>
 * Once a {@code BOSHClient} instance has been created, communication with
 * the remote connection manager can begin.  No attempt will be made to
 * establish a connection to the connection manager until the first call
 * is made to the {@code send(ComposableBody)} method.  Note that it is
 * possible to send an empty body to cause an immediate connection attempt
 * to the connection manager.  Sending an empty message would look like
 * the following:
 * <pre>
 * client.send(ComposableBody.builder().build());
 * </pre>
 * For more information on creating body messages with content, see the
 * {@code ComposableBody.Builder} class documentation.
 * <p/>
 * Once a session has been successfully started, the client instance can be
 * used to send arbitrary payload data.  All aspects of the BOSH
 * protocol involving setting and processing attributes in the BOSH
 * namespace will be handled by the client code transparently and behind the
 * scenes.  The user of the client instance can therefore concentrate
 * entirely on the content of the message payload, leaving the semantics of
 * the BOSH protocol to the client implementation.
 * <p/>
 * To be notified of incoming messages from the remote connection manager,
 * a {@code BOSHClientResponseListener} should be added to the client instance.
 * All incoming messages will be published to all response listeners as they
 * arrive and are processed.  As with the transmission of payload data via
 * the {@code send(ComposableBody)} method, there is no need to worry about
 * handling of the BOSH attributes, since this is handled behind the scenes.
 * <p/>
 * If the connection to the remote connection manager is terminated (either
 * explicitly or due to a terminal condition of some sort), all connection
 * listeners will be notified.  After the connection has been closed, the
 * client instance is considered dead and a new one must be created in order
 * to resume communications with the remote server.
 * <p/>
 * Instances of this class are thread-safe.
 *
 * @see BOSHClientConfig.Builder
 * @see BOSHClientResponseListener
 * @see BOSHClientConnListener
 * @see ComposableBody.Builder
 */
public final class BOSHClient {

    /**
     * Logger shared by all client instances, named after this class.
     */
    private static final Logger LOG = Logger.getLogger(
            BOSHClient.class.getName());

    /**
     * Value of the 'type' attribute used for session termination.  Used
     * both to mark outbound disconnect requests and to recognize
     * termination messages.
     */
    private static final String TERMINATE = "terminate";
    
    /**
     * Value of the 'type' attribute used for recoverable errors.
     */
    private static final String ERROR = "error";

    /**
     * Log message used whenever an {@code InterruptedException} is caught.
     */
    private static final String INTERRUPTED = "Interrupted";

    /**
     * Message used for unhandled exceptions.
     */
    private static final String UNHANDLED = "Unhandled Exception";

    /**
     * Message used when a null listener is detected.  This text is used as
     * the message of the {@code IllegalArgumentException} thrown by the
     * listener add/remove methods.
     */
    private static final String NULL_LISTENER = "Listener may not be null";

    /**
     * Default empty request delay, used when no system property override
     * is present.
     */
    private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100;

    /**
     * Amount of time to wait before sending an empty request, in
     * milliseconds.  Read once, at class initialization, from the integer
     * system property named
     * {@code <fully qualified class name>.emptyRequestDelay}.
     */
    private static final int EMPTY_REQUEST_DELAY = Integer.getInteger(
            BOSHClient.class.getName() + ".emptyRequestDelay",
            DEFAULT_EMPTY_REQUEST_DELAY);

    /**
     * Default value for the pause margin, used when no system property
     * override is present.
     */
    private static final int DEFAULT_PAUSE_MARGIN = 500;

    /**
     * The amount of time in milliseconds which will be reserved as a
     * safety margin when scheduling empty requests against a maxpause
     * value.   This should give us enough time to build the message
     * and transport it to the remote host.  Read once, at class
     * initialization, from the integer system property named
     * {@code <fully qualified class name>.pauseMargin}.
     */
    private static final int PAUSE_MARGIN = Integer.getInteger(
            BOSHClient.class.getName() + ".pauseMargin",
            DEFAULT_PAUSE_MARGIN);
    
    /**
     * Flag indicating whether or not we want to perform assertions.  The
     * value is assigned by the static initializer of this class.
     */
    private static final boolean ASSERTIONS;

    /**
     * Connection listeners.  Stored in a copy-on-write set so registration
     * and iteration need no external locking.
     */
    private final Set<BOSHClientConnListener> connListeners =
            new CopyOnWriteArraySet<BOSHClientConnListener>();

    /**
     * Request listeners.
     */
    private final Set<BOSHClientRequestListener> requestListeners =
            new CopyOnWriteArraySet<BOSHClientRequestListener>();

    /**
     * Response listeners.
     */
    private final Set<BOSHClientResponseListener> responseListeners =
            new CopyOnWriteArraySet<BOSHClientResponseListener>();

    /**
     * Lock instance guarding the mutable session state declared below.
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition indicating that there are messages to be exchanged.
     */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Condition indicating that there are available slots for sending
     * messages.
     */
    private final Condition notFull = lock.newCondition();

    /**
     * Condition indicating that there are no outstanding connections.
     */
    private final Condition drained = lock.newCondition();

    /**
     * Session configuration supplied at construction time.
     */
    private final BOSHClientConfig cfg;

    /**
     * Processor thread runnable instance.  Delegates to
     * {@code processMessages()}.
     */
    private final Runnable procRunnable = new Runnable() {
        /**
         * Process incoming messages.
         */
        public void run() {
            processMessages();
        }
    };

    /**
     * Runnable used for deferred empty requests.  Delegates to
     * {@code sendEmptyRequest()}.
     */
    private final Runnable emptyRequestRunnable = new Runnable() {
        /**
         * Send an empty request.
         */
        public void run() {
            sendEmptyRequest();
        }
    };

    /**
     * HTTPSender instance used to transmit requests.
     */
    private final HTTPSender httpSender =
            new ApacheHTTPSender();

    /**
     * Storage for test hook implementation.  Holds {@code null} when no
     * interceptor has been installed.
     */
    private final AtomicReference<ExchangeInterceptor> exchInterceptor =
            new AtomicReference<ExchangeInterceptor>();

    /**
     * Request ID sequence to use for the session.
     */
    private final RequestIDSequence requestIDSeq = new RequestIDSequence();

    /**
     * ScheduledExecutor to use for deferred tasks.
     */
    private final ScheduledExecutorService schedExec =
            Executors.newSingleThreadScheduledExecutor();

    /************************************************************
     * The following vars must be accessed via the lock instance.
     */

    /**
     * Thread which is used to process responses from the connection
     * manager.  Becomes null when session is terminated; a non-null value
     * is what {@code isWorking()} treats as an active session.
     */
    private Thread procThread;

    /**
     * Future for sending a deferred empty request, if needed.
     */
    private ScheduledFuture emptyRequestFuture;

    /**
     * Connection Manager session parameters.  Only available when in a
     * connected state.
     */
    private CMSessionParams cmParams;

    /**
     * List of active/outstanding requests.  Set to {@code null} once the
     * session has been disposed.
     */
    private Queue<HTTPExchange> exchanges = new LinkedList<HTTPExchange>();

    /**
     * Set of RIDs which have been received, for the purpose of sending
     * response acknowledgements.  Set to {@code null} once the session has
     * been disposed.
     */
    private SortedSet<Long> pendingResponseAcks = new TreeSet<Long>();
    
    /**
     * The highest RID that we've already received a response for.  This value
     * is used to implement response acks.  The initial value of -1 means
     * that no response has been received yet.
     */
    private Long responseAck = Long.valueOf(-1L);

    /**
     * List of requests which have been made but not yet acknowledged.  This
     * list remains unpopulated if the CM is not acking requests.  Set to
     * {@code null} once the session has been disposed.
     */
    private List<ComposableBody> pendingRequestAcks =
            new ArrayList<ComposableBody>();

    ///////////////////////////////////////////////////////////////////////////
    // Classes:

    /**
     * Class used in testing to dynamically manipulate received exchanges
     * at test runtime.  The processing thread consults the installed
     * interceptor, if any, before processing each exchange.
     */
    abstract static class ExchangeInterceptor {
        /**
         * Limit construction.
         */
        ExchangeInterceptor() {
            // Empty;
        }

        /**
         * Hook to manipulate an HTTPExchange as it is about to be processed.
         *
         * @param exch original exchange that would be processed
         * @return replacement exchange instance, or {@code null} to skip
         *  processing of this exchange
         */
        abstract HTTPExchange interceptExchange(final HTTPExchange exch);
    }

    ///////////////////////////////////////////////////////////////////////////
    // Constructors:

    /**
     * Determine whether or not we should perform assertions.  Assertions
     * can be specified via system property explicitly, or defaulted to
     * the JVM assertions status.
     */
    static {
        final String prop =
                BOSHClient.class.getSimpleName() + ".assertionsEnabled";
        boolean enabled = false;
        if (System.getProperty(prop) == null) {
            assert enabled = true;
        } else {
            enabled = Boolean.getBoolean(prop);
        }
        ASSERTIONS = enabled;
    }

    /**
     * Private constructor; instances are obtained through
     * {@code create(BOSHClientConfig)}.  Stores the configuration and then
     * initializes the session.
     *
     * @param sessCfg session configuration
     */
    private BOSHClient(final BOSHClientConfig sessCfg) {
        this.cfg = sessCfg;
        this.init();
    }

    ///////////////////////////////////////////////////////////////////////////
    // Public methods:

    /**
     * Factory method producing a new BOSH client session for the supplied
     * client configuration.
     *
     * @param clientCfg session configuration
     * @return BOSH session instance
     * @throws IllegalArgumentException if {@code clientCfg} is {@code null}
     */
    public static BOSHClient create(final BOSHClientConfig clientCfg) {
        if (clientCfg != null) {
            return new BOSHClient(clientCfg);
        }
        throw new IllegalArgumentException(
                "Client configuration may not be null");
    }

    /**
     * Returns the configuration this client instance was created with.
     *
     * @return client configuration
     */
    public BOSHClientConfig getBOSHClientConfig() {
        return this.cfg;
    }

    /**
     * Registers a connection listener with this session.  The listeners are
     * kept in a set, so registering the same listener again has no
     * additional effect.
     *
     * @param listener connection listener to register
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void addBOSHClientConnListener(
            final BOSHClientConnListener listener) {
        if (listener != null) {
            connListeners.add(listener);
            return;
        }
        throw new IllegalArgumentException(NULL_LISTENER);
    }

    /**
     * Unregisters a connection listener from this session.  Nothing happens
     * if the listener was never registered.
     *
     * @param listener connection listener to unregister
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void removeBOSHClientConnListener(
            final BOSHClientConnListener listener) {
        if (listener != null) {
            connListeners.remove(listener);
            return;
        }
        throw new IllegalArgumentException(NULL_LISTENER);
    }

    /**
     * Registers a request message listener with this session.  The
     * listeners are kept in a set, so registering the same listener again
     * has no additional effect.
     *
     * @param listener request listener to register
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void addBOSHClientRequestListener(
            final BOSHClientRequestListener listener) {
        final boolean missing = (listener == null);
        if (missing) {
            throw new IllegalArgumentException(NULL_LISTENER);
        }
        requestListeners.add(listener);
    }

    /**
     * Unregisters a request message listener from this session.  Nothing
     * happens if the listener was never registered.
     *
     * @param listener request listener to unregister
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void removeBOSHClientRequestListener(
            final BOSHClientRequestListener listener) {
        final boolean missing = (listener == null);
        if (missing) {
            throw new IllegalArgumentException(NULL_LISTENER);
        }
        requestListeners.remove(listener);
    }

    /**
     * Registers a response message listener with this session.  The
     * listeners are kept in a set, so registering the same listener again
     * has no additional effect.
     *
     * @param listener response listener to register
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void addBOSHClientResponseListener(
            final BOSHClientResponseListener listener) {
        if (null == listener) {
            throw new IllegalArgumentException(NULL_LISTENER);
        }
        this.responseListeners.add(listener);
    }

    /**
     * Unregisters a response message listener from this session.  Nothing
     * happens if the listener was never registered.
     *
     * @param listener response listener to unregister
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    public void removeBOSHClientResponseListener(
            final BOSHClientResponseListener listener) {
        if (null == listener) {
            throw new IllegalArgumentException(NULL_LISTENER);
        }
        this.responseListeners.remove(listener);
    }

    /**
     * Send the provided message data to the remote connection manager.  The
     * provided message body does not need to have any BOSH-specific attribute
     * information set.  It only needs to contain the actual message payload
     * that should be delivered to the remote server.
     * <p/>
     * The first call to this method will result in a connection attempt
     * to the remote connection manager.  Subsequent calls to this method
     * will block until the underlying session state allows for the message
     * to be transmitted.  In certain scenarios - such as when the maximum
     * number of outbound connections has been reached - calls to this method
     * will block for short periods of time.
     * <p/>
     * If the session has already been disposed, or was shut down while the
     * session creation request was still outstanding, no message (including
     * a termination message) can be built any more and a
     * {@code BOSHException} is thrown.
     *
     * @param body message data to send to remote server
     * @throws BOSHException on message transmission failure, or when the
     *  session is closed
     * @throws IllegalArgumentException if {@code body} is {@code null}
     */
    public void send(final ComposableBody body) throws BOSHException {
        assertUnlocked();
        if (body == null) {
            throw(new IllegalArgumentException(
                    "Message body may not be null"));
        }

        HTTPExchange exch;
        CMSessionParams params;
        lock.lock();
        try {
            blockUntilSendable(body);
            if (!isWorking() && !isTermination(body)) {
                throw(new BOSHException(
                        "Cannot send message when session is closed"));
            }

            params = cmParams;
            // A termination message gets past the check above even when the
            // session is no longer working.  In that case the session state
            // may already be released (exchanges == null), or the session
            // may never have been established while a request is still
            // queued.  Neither case can produce a valid request, so fail
            // with the documented exception rather than a
            // NullPointerException.  This is checked before a RID is taken.
            if (exchanges == null
                    || (params == null && !exchanges.isEmpty())) {
                throw(new BOSHException(
                        "Cannot send message when session is closed"));
            }

            long rid = requestIDSeq.getNextRID();
            ComposableBody request;
            if (params == null) {
                // This is the first message being sent
                request = applySessionCreationRequest(rid, body);
            } else {
                request = applySessionData(rid, body);
                if (params.isAckingRequests()) {
                    pendingRequestAcks.add(request);
                }
            }
            exch = new HTTPExchange(request);
            exchanges.add(exch);
            // Wake the processing thread, which waits for exchanges
            notEmpty.signalAll();
            // A real request is going out; a deferred empty one is moot
            clearEmptyRequest();
        } finally {
            lock.unlock();
        }
        // The transmission itself happens outside of the lock
        AbstractBody finalReq = exch.getRequest();
        HTTPResponse resp = httpSender.send(params, finalReq);
        exch.setHTTPResponse(resp);
        fireRequestSent(finalReq);
    }

    /**
     * Attempt to pause the current session.  When supported by the remote
     * connection manager, pausing the session will result in the connection
     * manager closing out all outstanding requests (including the pause
     * request) and increases the inactivity timeout of the session.  The
     * exact value of the temporary timeout is dependent upon the connection
     * manager.  This method should be used if a client encounters an
     * exceptional temporary situation during which it will be unable to send
     * requests to the connection manager for a period of time greater than
     * the maximum inactivity period.
     * <p/>
     * The session will revert back to its normal, unpaused state when the
     * client sends its next message.
     * <p/>
     * A failure to transmit the pause request is logged and does not affect
     * the return value.
     *
     * @return {@code true} if the connection manager supports session pausing,
     *  {@code false} if the connection manager does not support session
     *  pausing or if the session has not yet been established
     */
    public boolean pause() {
        assertUnlocked();

        // Read the advertised maximum pause while holding the lock
        AttrMaxPause limit;
        lock.lock();
        try {
            limit = (cmParams == null) ? null : cmParams.getMaxPause();
        } finally {
            lock.unlock();
        }
        if (limit == null) {
            // No session yet, or the CM does not support pausing
            return false;
        }

        try {
            final ComposableBody pauseRequest = ComposableBody.builder()
                    .setAttribute(Attributes.PAUSE, limit.toString())
                    .build();
            send(pauseRequest);
        } catch (BOSHException boshx) {
            LOG.log(Level.FINEST, "Could not send pause", boshx);
        }
        return true;
    }

    /**
     * Ends the BOSH session by sending a termination message with no
     * payload to the remote BOSH connection manager.
     *
     * @throws BOSHException when termination message cannot be sent
     */
    public void disconnect() throws BOSHException {
        final ComposableBody emptyBody = ComposableBody.builder().build();
        disconnect(emptyBody);
    }

    /**
     * Ends the BOSH session by disconnecting from the remote BOSH connection
     * manager.  The supplied content is sent as the final message, marked
     * with the termination 'type' attribute.
     *
     * @param msg final message to send
     * @throws BOSHException when termination message cannot be sent
     * @throws IllegalArgumentException if {@code msg} is {@code null}
     */
    public void disconnect(final ComposableBody msg) throws BOSHException {
        if (msg != null) {
            final Builder terminator = msg.rebuild();
            terminator.setAttribute(Attributes.TYPE, TERMINATE);
            final ComposableBody finalMsg = terminator.build();
            send(finalMsg);
            return;
        }
        throw new IllegalArgumentException(
                "Message body may not be null");
    }

    /**
     * Forcibly closes this client session instance without sending a
     * termination message.  The preferred way to end a session is to send
     * a disconnect message and wait for organic termination.  This method
     * disposes of the local session, reporting an explicit-close exception
     * as the cause.
     */
    public void close() {
        final BOSHException reason =
                new BOSHException("Session explicitly closed by caller");
        dispose(reason);
    }

    ///////////////////////////////////////////////////////////////////////////
    // Package-private methods:

    /**
     * Reads the current CM session params while holding the lock.
     *
     * @return current session params, or {@code null}
     */
    CMSessionParams getCMSessionParams() {
        final CMSessionParams snapshot;
        lock.lock();
        try {
            snapshot = cmParams;
        } finally {
            lock.unlock();
        }
        return snapshot;
    }

    /**
     * Wait until no more messages are waiting to be processed.  The method
     * keeps waiting on the {@code drained} condition while the session is
     * still working and no deferred empty request is pending (no future has
     * been scheduled, or the scheduled one has already completed).
     * <p/>
     * An interrupt received while waiting does not abort the wait.  It is
     * logged, and the interrupt status of the calling thread is restored
     * before this method returns so the caller can still observe it.
     */
    void drain() {
        boolean interrupted = false;
        lock.lock();
        try {
            LOG.finest("Waiting while draining...");
            while (isWorking()
                    && (emptyRequestFuture == null
                    || emptyRequestFuture.isDone())) {
                try {
                    drained.await();
                } catch (InterruptedException intx) {
                    LOG.log(Level.FINEST, INTERRUPTED, intx);
                    // await() cleared the flag; remember to restore it
                    interrupted = true;
                }
            }
            LOG.finest("Drained");
        } finally {
            lock.unlock();
            if (interrupted) {
                // Restored only after the loop: re-interrupting inside it
                // would make the next await() fail immediately
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Test hook: installs the interceptor which the processing thread
     * consults before handling each exchange.
     *
     * @param interceptor exchange interceptor
     */
    void setExchangeInterceptor(final ExchangeInterceptor interceptor) {
        this.exchInterceptor.set(interceptor);
    }


    ///////////////////////////////////////////////////////////////////////////
    // Private methods:

    /**
     * Initializes the session: sets up the underlying HTTP transport with
     * the session configuration, then creates and starts the daemon thread
     * which processes responses.
     */
    private void init() {
        assertUnlocked();

        lock.lock();
        try {
            httpSender.init(cfg);

            final Thread receiver = new Thread(procRunnable);
            procThread = receiver;
            receiver.setDaemon(true);
            final String threadName = BOSHClient.class.getSimpleName()
                    + "[" + System.identityHashCode(this)
                    + "]: Receive thread";
            receiver.setName(threadName);
            receiver.start();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Destroy this session.  Disposal happens at most once: the first
     * caller clears the processing thread reference, and later callers
     * return immediately.  Connection listeners are notified outside of the
     * lock, after which the session state is released, all waiting threads
     * are woken, and the HTTP sender and scheduled executor are shut down.
     *
     * @param cause the reason for the session termination, or {@code null}
     *  for normal termination
     */
    private void dispose(final Throwable cause) {
        assertUnlocked();
        
        // Phase 1: mark the session as no longer working
        lock.lock();
        try {
            if (procThread == null) {
                // Already disposed
                return;
            }
            procThread = null;
        } finally {
            lock.unlock();
        }

        // Notify listeners without holding the lock
        if (cause == null) {
            fireConnectionClosed();
        } else {
            fireConnectionClosedOnError(cause);
        }

        // Phase 2: release session state and wake every waiter so that it
        // can observe that the session has ended
        lock.lock();
        try {
            clearEmptyRequest();
            exchanges = null;
            cmParams = null;
            pendingResponseAcks = null;
            pendingRequestAcks = null;
            notEmpty.signalAll();
            notFull.signalAll();
            drained.signalAll();
        } finally {
            lock.unlock();
        }
        
        // Finally, release the transport and the deferred-task executor
        httpSender.destroy();
        schedExec.shutdownNow();
    }

    /**
     * Tells whether the given message body is a request to pause the
     * session, i.e. whether it carries a 'pause' attribute.
     *
     * @param msg message to evaluate
     * @return {@code true} if the message is a pause request, {@code false}
     *  otherwise
     */
    private static boolean isPause(final AbstractBody msg) {
        final String pauseValue = msg.getAttribute(Attributes.PAUSE);
        return pauseValue != null;
    }
    
    /**
     * Tells whether the given message body terminates the session, i.e.
     * whether its 'type' attribute equals the termination value.
     *
     * @param msg message to evaluate
     * @return {@code true} if the message is a session termination,
     *  {@code false} otherwise
     */
    private static boolean isTermination(final AbstractBody msg) {
        final String typeValue = msg.getAttribute(Attributes.TYPE);
        return TERMINATE.equals(typeValue);
    }

    /**
     * Derives the terminal binding condition, if any, described by an HTTP
     * response.  A termination body is mapped through its 'condition'
     * attribute.  Otherwise the HTTP response code is consulted, but only
     * when session params exist and report no version (the deprecated HTTP
     * error conditions).
     *
     * @param respCode HTTP response code
     * @param respBody response body
     * @return terminal binding condition, or {@code null} if not a terminal
     *  binding condition message
     */
    private TerminalBindingCondition getTerminalBindingCondition(
            final int respCode,
            final AbstractBody respBody) {
        assertLocked();

        if (!isTermination(respBody)) {
            // Check for deprecated HTTP Error Conditions
            final boolean legacyErrors =
                    cmParams != null && cmParams.getVersion() == null;
            if (legacyErrors) {
                return TerminalBindingCondition.forHTTPResponseCode(respCode);
            }
            return null;
        }

        final String condition = respBody.getAttribute(Attributes.CONDITION);
        return TerminalBindingCondition.forString(condition);
    }

    /**
     * Decides whether the given message may be sent right away or has to
     * wait for the session state to change.
     *
     * @param msg message to evaluate
     * @return {@code true} if the message can be immediately sent,
     *  {@code false} otherwise
     */
    private boolean isImmediatelySendable(final AbstractBody msg) {
        assertLocked();

        final CMSessionParams params = cmParams;
        if (params == null) {
            // block if we're waiting for a response to our first request
            return exchanges.isEmpty();
        }

        final AttrRequests limit = params.getRequests();
        if (limit == null) {
            // No request limit is known
            return true;
        }

        final int allowed = limit.intValue();
        final int outstanding = exchanges.size();
        if (outstanding < allowed) {
            return true;
        }
        // One additional terminate or pause message is allowed
        return outstanding == allowed
                && (isTermination(msg) || isPause(msg));
    }

    /**
     * Reports whether the session is still active, which is the case for
     * as long as a processing thread is registered.
     *
     * @return {@code true} if it is, {@code false} otherwise
     */
    private boolean isWorking() {
        assertLocked();

        final Thread worker = procThread;
        return worker != null;
    }

    /**
     * Blocks until either the message provided becomes immediately
     * sendable or until the session is terminated.  Must be called with the
     * lock held; the lock is released while waiting on the {@code notFull}
     * condition.
     * <p/>
     * An interrupt received while waiting does not abort the wait.  It is
     * logged, and the interrupt status of the calling thread is restored
     * before this method returns so the caller can still observe it.
     *
     * @param msg message to evaluate
     */
    private void blockUntilSendable(final AbstractBody msg) {
        assertLocked();

        boolean interrupted = false;
        try {
            while (isWorking() && !isImmediatelySendable(msg)) {
                try {
                    notFull.await();
                } catch (InterruptedException intx) {
                    LOG.log(Level.FINEST, INTERRUPTED, intx);
                    // await() cleared the flag; remember to restore it
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                // Restored only after the loop: re-interrupting inside it
                // would make the next await() fail immediately
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Turns the given body into a BOSH session creation request by adding
     * the session creation attributes, including the optional route and
     * sender when configured.  Any session ID carried by the original body
     * is removed.
     *
     * @param rid request ID to use
     * @param orig original body to modify
     * @return modified message which acts as a session creation request
     */
    private ComposableBody applySessionCreationRequest(
            final long rid, final ComposableBody orig) throws BOSHException {
        assertLocked();

        final Builder creation = orig.rebuild();
        creation.setAttribute(Attributes.TO, cfg.getTo());
        creation.setAttribute(Attributes.XML_LANG, cfg.getLang());
        final String version = AttrVersion.getSupportedVersion().toString();
        creation.setAttribute(Attributes.VER, version);
        creation.setAttribute(Attributes.WAIT, "60");
        creation.setAttribute(Attributes.HOLD, "1");
        final String ridText = Long.toString(rid);
        creation.setAttribute(Attributes.RID, ridText);
        applyRoute(creation);
        applyFrom(creation);
        creation.setAttribute(Attributes.ACK, "1");

        // Make sure the following are NOT present (i.e., during retries)
        creation.setAttribute(Attributes.SID, null);

        final ComposableBody result = creation.build();
        return result;
    }

    /**
     * Adds the configured routing information, if there is any, to the
     * request message whose builder has been provided.
     *
     * @param builder builder instance to add routing information to
     */
    private void applyRoute(final Builder builder) {
        assertLocked();

        final String configuredRoute = cfg.getRoute();
        if (configuredRoute == null) {
            // No route configured; leave the request untouched
            return;
        }
        builder.setAttribute(Attributes.ROUTE, configuredRoute);
    }

    /**
     * Adds the configured local station ID, if there is one, to the request
     * message whose builder has been provided.
     *
     * @param builder builder instance to add station ID information to
     */
    private void applyFrom(final Builder builder) {
        assertLocked();

        final String sender = cfg.getFrom();
        if (sender == null) {
            // No sender configured; leave the request untouched
            return;
        }
        builder.setAttribute(Attributes.FROM, sender);
    }

    /**
     * Produces a copy of the outbound request carrying the data of the
     * established session: the session ID, the request ID and, where
     * needed, a response acknowledgement.
     *
     * This method assumes the lock is currently held.
     *
     * @param rid request ID to use
     * @param orig original/raw request
     * @return modified request with session information applied
     */
    private ComposableBody applySessionData(
            final long rid,
            final ComposableBody orig) throws BOSHException {
        assertLocked();

        final Builder sessionRequest = orig.rebuild();
        final String sid = cmParams.getSessionID().toString();
        sessionRequest.setAttribute(Attributes.SID, sid);
        final String ridText = Long.toString(rid);
        sessionRequest.setAttribute(Attributes.RID, ridText);
        applyResponseAcknowledgement(sessionRequest, rid);

        final ComposableBody result = sessionRequest.build();
        return result;
    }

    /**
     * Sets the 'ack' attribute of the request to the value of the highest
     * 'rid' of a request for which a response has already been received,
     * as tracked in {@code responseAck}.  No attribute is set when no
     * response has been received yet, or when the acknowledged RID directly
     * precedes the current one, in which case the acknowledgement is
     * implicit.
     *
     * @param builder message builder
     * @param rid current request RID
     */
    private void applyResponseAcknowledgement(
            final Builder builder,
            final long rid) {
        assertLocked();

        final long acked = responseAck.longValue();
        // -1 means that we have not received any responses yet
        final boolean nothingReceived = (acked == -1L);
        // An ack of the directly preceding RID is implicit
        final boolean implicitAck = (acked == rid - 1L);
        if (nothingReceived || implicitAck) {
            return;
        }

        builder.setAttribute(Attributes.ACK, responseAck.toString());
    }

    /**
     * While we are "connected", process received responses.  Exchanges are
     * taken one at a time until {@code nextExchange()} returns
     * {@code null}.  An installed test interceptor may replace an exchange
     * or, by returning {@code null}, cause it to be discarded.
     *
     * This method is run in the processing thread.
     */
    private void processMessages() {
        LOG.log(Level.FINEST, "Processing thread starting");
        try {
            while (true) {
                HTTPExchange exch = nextExchange();
                if (exch == null) {
                    // This thread is no longer the processing thread
                    break;
                }

                // Test hook to manipulate what the client sees:
                ExchangeInterceptor interceptor = exchInterceptor.get();
                if (interceptor != null) {
                    HTTPExchange newExch = interceptor.interceptExchange(exch);
                    if (newExch == null) {
                        LOG.log(Level.FINE, "Discarding exchange on request "
                                + "of test hook: RID="
                                + exch.getRequest().getAttribute(
                                    Attributes.RID));
                        lock.lock();
                        try {
                            // The session may have been disposed since the
                            // exchange was fetched, in which case the queue
                            // has already been released
                            if (exchanges != null) {
                                exchanges.remove(exch);
                            }
                        } finally {
                            lock.unlock();
                        }
                        continue;
                    }
                    exch = newExch;
                }

                processExchange(exch);
            }
        } finally {
            LOG.log(Level.FINEST, "Processing thread exiting");
        }

    }

    /**
     * Get the exchange at the head of the queue, blocking until one becomes
     * available if nothing is already waiting for processing.  The exchange
     * is only peeked at; it is not removed from the queue by this method.
     *
     * @return next available exchange to process, or {@code null} if the
     *  calling thread is not (or is no longer) the processing thread
     */
    private HTTPExchange nextExchange() {
        assertUnlocked();

        final Thread self = Thread.currentThread();
        lock.lock();
        try {
            // Re-check thread ownership after every wake-up
            while (self.equals(procThread)) {
                final HTTPExchange head = exchanges.peek();
                if (head != null) {
                    return head;
                }
                try {
                    notEmpty.await();
                } catch (InterruptedException intx) {
                    // Logged only; the loop condition decides whether to
                    // keep waiting
                    LOG.log(Level.FINEST, INTERRUPTED, intx);
                }
            }
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Process the next, provided exchange.  This is the main processing
     * method of the receive thread.
     * <p>
     * The HTTP response for the exchange is obtained and announced to the
     * response listeners without the lock held.  Session state is then
     * updated under the lock: the connection manager session parameters are
     * initialized from the first response, terminal binding conditions and
     * explicit terminations dispose of the session, a recoverable binding
     * condition re-queues every outstanding request, and any other response
     * has its acknowledgement data processed.  Finally, the processed
     * exchange is removed from the queue and any exchanges queued for
     * retransmission are sent after the lock has been released.
     *
     * @param exch message exchange to process
     */
    private void processExchange(final HTTPExchange exch) {
        assertUnlocked();

        AbstractBody body;
        int respCode;
        try {
            final HTTPResponse resp = exch.getHTTPResponse();
            body = resp.getBody();
            respCode = resp.getHTTPStatus();
        } catch (BOSHException boshx) {
            LOG.log(Level.FINEST, "Could not obtain response", boshx);
            dispose(boshx);
            return;
        } catch (InterruptedException intx) {
            LOG.log(Level.FINEST, INTERRUPTED, intx);
            dispose(intx);
            return;
        }
        fireResponseReceived(body);

        // Process the message with the current session state
        AbstractBody req = exch.getRequest();
        CMSessionParams params;
        List<HTTPExchange> toResend = null;
        lock.lock();
        try {
            // Check for session creation response info, if needed
            if (cmParams == null) {
                cmParams = CMSessionParams.fromSessionInit(req, body);

                // The following call handles the lock. It's not an escape.
                fireConnectionEstablished();
            }
            params = cmParams;

            checkForTerminalBindingConditions(body, respCode);
            if (isTermination(body)) {
                // Explicit termination.  dispose() is called without the
                // lock held; the finally block below detects that the lock
                // has been released and skips the queue maintenance.
                lock.unlock();
                dispose(null);
                return;
            }

            if (isRecoverableBindingCondition(body)) {
                // Retransmit outstanding requests: build a fresh exchange
                // for every request still in the queue (including the one
                // being processed, which is removed below) and queue them.
                toResend = new ArrayList<HTTPExchange>(exchanges.size());
                for (HTTPExchange exchange : exchanges) {
                    toResend.add(new HTTPExchange(exchange.getRequest()));
                }
                exchanges.addAll(toResend);
            } else {
                // Process message as normal
                processRequestAcknowledgements(req, body);
                processResponseAcknowledgementData(req);
                HTTPExchange resendExch =
                        processResponseAcknowledgementReport(body);
                if (resendExch != null) {
                    // processResponseAcknowledgementReport() has already
                    // added the exchange to the queue.  It must not be
                    // added a second time here, or the same exchange would
                    // be processed twice and its response delivered to the
                    // listeners twice.
                    toResend = new ArrayList<HTTPExchange>(1);
                    toResend.add(resendExch);
                }
            }
        } catch (BOSHException boshx) {
            LOG.log(Level.FINEST, "Could not process response", boshx);
            lock.unlock();
            dispose(boshx);
            return;
        } finally {
            // The lock is still held only on paths which did not hand off
            // to dispose(); those are the paths needing queue maintenance.
            if (lock.isHeldByCurrentThread()) {
                try {
                    exchanges.remove(exch);
                    if (exchanges.isEmpty()) {
                        scheduleEmptyRequest(processPauseRequest(req));
                    }
                    notFull.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }

        // Perform the actual retransmissions outside of the lock
        if (toResend != null) {
            for (HTTPExchange resend : toResend) {
                HTTPResponse response =
                        httpSender.send(params, resend.getRequest());
                resend.setHTTPResponse(response);
                fireRequestSent(resend.getRequest());
            }
        }
    }
    
    /**
     * Cancels the currently scheduled empty request, if there is one, and
     * forgets about it.  A task which is already running is not interrupted.
     */
    private void clearEmptyRequest() {
        assertLocked();

        if (emptyRequestFuture == null) {
            // Nothing scheduled
            return;
        }
        emptyRequestFuture.cancel(false);
        emptyRequestFuture = null;
    }

    /**
     * Determines the default empty request delay/interval for the active
     * session: the polling interval from the session parameters when one
     * is present, otherwise {@code EMPTY_REQUEST_DELAY}.
     *
     * @return delay in milliseconds
     */
    private long getDefaultEmptyRequestDelay() {
        assertLocked();

        // Figure out how long we should wait before sending an empty request
        final AttrPolling interval = cmParams.getPollingInterval();
        return (interval == null)
                ? EMPTY_REQUEST_DELAY
                : interval.getInMilliseconds();
    }

    /**
     * Schedule an empty request to be sent if no other requests are
     * sent in a reasonable amount of time.  Any previously scheduled empty
     * request is cleared first.  If the session is not working, nothing new
     * is scheduled.
     *
     * @param delay delay in milliseconds before the empty request is sent
     * @throws IllegalArgumentException if {@code delay} is negative
     */
    private void scheduleEmptyRequest(long delay) {
        assertLocked();
        if (delay < 0L) {
            throw new IllegalArgumentException(
                    "Empty request delay must be >= 0 (was: " + delay + ")");
        }

        clearEmptyRequest();
        if (isWorking()) {
            // Schedule the transmission
            if (LOG.isLoggable(Level.FINER)) {
                LOG.finer("Scheduling empty request in " + delay + "ms");
            }
            try {
                emptyRequestFuture = schedExec.schedule(
                        emptyRequestRunnable, delay, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException rex) {
                // The scheduler refused the task; log and carry on
                LOG.log(Level.FINEST, "Could not schedule empty request", rex);
            }
            drained.signalAll();
        }
    }

    /**
     * Sends an empty request to maintain session requirements.  If a request
     * is sent within a reasonable time window, the empty request transmission
     * will be cancelled.  A failure to send the request disposes of the
     * session with the failure as the cause.
     */
    private void sendEmptyRequest() {
        assertUnlocked();

        LOG.log(Level.FINEST, "Sending empty request");
        try {
            // A body built without any content or attributes
            final ComposableBody emptyBody = ComposableBody.builder().build();
            send(emptyBody);
        } catch (BOSHException boshx) {
            dispose(boshx);
        }
    }

    /**
     * Assert that the internal lock is held by the calling thread.  The
     * check is only performed when {@code ASSERTIONS} is enabled.
     */
    private void assertLocked() {
        if (ASSERTIONS && !lock.isHeldByCurrentThread()) {
            throw new AssertionError("Lock is not held by current thread");
        }
    }

    /**
     * Assert that the internal lock is *not* held by the calling thread.
     * The check is only performed when {@code ASSERTIONS} is enabled.
     */
    private void assertUnlocked() {
        if (ASSERTIONS && lock.isHeldByCurrentThread()) {
            throw new AssertionError("Lock is held by current thread");
        }
    }

    /**
     * Evaluates the response for a terminal binding condition (as per
     * XEP-0124 section 17) and raises an exception describing the condition
     * if one is found.
     *
     * @param body response body to evaluate
     * @param code HTTP response code
     * @throws BOSHException if a terminal binding condition is detected
     */
    private void checkForTerminalBindingConditions(
            final AbstractBody body,
            final int code)
            throws BOSHException {
        final TerminalBindingCondition terminal =
                getTerminalBindingCondition(code, body);
        if (terminal == null) {
            // No terminal condition present
            return;
        }

        final String description =
                "Terminal binding condition encountered: "
                + terminal.getCondition() + "  ("
                + terminal.getMessage() + ")";
        throw new BOSHException(description);
    }

    /**
     * Determines whether or not the response indicates a recoverable
     * binding condition (as per XEP-0124 section 17).  The response is
     * considered to do so when its type attribute equals {@code ERROR}.
     *
     * @param resp response body
     * @return {@code true} if it does, {@code false} otherwise
     */
    private static boolean isRecoverableBindingCondition(
            final AbstractBody resp) {
        final String type = resp.getAttribute(Attributes.TYPE);
        return ERROR.equals(type);
    }

    /**
     * Works out how long to wait before sending an empty request following
     * the specified request.  When the session parameters advertise a
     * maximum pause and the request carries a pause attribute, the pause
     * duration less {@code PAUSE_MARGIN} is used (or
     * {@code EMPTY_REQUEST_DELAY} should that come out negative).  In every
     * other case the default delay is returned.
     *
     * @param req request to examine for a pause attribute
     * @return delay in milliseconds that should elapse prior to an
     *  empty message being sent
     */
    private long processPauseRequest(
            final AbstractBody req) {
        assertLocked();

        final boolean pauseSupported =
                cmParams != null && cmParams.getMaxPause() != null;
        if (pauseSupported) {
            try {
                final AttrPause requested = AttrPause.createFromString(
                        req.getAttribute(Attributes.PAUSE));
                if (requested != null) {
                    final long remaining =
                            requested.getInMilliseconds() - PAUSE_MARGIN;
                    return (remaining < 0)
                            ? EMPTY_REQUEST_DELAY
                            : remaining;
                }
            } catch (BOSHException boshx) {
                // Fall through to the default delay
                LOG.log(Level.FINEST, "Could not extract", boshx);
            }
        }

        return getDefaultEmptyRequestDelay();
    }

    /**
     * Check the response for request acknowledgements and discard the
     * locally retained requests which have been acknowledged.  Nothing is
     * done when the connection manager is not acking requests or when the
     * response carries a report attribute.
     *
     * This method assumes the lock is currently held.
     *
     * @param req request
     * @param resp response
     */
    private void processRequestAcknowledgements(
            final AbstractBody req, final AbstractBody resp) {
        assertLocked();

        // A response carrying a report attribute is not acking anything
        if (!cmParams.isAckingRequests()
                || resp.getAttribute(Attributes.REPORT) != null) {
            return;
        }

        // Figure out what the highest acked RID is.  Without an explicit
        // ack, all prior requests up until the request's RID are implicitly
        // acknowledged.
        final String explicitAck = resp.getAttribute(Attributes.ACK);
        final String ackSource = (explicitAck != null)
                ? explicitAck
                : req.getAttribute(Attributes.RID);
        final long ackUpTo = Long.parseLong(ackSource);

        // Remove the acked requests from the list
        if (LOG.isLoggable(Level.FINEST)) {
            LOG.finest("Removing pending acks up to: " + ackUpTo);
        }
        for (Iterator<ComposableBody> it = pendingRequestAcks.iterator();
                it.hasNext();) {
            final long pendingRid = Long.parseLong(
                    it.next().getAttribute(Attributes.RID));
            if (pendingRid <= ackUpTo) {
                it.remove();
            }
        }
    }

    /**
     * Process the response in order to update the response acknowledgement
     * data.  {@code responseAck} is advanced to the highest RID for which
     * this and every earlier response has been processed; RIDs which arrive
     * ahead of a gap are parked in {@code pendingResponseAcks} until the
     * gap has been filled.
     *
     * This method assumes the lock is currently held.
     *
     * @param req request whose response has just been received
     */
    private void processResponseAcknowledgementData(
            final AbstractBody req) {
        assertLocked();

        final Long rid = Long.valueOf(req.getAttribute(Attributes.RID));
        if (responseAck.equals(Long.valueOf(-1L))) {
            // This is the first request
            responseAck = rid;
            return;
        }

        if (rid.compareTo(responseAck) <= 0) {
            // Already covered by the current acknowledgement.  Parking it
            // would leave an entry at the head of the sorted set which can
            // never match the next expected RID, stalling all further
            // progress.
            return;
        }

        pendingResponseAcks.add(rid);

        // Remove up until the first missing response (or end of queue).
        // The next RID which can extend the acknowledged range is the one
        // immediately following the current acknowledgement; starting at
        // the current acknowledgement itself would never find a match.
        Long expected = Long.valueOf(responseAck.longValue() + 1L);
        // first() throws NoSuchElementException on an empty set, so the
        // emptiness check must come first.
        while (!pendingResponseAcks.isEmpty()
                && expected.equals(pendingResponseAcks.first())) {
            responseAck = expected;
            pendingResponseAcks.remove(expected);
            expected = Long.valueOf(expected.longValue() + 1L);
        }
    }

    /**
     * Process the response in order to check for and respond to any potential
     * ack reports.  When the response reports a missing request, the local
     * copy of that request is wrapped in a new exchange which is added to
     * the exchange queue before being returned.
     *
     * This method assumes the lock is currently held.
     *
     * @param resp response
     * @return exchange to transmit if a resend is to be performed, or
     *  {@code null} if no resend is necessary
     * @throws BOSHException when a retry is needed but cannot be performed
     */
    private HTTPExchange processResponseAcknowledgementReport(
            final AbstractBody resp)
            throws BOSHException {
        assertLocked();

        final String reportStr = resp.getAttribute(Attributes.REPORT);
        if (reportStr == null) {
            // No report on this message
            return null;
        }

        final long reportedRid = Long.parseLong(reportStr);
        final long elapsed = Long.parseLong(
                resp.getAttribute(Attributes.TIME));
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Received report of missing request (RID="
                    + reportedRid + ", time=" + elapsed + "ms)");
        }

        // Find the missing request among those awaiting acknowledgement
        AbstractBody missing = null;
        for (ComposableBody candidate : pendingRequestAcks) {
            final long candidateRid = Long.parseLong(
                    candidate.getAttribute(Attributes.RID));
            if (candidateRid == reportedRid) {
                missing = candidate;
                break;
            }
        }

        if (missing == null) {
            throw new BOSHException("Report of missing message with RID '"
                    + reportStr
                    + "' but local copy of that request was not found");
        }

        // Resend the missing request
        final HTTPExchange resend = new HTTPExchange(missing);
        exchanges.add(resend);
        notEmpty.signalAll();
        return resend;
    }

    /**
     * Informs every registered request listener that the specified request
     * is being sent.  The event object is only created when there is at
     * least one listener.  An exception thrown by a listener is logged and
     * does not prevent the remaining listeners from being notified.
     *
     * @param request request being sent
     */
    private void fireRequestSent(final AbstractBody request) {
        assertUnlocked();

        final Iterator<BOSHClientRequestListener> targets =
                requestListeners.iterator();
        if (!targets.hasNext()) {
            // Nobody to notify
            return;
        }

        final BOSHMessageEvent event =
                BOSHMessageEvent.createRequestSentEvent(this, request);
        do {
            final BOSHClientRequestListener target = targets.next();
            try {
                target.requestSent(event);
            } catch (Exception ex) {
                LOG.log(Level.WARNING, UNHANDLED, ex);
            }
        } while (targets.hasNext());
    }

    /**
     * Informs every registered response listener that the specified
     * response has been received.  The event object is only created when
     * there is at least one listener.  An exception thrown by a listener is
     * logged and does not prevent the remaining listeners from being
     * notified.
     *
     * @param response response received
     */
    private void fireResponseReceived(final AbstractBody response) {
        assertUnlocked();

        final Iterator<BOSHClientResponseListener> targets =
                responseListeners.iterator();
        if (!targets.hasNext()) {
            // Nobody to notify
            return;
        }

        final BOSHMessageEvent event =
                BOSHMessageEvent.createResponseReceivedEvent(this, response);
        do {
            final BOSHClientResponseListener target = targets.next();
            try {
                target.responseReceived(event);
            } catch (Exception ex) {
                LOG.log(Level.WARNING, UNHANDLED, ex);
            }
        } while (targets.hasNext());
    }

    /**
     * Informs every registered connection listener that the session has
     * been successfully established.  If the calling thread holds the lock
     * on entry, the lock is released while the listeners are being notified
     * and acquired again before this method returns.  An exception thrown
     * by a listener is logged and does not prevent the remaining listeners
     * from being notified.
     */
    private void fireConnectionEstablished() {
        final boolean reacquire = lock.isHeldByCurrentThread();
        if (reacquire) {
            // Listeners are called without the lock held
            lock.unlock();
        }
        try {
            final Iterator<BOSHClientConnListener> targets =
                    connListeners.iterator();
            if (targets.hasNext()) {
                final BOSHClientConnEvent event = BOSHClientConnEvent
                        .createConnectionEstablishedEvent(this);
                do {
                    final BOSHClientConnListener target = targets.next();
                    try {
                        target.connectionEvent(event);
                    } catch (Exception ex) {
                        LOG.log(Level.WARNING, UNHANDLED, ex);
                    }
                } while (targets.hasNext());
            }
        } finally {
            if (reacquire) {
                lock.lock();
            }
        }
    }

    /**
     * Informs every registered connection listener that the session has
     * been terminated normally.  The event object is only created when
     * there is at least one listener.  An exception thrown by a listener is
     * logged and does not prevent the remaining listeners from being
     * notified.
     */
    private void fireConnectionClosed() {
        assertUnlocked();

        final Iterator<BOSHClientConnListener> targets =
                connListeners.iterator();
        if (!targets.hasNext()) {
            // Nobody to notify
            return;
        }

        final BOSHClientConnEvent event =
                BOSHClientConnEvent.createConnectionClosedEvent(this);
        do {
            final BOSHClientConnListener target = targets.next();
            try {
                target.connectionEvent(event);
            } catch (Exception ex) {
                LOG.log(Level.WARNING, UNHANDLED, ex);
            }
        } while (targets.hasNext());
    }

    /**
     * Informs every registered connection listener that the session has
     * been terminated due to the exceptional condition provided.  The event
     * is created from this client, the requests still pending
     * acknowledgement and the cause, and only when there is at least one
     * listener.  An exception thrown by a listener is logged and does not
     * prevent the remaining listeners from being notified.
     *
     * @param cause cause of the termination
     */
    private void fireConnectionClosedOnError(
            final Throwable cause) {
        assertUnlocked();

        final Iterator<BOSHClientConnListener> targets =
                connListeners.iterator();
        if (!targets.hasNext()) {
            // Nobody to notify
            return;
        }

        final BOSHClientConnEvent event = BOSHClientConnEvent
                .createConnectionClosedOnErrorEvent(
                this, pendingRequestAcks, cause);
        do {
            final BOSHClientConnListener target = targets.next();
            try {
                target.connectionEvent(event);
            } catch (Exception ex) {
                LOG.log(Level.WARNING, UNHANDLED, ex);
            }
        } while (targets.hasNext());
    }

}