Улучшаем allOf и anyOf в CompletableFuture +6




И снова здравствуйте. В преддверии старта курса «Разработчик Java» подготовили для вас перевод полезного материала.



В CompletableFuture есть два метода, дизайн которых меня удивляет:

  • CompletableFuture#allOf
  • CompletableFuture#anyOf


В этой статье мы посмотрим, что с ними не так и как их можно сделать более удобными.

CompletableFuture#allOf


Давайте посмотрим на сигнатуру метода:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    // ...
}


Здесь есть как минимум два спорных момента:

  1. Метод принимает несколько объектов CompletableFuture, возвращающих объекты разных типов.
  2. Метод возвращает CompletableFuture, который возвращает Void


Также некоторым может не нравится переменное число параметров, поэтому давайте посмотрим и на эту часть.

CompletableFuture<Void> часто используется в качестве сигнала завершения операции, однако, внеся небольшое изменение в API, этот метод можно использовать как в качестве сигнального устройства, так и в качестве носителя результатов всех завершенных операций. Давайте попробуем это сделать.

Асинхронный CompletableFuture#allOf


Во-первых, давайте придумаем нужную сигнатуру.

Справедливо предположить, что в большинстве случаев потребуется обработка списка однородных CompletableFuture и возврат CompletableFuture, содержащего список результатов:

public static <T> CompletableFuture<List<T>> allOf(
  Collection<CompletableFuture<T>> futures) {
    // ...
}


Внутренности оригинального метода, скорее всего, более сложные, чем вы ожидаете:

static CompletableFuture<Void> andTree(
  CompletableFuture<?>[] cfs, int lo, int hi) {
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (lo > hi) // empty
        d.result = NIL;
    else {
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}


Поэтому, вместо того, чтобы создавать его с нуля, попробуем использовать повторно то, что уже есть в оригинальном методе так, как если бы он был предназначен для использования в качестве сигнализатора завершения… а затем просто поменяем void-результат на список future:

CompletableFuture<List<CompletableFuture<T>>> i = futures.stream()
    .collect(collectingAndThen(
      toList(), 
      l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
        .thenApply(__ -> l)));


Пока неплохо. Нам удалось получить
CompletableFuture<List<CompletableFuture<T>>> вместо CompletableFuture<Void>, что уже хорошо. Но нам не нужен список future с результатами, нам нужен список результатов.

Теперь мы можем просто обработать список и удалить из него нежелательные future. Совершенно нормально вызвать методы CompletableFuture#join, потому что мы знаем, что они никогда не будут блокироваться (на этот момент все future уже завершены):

CompletableFuture<List<T>> result = intermediate
    .thenApply(list -> list.stream()
        .map(CompletableFuture::join)
        .collect(toList()));


А теперь давайте объединим все это в окончательное решение:

public static <T> CompletableFuture<List<T>> allOf(
  Collection<CompletableFuture<T>> futures) {
    return futures.stream()
        .collect(collectingAndThen(
          toList(), 
          l -> CompletableFuture.allOf(l.toArray(new CompletableFuture[0]))
        .thenApply(__ -> l.stream()
           .map(CompletableFuture::join)
           .collect(Collectors.toList()))));
}


Асинхронный и падающий CompletableFuture#allOf


При наличии исключений оригинальный CompletableFuture#allOf ожидает завершения всех оставшихся операций.

И если мы хотим сообщить о завершении операции при возникновении в ней исключения, то нам придется изменить реализацию.

Для этого нужно создать новый экземпляр CompletableFuture и завершить его вручную, после того как одна из операций вызовет исключение:

CompletableFuture<List<T>> result = new CompletableFuture<>();
futures.forEach(f -> f
  .handle((__, ex) -> ex == null || result.completeExceptionally(ex)));


… но тогда нам нужно разобраться со сценарием, когда все future завершатся успешно. Это можно легко сделать, используя улучшенный метод allOf(), а затем просто завершить future вручную:

allOf(futures).thenAccept(result::complete);


Теперь мы можем объединить все вместе, чтобы сформировать окончательное решение:

public static <T> CompletableFuture<List<T>> 
  allOfShortcircuiting(Collection<CompletableFuture<T>> futures) {
    CompletableFuture<List<T>> result = new CompletableFuture<>();

    for (CompletableFuture<?> f : futures) {
        f.handle((__, ex) -> ex == null || result.completeExceptionally(ex));
    }

    allOf(futures).thenAccept(result::complete);

    return result;
}


CompletableFuture#anyOf


Начнем также с сигнатуры метода:

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    // ...
}


Мы сразу можем обнаружить такие же проблемы, как с методом, рассмотренным выше:

  1. Метод принимает несколько объектов CompletableFuture, содержащих объекты разных типов.
  2. Метод возвращает CompletableFuture, содержащий объект типа Object.


Насколько я понимаю, метод CompletableFuture#allOf был спроектирован так, чтобы он использовался в качестве сигнального устройства. Но CompletableFuture#anyOf не соответствует этой философии, возвращая CompletableFuture<Object>, что еще более запутывает.

Посмотрите на следующий пример, где я пытаюсь обработать CompletableFuture, содержащие данные разных типов:

CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(1);
CompletableFuture<String> f2 = CompletableFuture.completedFuture("2");

Integer result = CompletableFuture.anyOf(f1, f2)
  .thenApply(r -> {
      if (r instanceof Integer) {
          return (Integer) r;
      } else if (r instanceof String) {
          return Integer.valueOf((String) r);
      }
      throw new IllegalStateException("unexpected object type!");
  }).join();


Довольно неудобно, не так ли?

К счастью, это довольно легко приспособить для более распространенного сценария (ожидание одного из множества future, содержащих значения одного и того же типа), изменив сигнатуру и введя прямое приведение типов.

Таким образом, с нашими улучшениями, мы можем повторно использовать существующие методы и безопасно привести результат:

public static <T> CompletableFuture<T> anyOf(List<CompletableFuture<T>> cfs) {
    return CompletableFuture.anyOf(cfs.toArray(new CompletableFuture[0]))
      .thenApply(o -> (T) o);
}
 
public static <T> CompletableFuture<T> anyOf(CompletableFuture<T>... cfs) {
    return CompletableFuture.anyOf(cfs).thenApply(o -> (T) o);
}


Исходный код


Исходный код можно найти на Github.

На этом все. До встречи на курсе.




К сожалению, не доступен сервер mySQL