future, String name, Runnable finisher) {
this.future = future;
this.name = name;
this.finisher = finisher;
}
}
/**
* A collection of several futures, covering cancellation, success, and
* failure (both {@link ExecutionException} and {@link RuntimeException}),
* both immediate and delayed. We use each possible pair of these futures in
* {@link FuturesTest#runExtensiveMergerTest}.
*
* Each test requires a new {@link TestFutureBatch} because we need new
* delayed futures each time, as the old delayed futures were completed as
* part of the old test.
*/
private static final class TestFutureBatch {
final ListenableFuture doneSuccess = immediateFuture("a");
final ListenableFuture doneFailed =
immediateFailedFuture(new Exception());
final SettableFuture doneCancelled = SettableFuture.create();
{
doneCancelled.cancel(true);
}
final ListenableFuture doneRuntimeException =
new ForwardingListenableFuture() {
final ListenableFuture delegate =
immediateFuture("Should never be seen");
@Override
protected ListenableFuture delegate() {
return delegate;
}
@Override
public String get() {
throw new RuntimeException();
}
@Override
public String get(long timeout, TimeUnit unit) {
throw new RuntimeException();
}
};
final SettableFuture delayedSuccess = SettableFuture.create();
final SettableFuture delayedFailed = SettableFuture.create();
final SettableFuture delayedCancelled = SettableFuture.create();
final SettableFuture delegateForDelayedRuntimeException =
SettableFuture.create();
final ListenableFuture delayedRuntimeException =
new ForwardingListenableFuture() {
@Override
protected ListenableFuture delegate() {
return delegateForDelayedRuntimeException;
}
@Override
public String get() throws ExecutionException, InterruptedException {
delegateForDelayedRuntimeException.get();
throw new RuntimeException();
}
@Override
public String get(long timeout, TimeUnit unit) throws
ExecutionException, InterruptedException, TimeoutException {
delegateForDelayedRuntimeException.get(timeout, unit);
throw new RuntimeException();
}
};
final Runnable doNothing = new Runnable() {
@Override
public void run() {
}
};
final Runnable finishSuccess = new Runnable() {
@Override
public void run() {
delayedSuccess.set("b");
}
};
final Runnable finishFailure = new Runnable() {
@Override
public void run() {
delayedFailed.setException(new Exception());
}
};
final Runnable finishCancelled = new Runnable() {
@Override
public void run() {
delayedCancelled.cancel(true);
}
};
final Runnable finishRuntimeException = new Runnable() {
@Override
public void run() {
delegateForDelayedRuntimeException.set("Should never be seen");
}
};
/**
* All the futures, together with human-readable names for use by
* {@link #smartToString}.
*/
final ImmutableList allFutures =
ImmutableList.of(new TestFuture(doneSuccess, "doneSuccess", doNothing),
new TestFuture(doneFailed, "doneFailed", doNothing),
new TestFuture(doneCancelled, "doneCancelled", doNothing),
new TestFuture(
doneRuntimeException, "doneRuntimeException", doNothing),
new TestFuture(delayedSuccess, "delayedSuccess", finishSuccess),
new TestFuture(delayedFailed, "delayedFailed", finishFailure),
new TestFuture(
delayedCancelled, "delayedCancelled", finishCancelled),
new TestFuture(delayedRuntimeException, "delayedRuntimeException",
finishRuntimeException));
final Function, String> nameGetter =
new Function, String>() {
@Override
public String apply(ListenableFuture input) {
for (TestFuture future : allFutures) {
if (future.future == input) {
return future.name;
}
}
throw new IllegalArgumentException(input.toString());
}
};
static boolean intersect(Set> a, Set> b) {
return !Sets.intersection(a, b).isEmpty();
}
/**
* Like {@code inputs.toString()}, but with the nonsense {@code toString}
* representations replaced with the name of each future from
* {@link #allFutures}.
*/
String smartToString(ImmutableSet> inputs) {
Iterable inputNames = Iterables.transform(inputs, nameGetter);
return Joiner.on(", ").join(inputNames);
}
void smartAssertTrue(ImmutableSet> inputs,
Exception cause, boolean expression) {
if (!expression) {
failWithCause(cause, smartToString(inputs));
}
}
boolean hasDelayed(ListenableFuture a, ListenableFuture b) {
ImmutableSet> inputs = ImmutableSet.of(a, b);
return intersect(inputs, ImmutableSet.of(
delayedSuccess, delayedFailed, delayedCancelled,
delayedRuntimeException));
}
void assertHasDelayed(
ListenableFuture a, ListenableFuture b, Exception e) {
ImmutableSet> inputs = ImmutableSet.of(a, b);
smartAssertTrue(inputs, e, hasDelayed(a, b));
}
void assertHasFailure(
ListenableFuture a, ListenableFuture b, Exception e) {
ImmutableSet> inputs = ImmutableSet.of(a, b);
smartAssertTrue(inputs, e, intersect(inputs, ImmutableSet.of(doneFailed,
doneRuntimeException, delayedFailed, delayedRuntimeException)));
}
void assertHasCancel(
ListenableFuture a, ListenableFuture b, Exception e) {
ImmutableSet> inputs = ImmutableSet.of(a, b);
smartAssertTrue(inputs, e,
intersect(inputs, ImmutableSet.of(doneCancelled, delayedCancelled)));
}
void assertHasImmediateFailure(
ListenableFuture a, ListenableFuture b, Exception e) {
ImmutableSet> inputs = ImmutableSet.of(a, b);
smartAssertTrue(inputs, e, intersect(
inputs, ImmutableSet.of(doneFailed, doneRuntimeException)));
}
void assertHasImmediateCancel(
ListenableFuture a, ListenableFuture b, Exception e) {
ImmutableSet> inputs = ImmutableSet.of(a, b);
smartAssertTrue(inputs, e,
intersect(inputs, ImmutableSet.of(doneCancelled)));
}
}
/**
* {@link Futures#allAsList(Iterable)} or
* {@link Futures#successfulAsList(Iterable)}, hidden behind a common
* interface for testing.
*/
private interface Merger {
ListenableFuture> merged(
ListenableFuture a, ListenableFuture b);
Merger allMerger = new Merger() {
@Override
public ListenableFuture> merged(
ListenableFuture a, ListenableFuture b) {
return allAsList(ImmutableSet.of(a, b));
}
};
Merger successMerger = new Merger() {
@Override
public ListenableFuture> merged(
ListenableFuture a, ListenableFuture b) {
return successfulAsList(ImmutableSet.of(a, b));
}
};
}
/**
 * Very rough equivalent of a timed get, produced by calling the no-arg get
 * method in another thread and waiting a short time for it.
 *
 * <p>We need this to test the behavior of no-arg get methods without hanging
 * the main test thread forever in the case of failure.
 *
 * @param input the future whose no-arg {@code get()} is exercised
 * @param timeout how long to wait for the helper thread's {@code get()}
 * @param unit the unit of {@code timeout}
 * @return the value produced by {@code input.get()}
 * @throws ExecutionException if {@code input.get()} threw one
 * @throws CancellationException if {@code input.get()} threw one
 * @throws TimeoutException if {@code input.get()} did not finish in time
 * @throws InterruptedException if the calling thread is interrupted
 */
