CompletableFuture is a major new tool added in Java 8 to the java.util.concurrent library. Compared with the traditional Future, it supports many new features such as streaming computation, functional programming, completion notification, and custom exception handling. As functional programming is increasingly used in Java, mastering CompletableFuture proficiently is important for making better use of the main new features introduced after Java 8. For simplicity, this article uses the Java 8 version of CompletableFuture (the CompletableFuture in Java 11 has added some methods).

  1. Why is it called CompletableFuture?

Literally translated, CompletableFuture means “completable Future”. Compared with the traditional Future, CompletableFuture can actively set the result value of the computation (actively terminate the computation process, i.e., completable), thus actively ending the blocking wait in some scenarios. Since the traditional Future cannot actively set the computation result value, once get() is called for blocking wait, it will only return when the computation result is generated or when it times out.

The following example simply illustrates how a CompletableFuture can be actively completed. In the following code, because the complete method is called, the final printed result is “manual test” instead of “test”.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {​
    try {​
        Thread.sleep(1000L);​
        return "test";​
    } catch (Exception e) {​
        return "failed test";​
    }​
});​
future.complete("manual test");​
System.out.println(future.join());
  1. Creating a CompletableFuture

2.1 Creating via constructor

The simplest way is to create a CompletableFuture instance through the constructor. As shown in the following code. Since the newly created CompletableFuture does not have any computation result yet, calling join at this time will cause the current thread to block here indefinitely.

CompletableFuture<String> future = new CompletableFuture();​
String result = future.join();​
System.out.println(result);

At this point, if the value of the CompletableFuture is actively set in another thread, the result in the above thread can be returned.

future.complete("test");

This demonstrates the simplest way to create and use a CompletableFuture.

2.2 Creating via supplyAsync

CompletableFuture.supplyAsync() can also be used to create a CompletableFuture instance. The CompletableFuture instance created by this method will asynchronously execute the passed computation task. On the calling side, the final computation result can be obtained through get or join.

supplyAsync has two signatures:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)​
 ​
 ​
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

The first one only needs to pass a Supplier instance (usually using a lambda expression), and at this time, the framework will use the ForkJoin thread pool by default to execute the submitted task.

The second one can specify a custom thread pool and then submit the task to that thread pool for execution.

The following is an example of creating a CompletableFuture using supplyAsync:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute test");​
    return "test";​
});​
 ​
String result = future.join();​
System.out.println("get result: " + result);

In the example, the asynchronous task will print “compute test” and return “test” as the final computation result. Therefore, the final printed information is “get result: test”.

2.3 Creating via runAsync

CompletableFuture.runAsync() can also be used to create a CompletableFuture instance. Different from supplyAsync(), the task passed to runAsync() is required to be of Runnable type, so there is no return value. Therefore, runAsync is suitable for creating computation tasks that do not require a return value. Similar to supplyAsync(), runAsync() also has two signatures:

public static CompletableFuture<Void> runAsync(Runnable runnable)​
 ​
 ​
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

The following is an example of using runAsync():

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {​
    System.out.println("compute test");​
});​
System.out.println("get result: " + future.join());

In the example, since the task has no return value, the final printed result is “get result: null”.

  1. Common usage methods

Compared with Future, the biggest difference of CompletableFuture is that it supports streaming computation processing. Multiple tasks can be connected in sequence to form a computation flow. For example, the result generated by task 1 can be directly used as the input parameter of task 2 to participate in the computation of task 2, and so on.

Common streaming connection functions in CompletableFuture include:

thenApply​
thenApplyAsync​
 ​
thenAccept​
thenAcceptAsync​
 ​
thenRun​
thenRunAsync​
 ​
thenCombine​
thenCombineAsync​
 ​
thenCompose​
thenComposeAsync​
 ​
whenComplete​
whenCompleteAsync​
 ​
handle​
handleAsync

The functions with the Async suffix indicate that the connected subsequent tasks will be submitted to the thread pool separately, thus running asynchronously relative to the preceding tasks. Except for this, there is no other difference between them. Therefore, for quick understanding, in the following introduction, we will mainly introduce the versions without the Async suffix.

3.1 thenApply / thenAccept / thenRun

We will discuss thenApply / thenAccept / thenRun together here because the only difference between these connection functions is the type of task submitted. The task submitted by thenApply needs to follow the Function signature, that is, it has an input parameter and a return value, where the input parameter is the result of the preceding task. The task submitted by thenAccept needs to follow the Consumer signature, that is, it has an input parameter but no return value, where the input parameter is the result of the preceding task. The task submitted by thenRun needs to follow the Runnable signature, that is, it has neither an input parameter nor a return value.

Therefore, for simplicity, we will mainly discuss thenApply here.

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute 1");​
    return 1;​
});​
CompletableFuture<Integer> future2 = future1.thenApply((p) -> {​
    System.out.println("compute 2");​
    return p + 10;​
});​
System.out.println("result: " + future2.join());

