Christopher Anabo
Senior Tech Lead

Notes

CompletableFuture

CompletableFuture is a class in the java.util.concurrent package (added in Java 8) that represents a promise of a value that will be computed asynchronously at some point in the future. It is a more powerful and flexible way to handle asynchronous programming than older constructs like Future.

Here’s a simplified explanation:

  • Future Problem: The older Future interface blocks the thread when you call .get() to wait for the result, making it less ideal for non-blocking asynchronous operations.
  • CompletableFuture Solution: A CompletableFuture allows you to chain multiple operations, handle errors, and combine multiple asynchronous tasks in a non-blocking way.
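
The contrast between the two bullets above can be sketched as follows. This is a minimal illustration, not a benchmark: the class name FutureVsCompletableFuture and the helper slowGreeting() are hypothetical names made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCompletableFuture {

    // Hypothetical stand-in for a slow computation
    static String slowGreeting() {
        return "Hello";
    }

    // Plain Future: the only way to use the result is to block on get()
    static int lengthWithFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> slowGreeting();
            Future<String> future = pool.submit(task);
            String value = future.get(); // the calling thread waits here
            return value.length();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: the follow-up step is registered as a callback
    static CompletableFuture<Integer> lengthWithCompletableFuture() {
        return CompletableFuture
                .supplyAsync(() -> slowGreeting())
                .thenApply(String::length); // runs once the value is ready
    }

    public static void main(String[] args) {
        System.out.println(lengthWithFuture());
        // join() is used only so the demo does not exit before the callback runs
        System.out.println(lengthWithCompletableFuture().join());
    }
}
```

In the second method nothing waits for the result inside the method itself; the caller decides whether to attach more stages or to block with join().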

Key Features of CompletableFuture

  1. Asynchronous Execution:

    • It can execute a task in a separate thread using methods like supplyAsync or runAsync.
  2. Chaining:

    • You can chain multiple computations using methods like .thenApply(), .thenAccept(), and .thenCompose().
  3. Combining Multiple Futures:

    • Combine multiple asynchronous tasks using methods like thenCombine() or allOf().
  4. Error Handling:

    • Handle exceptions using .exceptionally() or .handle().
  5. Manually Complete:

    • You can complete the future manually using the .complete() method.
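
Features 2, 4, and 5 from the list above can be sketched in one small class. The class CompletableFutureFeatures and the helpers findUser and loadGreeting are hypothetical names used only for illustration; the CompletableFuture methods themselves are the standard ones named in the list.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureFeatures {

    // Hypothetical async lookup that fails for a blank name
    static CompletableFuture<String> findUser(String name) {
        return CompletableFuture.supplyAsync(() -> {
            if (name.isBlank()) {
                throw new IllegalArgumentException("name must not be blank");
            }
            return name.trim();
        });
    }

    // Hypothetical second async step that depends on the first result
    static CompletableFuture<String> loadGreeting(String user) {
        return CompletableFuture.supplyAsync(() -> "Hello, " + user);
    }

    // thenCompose chains a second future without nesting;
    // exceptionally supplies a fallback if any earlier stage failed
    static CompletableFuture<String> greet(String name) {
        return findUser(name)
                .thenCompose(CompletableFutureFeatures::loadGreeting)
                .thenApply(String::toUpperCase)
                .exceptionally(ex -> "fallback greeting");
    }

    // handle receives both outcomes; ex is null when the stage succeeded
    static CompletableFuture<String> describe(String name) {
        return findUser(name)
                .handle((value, ex) -> ex == null ? "ok: " + value : "failed");
    }

    // A future with no task behind it, completed by hand
    static String completeManually() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("done by hand");
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(greet("Ada").join());
        System.out.println(greet(" ").join());
        System.out.println(describe("Ada").join());
        System.out.println(completeManually());
    }
}
```

As a rule of thumb, exceptionally() fits when only the failure path needs a replacement value, while handle() fits when success and failure should be mapped in one place.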

Example 1: Basic Usage

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
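        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // Simulate a long-running computation
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupt status
            }
            return "Hello, World!";
        });

        // Chain the result; the callback runs once the value is available
        CompletableFuture<Void> printed = future.thenAccept(result -> {
            System.out.println("Result: " + result);
        });

        System.out.println("Processing...");

        // supplyAsync runs on daemon threads by default, so wait before main exits
        printed.join();
    }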
}

 

Example 2: Combining Two Futures

import java.util.concurrent.CompletableFuture;

public class CombiningFuturesExample {
    public static void main(String[] args) {
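        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);

        // thenCombine runs the function once both futures have completed
        CompletableFuture<Integer> combined = future1.thenCombine(future2, (a, b) -> a + b);

        // Wait for the printing stage so the JVM does not exit before it runs
        combined.thenAccept(result -> System.out.println("Sum: " + result)).join();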
    }
}


Fetching Data from Different APIs Asynchronously and Combining the Results

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ApiDataFetcher {

    private static final HttpClient client = HttpClient.newHttpClient();

    // Method to fetch data from an API asynchronously
    public CompletableFuture<String> fetchData(String apiUrl) {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(apiUrl))
                .build();

        return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
    }

    // Method to fetch data from multiple APIs and combine the results
    public CompletableFuture<List<String>> fetchAndCombineData(List<String> apiUrls) {
        // Start one asynchronous fetch per URL
        List<CompletableFuture<String>> futures = apiUrls.stream()
                .map(this::fetchData)
                .collect(Collectors.toList());

        // Combine all futures once they're all completed
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join) // Already complete here, so join() does not block
                        .collect(Collectors.toList())); // Collect them into a list
    }

    // Main method to demonstrate fetching and combining data
    public static void main(String[] args) {
        ApiDataFetcher fetcher = new ApiDataFetcher();

        // List of API URLs to fetch data from
        List<String> apiUrls = List.of(
                "https://jsonplaceholder.typicode.com/posts",
                "https://jsonplaceholder.typicode.com/comments"
        );

        // Fetch and combine the data asynchronously
        CompletableFuture<List<String>> combinedData = fetcher.fetchAndCombineData(apiUrls);

        // Print the results once all data is fetched and combined
        CompletableFuture<Void> printed = combinedData.thenAccept(results -> {
            System.out.println("Combined API Data:");
            results.forEach(System.out::println);
        });

        // Block the main thread until the results have been printed
        printed.join();
    }
}

Executing Parallel SELECT Queries in a Database

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelPagination {

    // MySQL configuration
    private static final String DB_URL = "jdbc:mysql://localhost:3306/your_database";
    private static final String DB_USER = "your_user";
    private static final String DB_PASSWORD = "your_password";

    // Batch size for pagination
    private static final int BATCH_SIZE = 10000;

    // Number of page queries submitted at the same time
    private static final int PARALLELISM = 10;

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(PARALLELISM); // Thread pool

        try {
            int offset = 0;
            boolean morePages = true;

            while (morePages) {
                // Submit a whole wave of pages before waiting, so the queries run concurrently.
                // Joining each future right after submitting it would run them one at a time.
                List<CompletableFuture<Boolean>> wave = new ArrayList<>();

                for (int i = 0; i < PARALLELISM; i++) {
                    int currentOffset = offset;

                    wave.add(CompletableFuture
                        .supplyAsync(() -> fetchData(currentOffset, BATCH_SIZE), executorService)
                        .thenApply(data -> {
                            processBatch(data);
                            return data.size() == BATCH_SIZE; // Return true if more records likely exist
                        }));

                    offset += BATCH_SIZE;
                }

                // Wait for the wave to finish; stop once any page comes back short
                CompletableFuture.allOf(wave.toArray(new CompletableFuture[0])).join();
                morePages = wave.stream().allMatch(CompletableFuture::join);
            }
        } finally {
            executorService.shutdown();
        }

        System.out.println("All data fetched and processed.");
    }

    private static List<String> fetchData(int offset, int batchSize) {
        List<String> results = new ArrayList<>();
        try (Connection connection = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD)) {
            // ORDER BY keeps pages stable between queries; in a real table, order by a unique key
            String sql = "SELECT column_name FROM your_table ORDER BY column_name LIMIT ? OFFSET ?";
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setInt(1, batchSize);
                statement.setInt(2, offset);
                try (ResultSet resultSet = statement.executeQuery()) {
                    while (resultSet.next()) {
                        results.add(resultSet.getString("column_name"));
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return results;
    }

    private static void processBatch(List<String> data) {
        // Simulate processing
        System.out.println("Processing batch with " + data.size() + " records...");
    }
}

Benefits of CompletableFuture

  • Non-blocking: Callbacks run when a result is ready, so threads are not left waiting on .get().
  • Flexible: Supports chaining, combining, and error handling across async tasks.
  • Clean Code: Reduces boilerplate code for handling threads and callbacks.