private static <V> V pseudoTimedGet(
    final Future<V> input, long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
  ExecutorService executor = newSingleThreadExecutor();
  Future<V> waiter = executor.submit(new Callable<V>() {
    @Override
    public V call() throws Exception {
      return input.get();
    }
  });

  try {
    return waiter.get(timeout, unit);
  } catch (ExecutionException e) {
    // The executor wrapped whatever input.get() threw; unwrap the exception
    // types that callers expect to see directly from a get() call.
    propagateIfInstanceOf(e.getCause(), ExecutionException.class);
    propagateIfInstanceOf(e.getCause(), CancellationException.class);
    // Anything else is not a legitimate outcome of get().
    AssertionFailedError error =
        new AssertionFailedError("Unexpected exception");
    error.initCause(e);
    throw error;
  } finally {
    // Interrupt the helper thread if it is still blocked in get(), then make
    // sure it really went away so that threads do not pile up across calls.
    executor.shutdownNow();
    assertTrue(executor.awaitTermination(10, SECONDS));
  }
}
/**
 * For each possible pair of futures from {@link TestFutureBatch}, for each
 * possible completion order of those futures, test that various get calls
 * (timed before future completion, untimed before future completion, and
 * untimed after future completion) return or throw the proper values.
 *
 * @param merger the combining operation under test
 */