In the above example, future1 connects the subsequent task by calling thenApply to form future2. The final printed result of the example is 11, which shows that during the running of the program, after the result of future1 is computed, it will be passed to the task connected through thenApply, so that the final result of future2 is 1 + 10 = 11. Of course, in actual use, we can theoretically connect subsequent computation tasks infinitely to realize a longer streaming computation chain.

It should be noted that the tasks connected through thenApply / thenAccept / thenRun will start the computation of the subsequent tasks only when the preceding task completes its computation. Therefore, this group of functions is mainly used to connect task chains with dependencies between the preceding and subsequent tasks.

3.2 thenCombine

Compared with the previous group of connection functions, the biggest difference of thenCombine is that the connected task can be an independent CompletableFuture (or any type that implements CompletionStage), thus allowing the two tasks connected before and after to be executed in parallel (the subsequent task does not need to wait for the preceding task to complete execution). Finally, when both tasks are completed, their results are passed to the downstream processing task at the same time to obtain the final result.

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute 1");​
    return 1;​
});​
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute 2");​
    return 10;​
});​
CompletableFuture<Integer> future3 = future1.thenCombine(future2, (r1, r2) -> r1 + r2);​
System.out.println("result: " + future3.join());

In the above example code, future1 and future2 are independent CompletableFuture tasks, which will be executed in parallel in their respective threads. Then future1 is connected with future2 through thenCombine, and an expression for processing the results is passed in with a lambda expression. The task represented by this expression will take the results of future1 and future2 as input parameters and compute their sum.

Therefore, in the above example code, the final printed result is 11.

Generally, when there is no dependency between connected tasks, thenCombine can be used to connect the tasks to improve the concurrency between tasks.

Note that the functions thenAcceptBoth, thenAcceptBothAsync, runAfterBoth, and runAfterBothAsync have similar functions to thenCombine, with the only difference being the task types, which are BiConsumer and Runnable respectively.

3.3 thenCompose

Earlier, we talked about that thenCombine is mainly used to connect tasks that have no dependency between them. Then, if there is a dependency between two tasks, but the connected task is an independent CompletableFuture, how to implement it?

First, let’s look at the implementation using thenApply directly:

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute 1");​
    return 1;​
});​
CompletableFuture<CompletableFuture<Integer>> future2 =​
        future1.thenApply((r) -> CompletableFuture.supplyAsync(() -> r + 10));​
System.out.println(future2.join().join());

It can be found that in the above example code, the type of future2 becomes a nested CompletableFuture, and when obtaining the result, it is also necessary to call join or get nestedly. In this way, as more tasks are connected, the code will become more and more complex, and the nested acquisition levels will become deeper and deeper. Therefore, a way is needed to unfold this nested pattern to avoid so many levels. The main purpose of thenCompose is to solve this problem (here, the role of thenCompose can also be compared to flatMap in the stream interface, as both can unfold type nesting).

Let’s see how to implement the above code through thenCompose:

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute 1");​
    return 1;​
});​
CompletableFuture<Integer> future2 = future1.thenCompose((r) -> CompletableFuture.supplyAsync(() -> r + 10));​
System.out.println(future2.join());

It can be seen from the example code that after using thenCompose, future2 no longer has nested CompletableFuture types, thus achieving our goal concisely.

3.4 whenComplete

whenComplete is mainly used to inject callback notification logic when the task is completed. This solves the problem that the traditional future cannot actively send notifications when the task is completed. The preceding task will pass the computation result or the thrown exception as input parameters to the callback notification function.

The following is an example:

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute 1");​
    return 1;​
});​
CompletableFuture future2 = future1.whenComplete((r, e) -> {​
    if (e != null) {​
        System.out.println("compute failed!");​
    } else {​
        System.out.println("received result is " + r);​
    }​
});​
System.out.println("result: " + future2.join());

It should be noted that the result obtained by future2 is the result of the preceding task, and the logic in whenComplete will not affect the computation result.

3.5 handle

handle is somewhat similar to whenComplete in function, but the processing function received by handle has a return value, and the return value will affect the finally obtained computation result.

The following is an example:

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {​
    System.out.println("compute 1");​
    return 1;​
});​
CompletableFuture<Integer> future2 = future1.handle((r, e) -> {​
    if (e != null) {​
        System.out.println("compute failed!");​
        return r;​
    } else {​
        System.out.println("received result is " + r);​
        return r + 10;​
    }​
});​
System.out.println("result: " + future2.join());

In the above example, the final printed result is 11. This indicates that a new result is generated after computation by handle.

Avatar

By BytePilot

Because sharing makes us better. Let’s learn, build, and grow — one byte at a time.

Leave a Reply

Your email address will not be published. Required fields are marked *