CompletableFuture
A CompletableFuture
in Java is a part of the java.util.concurrent
package that represents a promise of a value that will be computed asynchronously at some point in the future. It is a more powerful and flexible way to handle asynchronous programming compared to older constructs like Future
.
Here’s a simplified explanation:
Future
Problem: The olderFuture
interface blocks the thread when you call.get()
to wait for the result, making it less ideal for non-blocking asynchronous operations.CompletableFuture
Solution: ACompletableFuture
allows you to chain multiple operations, handle errors, and combine multiple asynchronous tasks in a non-blocking way.
Key Features of CompletableFuture
-
Asynchronous Execution:
- It can execute a task in a separate thread using methods like
supplyAsync
orrunAsync
.
- It can execute a task in a separate thread using methods like
-
Chaining:
- You can chain multiple computations using methods like
.thenApply()
,.thenAccept()
, and.thenCompose()
.
- You can chain multiple computations using methods like
-
Combining Multiple Futures:
- Combine multiple asynchronous tasks using methods like
thenCombine()
orallOf()
.
- Combine multiple asynchronous tasks using methods like
-
Error Handling:
- Handle exceptions using
.exceptionally()
or.handle()
.
- Handle exceptions using
-
Manually Complete:
- You can complete the future manually using the
.complete()
method.
- You can complete the future manually using the
Example 1: Basic Usage
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
// Simulate a long-running computation
try { Thread.sleep(1000); } catch (InterruptedException e) { }
return "Hello, World!";
});
// Chain the result
future.thenAccept(result -> {
System.out.println("Result: " + result);
});
System.out.println("Processing...");
}
}
Example 2: Combining Two Futures
import java.util.concurrent.CompletableFuture;
public class CombiningFuturesExample {
public static void main(String[] args) {
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture combined = future1.thenCombine(future2, (a, b) -> a + b);
combined.thenAccept(result -> System.out.println("Sum: " + result));
}
}
Fetching data from different API Asyncrhonously and combinging the results
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ApiDataFetcher {
private static final HttpClient client = HttpClient.newHttpClient();
// Method to fetch data from an API asynchronously
public CompletableFuture fetchData(String apiUrl) {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(apiUrl))
.build();
return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body);
}
// Method to fetch data from multiple APIs and combine the results
public CompletableFuture> fetchAndCombineData(List apiUrls) {
// Create a stream of asynchronous fetch tasks
List> futures = apiUrls.stream()
.map(this::fetchData)
.collect(Collectors.toList());
// Combine all futures once they're all completed
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join) // Wait for all the responses
.collect(Collectors.toList())); // Collect them into a list
}
// Main method to demonstrate fetching and combining data
public static void main(String[] args) {
ApiDataFetcher fetcher = new ApiDataFetcher();
// List of API URLs to fetch data from
List apiUrls = List.of(
"https://jsonplaceholder.typicode.com/posts",
"https://jsonplaceholder.typicode.com/comments"
);
// Fetch and combine the data asynchronously
CompletableFuture> combinedData = fetcher.fetchAndCombineData(apiUrls);
// Print the results once all data is fetched and combined
combinedData.thenAccept(results -> {
System.out.println("Combined API Data:");
results.forEach(System.out::println);
});
// Block the main thread until the results are available
combinedData.join();
}
}
Executing parallel SELECT in Database
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ParallelPagination {
// MySQL configuration
private static final String DB_URL = "jdbc:mysql://localhost:3306/your_database";
private static final String DB_USER = "your_user";
private static final String DB_PASSWORD = "your_password";
// Batch size for pagination
private static final int BATCH_SIZE = 10000;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10); // Thread pool
List> futures = new ArrayList<>();
int offset = 0;
while (true) {
int currentOffset = offset;
// Create and store a future for each batch
CompletableFuture future = CompletableFuture.supplyAsync(() -> fetchData(currentOffset, BATCH_SIZE), executorService)
.thenApply(data -> {
processBatch(data);
return data.size() == BATCH_SIZE; // Return true if more records likely exist
});
futures.add(future);
// Break if the current future indicates no more records
if (!future.join()) {
break;
}
offset += BATCH_SIZE;
}
// Wait for all tasks to complete
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
executorService.shutdown();
System.out.println("All data fetched and processed.");
}
private static List fetchData(int offset, int batchSize) {
List results = new ArrayList<>();
try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
String sql = "SELECT column_name FROM your_table LIMIT ? OFFSET ?";
try (PreparedStatement statement = connection.prepareStatement(sql)) {
statement.setInt(1, batchSize);
statement.setInt(2, offset);
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
results.add(resultSet.getString("column_name"));
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return results;
}
private static void processBatch(List data) {
// Simulate processing
System.out.println("Processing batch with " + data.size() + " records...");
}
}
Benefits of CompletableFuture
- Non-blocking: Makes better use of resources.
- Flexible: Supports chaining, combining, and handling of async tasks.
- Clean Code: Reduces boilerplate code for handling threads and callbacks.