private static void runExtensiveMergerTest(Merger merger)
    throws InterruptedException {
  // A throwaway batch, used only to learn how many futures there are.
  int inputCount = new TestFutureBatch().allFutures.size();

  for (int i = 0; i < inputCount; i++) {
    for (int j = 0; j < inputCount; j++) {
      for (boolean iBeforeJ : new boolean[] { true, false }) {
        // Fresh batch per combination: the delayed futures of an earlier
        // batch have already been completed.
        TestFutureBatch inputs = new TestFutureBatch();
        ListenableFuture<String> iFuture = inputs.allFutures.get(i).future;
        ListenableFuture<String> jFuture = inputs.allFutures.get(j).future;
        ListenableFuture<List<String>> future =
            merger.merged(iFuture, jFuture);

        // Test timed get before we've completed any delayed futures.
        try {
          List<String> result = future.get(0, MILLISECONDS);
          assertTrue("Got " + result,
              Arrays.asList("a", null).containsAll(result));
        } catch (CancellationException e) {
          // Only the allAsList merger is expected to cancel or fail.
          assertTrue(merger == Merger.allMerger);
          inputs.assertHasImmediateCancel(iFuture, jFuture, e);
        } catch (ExecutionException e) {
          assertTrue(merger == Merger.allMerger);
          inputs.assertHasImmediateFailure(iFuture, jFuture, e);
        } catch (TimeoutException e) {
          inputs.assertHasDelayed(iFuture, jFuture, e);
        }

        // Same tests with pseudoTimedGet.
        try {
          List<String> result = conditionalPseudoTimedGet(
              inputs, iFuture, jFuture, future, 20, MILLISECONDS);
          assertTrue("Got " + result,
              Arrays.asList("a", null).containsAll(result));
        } catch (CancellationException e) {
          assertTrue(merger == Merger.allMerger);
          inputs.assertHasImmediateCancel(iFuture, jFuture, e);
        } catch (ExecutionException e) {
          assertTrue(merger == Merger.allMerger);
          inputs.assertHasImmediateFailure(iFuture, jFuture, e);
        } catch (TimeoutException e) {
          inputs.assertHasDelayed(iFuture, jFuture, e);
        }

        // Finish the two futures in the currently specified order:
        inputs.allFutures.get(iBeforeJ ? i : j).finisher.run();
        inputs.allFutures.get(iBeforeJ ? j : i).finisher.run();

        // Test untimed get now that we've completed any delayed futures.
        try {
          List<String> result = future.get();
          // "b" is now possible too: it is the value of delayedSuccess.
          assertTrue("Got " + result,
              Arrays.asList("a", "b", null).containsAll(result));
        } catch (CancellationException e) {
          assertTrue(merger == Merger.allMerger);
          inputs.assertHasCancel(iFuture, jFuture, e);
        } catch (ExecutionException e) {
          assertTrue(merger == Merger.allMerger);
          inputs.assertHasFailure(iFuture, jFuture, e);
        }
      }
    }
  }
}
/**
* Call the non-timed {@link Future#get()} in a way that allows us to abort if
* it's expected to hang forever. More precisely, if it's expected to return,
* we simply call it[*], but if it's expected to hang (because one of the
* input futures that we know makes it up isn't done yet), then we call it in
* a separate thread (using pseudoTimedGet). The result is that we wait as
* long as necessary when the method is expected to return (at the cost of
* hanging forever if there is a bug in the class under test) but that we time
* out fairly promptly when the method is expected to hang (possibly too
* quickly, but too-quick failures should be very unlikely, given that we used
* to bail after 20ms during the expected-successful tests, and there we saw a
* failure rate of ~1/5000, meaning that the other thread's get() call nearly
* always completes within 20ms if it's going to complete at all).
*
* [*] To avoid hangs, I've disabled the in-thread calls. This makes the test
* take (very roughly) 2.5s longer. (2.5s is also the maximum length of time
* we will wait for a timed get that is expected to succeed; the fact that the
* numbers match is only a coincidence.) See the comment below for how to
* restore the fast but hang-y version.
*/
private static List conditionalPseudoTimedGet(
TestFutureBatch inputs,
ListenableFuture iFuture,
ListenableFuture jFuture,
ListenableFuture> future,
int timeout,
TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
/*
* For faster tests (that may hang indefinitely if the class under test has
* a bug!), switch the second branch to call untimed future.get() instead of
* pseudoTimedGet.
*/
return (inputs.hasDelayed(iFuture, jFuture))
? pseudoTimedGet(future, timeout, unit)
: pseudoTimedGet(future, 2500, MILLISECONDS);
}
/** Runs the exhaustive pairwise merger test against {@code Merger.allMerger}. */
public void testAllAsList_extensive() throws InterruptedException {
  Merger mergerUnderTest = Merger.allMerger;
  runExtensiveMergerTest(mergerUnderTest);
}
/**
 * Runs the exhaustive pairwise merger test against
 * {@code Merger.successMerger}.
 */
