Christopher Anabo
Christopher Anabo
Senior Tech Lead
Christopher Anabo

Notes

Checkout Service using parallelStream

Checkout Service using parallelStream

parallelStream() - How it works?

  • Split
    • Data source is split in to small data chunks
      • Example - List Collection split into chunks of element to size 1
    • This is done using Spliterators
      • For ArrayList, the Spliterator is ArrayListSpliterator
  • Execute
    • Data chunks are applied to the Stream Pipeline and the intermediate operations executed in a Common ForkJoin Pool
  • Combine
    • Combine the executed results into a final result
    • Combine phase in Streams API maps to terminal operations
    • Uses collect() and reduce() functions
      • collect(toList())

 

Collect() VS Reduce()

Collect -> Mutating the results Reduce
Used as a terminal operation in Streams Used as a terminal operation in Streams
Produces a single result Produces a single result
Result is produced in a mutable fashion Result is produced in a mutable fashion
Feature rich and used for many different use cases

Reduce the computation into a single value

  • Sum, Multiplication

Example

  • collect(toList);
  • collect(toSet);
  • collect(summingDouble(Double::doubleValue));

Example

  • Sum -> reduce(0.0, (x,y) -> x + y)
  • Multiply -> reduce -> (1.0,(x,y) -> x * y)

 

Identity in reduce()

  • Identity gives you the same value when its used in the computation
    • Addition: Identity = 0
      • ​​​​​​​0 + 1 => 1
      • 0 + 20 => 20
    • Multiplication : Identity = 1
      • 1 * 1 => 1​​​​​​​
      • 1 * 20 => 20

 

reduce -> Identity = this is the initial value

 

Example

import com.learnjava.util.DataSet;

import java.util.List;
import java.util.stream.Collectors;

import static com.learnjava.util.LoggerUtil.log;

public class CollectVsReduce {

    public static String collect() {
        List<String> list = DataSet.namesList();

        String result = list.
                parallelStream().
                        collect(Collectors.joining());

        return result;
    }

    public static String reduce() {

        List<String> list = DataSet.namesList();

        String result = list.
                parallelStream().
                reduce("", (s1, s2) -> s1 + s2);

        return result;
    }

    public static void main(String[] args) {

        log("collect : "+ collect());
        log("reduce : "+ reduce());
    }
}

 

Checkout Service using parallelStream

package com.learnjava.service;


import com.learnjava.domain.checkout.Cart;
import com.learnjava.domain.checkout.CartItem;
import com.learnjava.domain.checkout.CheckoutResponse;
import com.learnjava.domain.checkout.CheckoutStatus;

import java.util.List;


import static com.learnjava.util.CommonUtil.*;
import static com.learnjava.util.LoggerUtil.log;
import static java.util.stream.Collectors.summingDouble;
import static java.util.stream.Collectors.toList;

public class CheckoutService {

    private PriceValidatorService priceValidatorService;

    public CheckoutService(PriceValidatorService priceValidatorService) {
        this.priceValidatorService = priceValidatorService;
    }

    public CheckoutResponse checkout(Cart cart) {

        startTimer();
        List priceValidationList = cart.getCartItemList()
                //.stream()
                .parallelStream()
                .map(cartItem -> {
                    boolean isPriceValid = priceValidatorService.isCartItemInvalid(cartItem);
                    cartItem.setExpired(isPriceValid);
                    return cartItem;
                })
                .filter(CartItem::isExpired)
                .collect(toList());
        timeTaken();
        stopWatchReset();

        if (priceValidationList.size() > 0) {
            log("Checkout Error");
            return new CheckoutResponse(CheckoutStatus.FAILURE, priceValidationList);
        }

        //double finalRate = calculateFinalPrice(cart);
        double finalRate = calculateFinalPrice_reduce(cart);
        log("Checkout Complete and the final rate is " + finalRate);

        return new CheckoutResponse(CheckoutStatus.SUCCESS, finalRate);
    }

    private double calculateFinalPrice(Cart cart) {
        return cart.getCartItemList()
                .parallelStream()
                .map(cartItem -> cartItem.getQuantity() * cartItem.getRate())
                .collect(summingDouble(Double::doubleValue));
        //.mapToDouble(Double::doubleValue)
        //.sum();
    }

    private double calculateFinalPrice_reduce(Cart cart) {
        return cart.getCartItemList()
                .parallelStream()
                .map(cartItem -> cartItem.getQuantity() * cartItem.getRate())
                //.reduce(0.0, (x,y)->x+y);
                .reduce(0.0, Double::sum);
        //Identity for multiplication is 1
        //Identity for addition  is 0
    }
}

Testing using JUnit 5

package com.learnjava.service;

import com.learnjava.domain.checkout.Cart;
import com.learnjava.domain.checkout.CheckoutResponse;
import com.learnjava.domain.checkout.CheckoutStatus;
import com.learnjava.util.DataSet;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ForkJoinPool;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class CheckoutServiceTest {

    PriceValidatorService priceValidatorService = new PriceValidatorService();

    CheckoutService checkoutService = new CheckoutService(priceValidatorService);

    @Test
    void parallelism() {
        // parallelism =  no of cores -1
        System.out.println("parallelism : " +ForkJoinPool.getCommonPoolParallelism());
    }

    @Test
    void no_of_cores() {
        System.out.println("no of cores : " +Runtime.getRuntime().availableProcessors());
    }

    @Test
    void checkout_6_items() {

        //given
        Cart cart = DataSet.createCart(6);

        //when
        CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

        //then
        assertEquals(CheckoutStatus.SUCCESS, checkoutResponse.getCheckoutStatus());
        assertTrue(checkoutResponse.getFinalRate()>0);
    }

    @Test
    void checkout_12_items() {

        //given
        //-Djava.util.concurrent.ForkJoinPool.common.parallelism=100
        //  System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

        Cart cart = DataSet.createCart(12);
        System.out.println("size of the cart  : "+ cart.getCartItemList().size());
        //when
        CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

        //then
        assertEquals(CheckoutStatus.FAILURE, checkoutResponse.getCheckoutStatus());
    }

    @Test
    void checkout_modify_parallelism() {

        //given
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=100
          System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "1000");

        Cart cart = DataSet.createCart(1005);

        //when
        CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

        //then
        assertEquals(CheckoutStatus.FAILURE, checkoutResponse.getCheckoutStatus());
    }

    @Test
    void checkout_25_items() {

        //given
        Cart cart = DataSet.createCart(25);

        //when
        CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

        //then
        assertEquals(CheckoutStatus.FAILURE, checkoutResponse.getCheckoutStatus());
    }
}