Remove duplicate future code.

This commit is contained in:
Cody Henthorne
2024-02-12 15:00:27 -05:00
committed by GitHub
parent 577b11a349
commit 19cfae1da5
28 changed files with 75 additions and 179 deletions

View File

@@ -0,0 +1,99 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.core.util.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* A future that allows you to have multiple ways to compute a result. If one fails, the calculation
* will fall back to the next in the list.
*
* You will only see a failure if the last attempt in the list fails.
*/
public final class CascadingFuture<T> implements ListenableFuture<T> {
private static final String TAG = CascadingFuture.class.getSimpleName();
private SettableFuture<T> result;
public CascadingFuture(List<Callable<ListenableFuture<T>>> callables, ExceptionChecker exceptionChecker) {
if (callables.isEmpty()) {
throw new IllegalArgumentException("Must have at least one callable!");
}
this.result = new SettableFuture<>();
doNext(new ArrayList<>(callables), exceptionChecker);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return result.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return result.isCancelled();
}
@Override
public boolean isDone() {
return result.isDone();
}
@Override
public T get() throws ExecutionException, InterruptedException {
return result.get();
}
@Override
public T get(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
return result.get(timeout, unit);
}
@Override
public void addListener(Listener<T> listener) {
result.addListener(listener);
}
private void doNext(List<Callable<ListenableFuture<T>>> callables, ExceptionChecker exceptionChecker) {
Callable<ListenableFuture<T>> callable = callables.remove(0);
try {
ListenableFuture<T> future = callable.call();
future.addListener(new ListenableFuture.Listener<T>() {
@Override
public void onSuccess(T value) {
result.set(value);
}
@Override
public void onFailure(ExecutionException e) {
if (callables.isEmpty() || !exceptionChecker.shouldContinue(e)) {
result.setException(e.getCause());
} else if (!result.isCancelled()) {
doNext(callables, exceptionChecker);
}
}
});
} catch (Exception e) {
if (callables.isEmpty() || !exceptionChecker.shouldContinue(e)) {
result.setException(e.getCause());
} else if (!result.isCancelled()) {
doNext(callables, exceptionChecker);
}
}
}
public interface ExceptionChecker {
boolean shouldContinue(Exception e);
}
}

View File

@@ -0,0 +1,78 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.core.util.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Lets you perform a simple transform on the result of a future that maps it to a different value.
*/
class FutureMapTransformer<Input, Output> implements ListenableFuture<Output> {
private final ListenableFuture<Input> future;
private final FutureTransformers.Transformer<Input, Output> transformer;
FutureMapTransformer(ListenableFuture<Input> future, FutureTransformers.Transformer<Input, Output> transformer) {
this.future = future;
this.transformer = transformer;
}
@Override
public void addListener(Listener<Output> listener) {
future.addListener(new Listener<Input>() {
@Override
public void onSuccess(Input result) {
try {
listener.onSuccess(transformer.transform(result));
} catch (Exception e) {
listener.onFailure(new ExecutionException(e));
}
}
@Override
public void onFailure(ExecutionException e) {
listener.onFailure(e);
}
});
}
@Override
public boolean cancel(boolean b) {
return future.cancel(b);
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
@Override
public boolean isDone() {
return future.isDone();
}
@Override
public Output get() throws InterruptedException, ExecutionException {
Input input = future.get();
try {
return transformer.transform(input);
} catch (Exception e) {
throw new ExecutionException(e);
}
}
@Override
public Output get(long l, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
Input input = future.get(l, timeUnit);
try {
return transformer.transform(input);
} catch (Exception e) {
throw new ExecutionException(e);
}
}
}

View File

@@ -0,0 +1,17 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.core.util.concurrent;
public final class FutureTransformers {
public static <Input, Output> ListenableFuture<Output> map(ListenableFuture<Input> future, Transformer<Input, Output> transformer) {
return new FutureMapTransformer<>(future, transformer);
}
public interface Transformer<Input, Output> {
Output transform(Input a) throws Exception;
}
}

View File

@@ -0,0 +1,18 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.core.util.concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public interface ListenableFuture<T> extends Future<T> {
void addListener(Listener<T> listener);
public interface Listener<T> {
public void onSuccess(T result);
public void onFailure(ExecutionException e);
}
}

View File

@@ -0,0 +1,146 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.signal.core.util.concurrent;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SettableFuture<T> implements ListenableFuture<T> {
private final List<Listener<T>> listeners = new LinkedList<>();
private boolean completed;
private boolean canceled;
private volatile T result;
private volatile Throwable exception;
public SettableFuture() { }
public SettableFuture(T value) {
this.result = value;
this.completed = true;
}
public SettableFuture(Throwable throwable) {
this.exception = throwable;
this.completed = true;
}
@Override
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
if (!completed && !canceled) {
canceled = true;
return true;
}
return false;
}
@Override
public synchronized boolean isCancelled() {
return canceled;
}
@Override
public synchronized boolean isDone() {
return completed;
}
public boolean set(T result) {
synchronized (this) {
if (completed || canceled) return false;
this.result = result;
this.completed = true;
notifyAll();
}
notifyAllListeners();
return true;
}
public boolean setException(Throwable throwable) {
synchronized (this) {
if (completed || canceled) return false;
this.exception = throwable;
this.completed = true;
notifyAll();
}
notifyAllListeners();
return true;
}
public void deferTo(ListenableFuture<T> other) {
other.addListener(new Listener<T>() {
@Override
public void onSuccess(T result) {
SettableFuture.this.set(result);
}
@Override
public void onFailure(ExecutionException e) {
SettableFuture.this.setException(e.getCause());
}
});
}
@Override
public synchronized T get() throws InterruptedException, ExecutionException {
while (!completed) wait();
if (exception != null) throw new ExecutionException(exception);
else return result;
}
@Override
public synchronized T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
{
long startTime = System.currentTimeMillis();
while (!completed && System.currentTimeMillis() - startTime < unit.toMillis(timeout)) {
wait(unit.toMillis(timeout));
}
if (!completed) throw new TimeoutException();
else return get();
}
@Override
public void addListener(Listener<T> listener) {
synchronized (this) {
listeners.add(listener);
if (!completed) return;
}
notifyListener(listener);
}
private void notifyAllListeners() {
List<Listener<T>> localListeners;
synchronized (this) {
localListeners = new LinkedList<>(listeners);
}
for (Listener<T> listener : localListeners) {
notifyListener(listener);
}
}
private void notifyListener(Listener<T> listener) {
if (exception != null) listener.onFailure(new ExecutionException(exception));
else listener.onSuccess(result);
}
}