public void testSuccessfulAsList_extensive() throws InterruptedException {
  Merger mergerUnderTest = Merger.successMerger;
  runExtensiveMergerTest(mergerUnderTest);
}
public void testSuccessfulAsList() throws Exception {
// Create input and output
SettableFuture future1 = SettableFuture.create();
SettableFuture future2 = SettableFuture.create();
SettableFuture future3 = SettableFuture.create();
@SuppressWarnings("unchecked") // array is never modified
ListenableFuture> compound =
Futures.successfulAsList(future1, future2, future3);
// Attach a listener
SingleCallListener listener = new SingleCallListener();
compound.addListener(listener, directExecutor());
// Satisfy each input and check the output
assertFalse(compound.isDone());
future1.set(DATA1);
assertFalse(compound.isDone());
future2.set(DATA2);
assertFalse(compound.isDone());
listener.expectCall();
future3.set(DATA3);
assertTrue(compound.isDone());
assertTrue(listener.wasCalled());
List results = compound.get();
assertThat(results).has().exactly(DATA1, DATA2, DATA3).inOrder();
}
/**
 * Checks that merging an empty list of futures yields a compound future that
 * is already done, holds an empty list, and runs a listener added to it.
 */
public void testSuccessfulAsList_emptyList() throws Exception {
  SingleCallListener listener = new SingleCallListener();
  listener.expectCall();
  // Element type String is chosen for consistency with the sibling tests.
  List<ListenableFuture<String>> futures = ImmutableList.of();
  ListenableFuture<List<String>> compound = Futures.successfulAsList(futures);
  compound.addListener(listener, directExecutor());
  assertTrue(compound.isDone());
  assertTrue(compound.get().isEmpty());
  assertTrue(listener.wasCalled());
}
public void testSuccessfulAsList_emptyArray() throws Exception {
SingleCallListener listener = new SingleCallListener();
listener.expectCall();
@SuppressWarnings("unchecked") // array is never modified
ListenableFuture> compound = Futures.successfulAsList();
compound.addListener(listener, directExecutor());
assertTrue(compound.isDone());
assertTrue(compound.get().isEmpty());
assertTrue(listener.wasCalled());
}
public void testSuccessfulAsList_partialFailure() throws Exception {
SingleCallListener listener = new SingleCallListener();
SettableFuture future1 = SettableFuture.create();
SettableFuture future2 = SettableFuture.create();
@SuppressWarnings("unchecked") // array is never modified
ListenableFuture> compound =
Futures.successfulAsList(future1, future2);
compound.addListener(listener, directExecutor());
assertFalse(compound.isDone());
future1.setException(new Throwable("failed1"));
assertFalse(compound.isDone());
listener.expectCall();
future2.set(DATA2);
assertTrue(compound.isDone());
assertTrue(listener.wasCalled());
List results = compound.get();
assertThat(results).has().exactly(null, DATA2).inOrder();
}
public void testSuccessfulAsList_totalFailure() throws Exception {
SingleCallListener listener = new SingleCallListener();
SettableFuture future1 = SettableFuture.create();
SettableFuture future2 = SettableFuture.create();
@SuppressWarnings("unchecked") // array is never modified
ListenableFuture> compound =
Futures.successfulAsList(future1, future2);
compound.addListener(listener, directExecutor());
assertFalse(compound.isDone());
future1.setException(new Throwable("failed1"));
assertFalse(compound.isDone());
listener.expectCall();
future2.setException(new Throwable("failed2"));
assertTrue(compound.isDone());
assertTrue(listener.wasCalled());
List results = compound.get();
assertThat(results).has().exactly(null, null).inOrder();
}
public void testSuccessfulAsList_cancelled() throws Exception {
SingleCallListener listener = new SingleCallListener();
SettableFuture future1 = SettableFuture.create();
SettableFuture future2 = SettableFuture.create();
@SuppressWarnings("unchecked") // array is never modified
ListenableFuture> compound =
Futures.successfulAsList(future1, future2);
compound.addListener(listener, directExecutor());
assertFalse(compound.isDone());
future1.cancel(true);
assertFalse(compound.isDone());
listener.expectCall();
future2.set(DATA2);
assertTrue(compound.isDone());
assertTrue(listener.wasCalled());
List results = compound.get();
assertThat(results).has().exactly(null, DATA2).inOrder();
}
public void testSuccessfulAsList_resultCancelled() throws Exception {
SettableFuture future1 = SettableFuture.create();
SettableFuture future2 = SettableFuture.create();
@SuppressWarnings("unchecked") // array is never modified
ListenableFuture> compound =
Futures.successfulAsList(future1, future2);
future2.set(DATA2);
assertFalse(compound.isDone());
assertTrue(compound.cancel(false));
assertTrue(compound.isCancelled());
assertTrue(future1.isCancelled());
assertFalse(future1.wasInterrupted());
}
/**
 * Runs {@link #doTestSuccessfulAsList_resultCancelledRacingInputDone} while a
 * handler on {@code ExecutionList.log} turns every published log record into
 * a test failure.
 */
public void testSuccessfulAsList_resultCancelledRacingInputDone()
    throws Exception {
  /*
   * ExecutionList catches the IllegalStateException that this test is looking
   * for and merely logs it, so it never propagates on its own. Rethrowing
   * from the log handler converts the logged problem back into a failure.
   */
  Handler failOnLogRecord = new Handler() {
    @Override public void publish(@Nullable LogRecord record) {
      AssertionFailedError failure = new AssertionFailedError();
      failure.initCause(record.getThrown());
      throw failure;
    }

    @Override public void flush() {}

    @Override public void close() {}
  };

  ExecutionList.log.addHandler(failOnLogRecord);
  try {
    doTestSuccessfulAsList_resultCancelledRacingInputDone();
  } finally {
    // Always detach the handler so that it cannot affect later tests.
    ExecutionList.log.removeHandler(failOnLogRecord);
  }
}
private static void doTestSuccessfulAsList_resultCancelledRacingInputDone()
throws Exception {
// Simple (combined.cancel -> input.cancel -> setOneValue):
Futures.successfulAsList(ImmutableList.of(SettableFuture.create()))
.cancel(true);
/*
* Complex (combined.cancel -> input.cancel -> other.set -> setOneValue),
* to show that this isn't just about problems with the input future we just
* cancelled:
*/
final SettableFuture