Checkout Service using parallelStream
parallelStream() - How it works?
- Split
- Data source is split into small data chunks
- Example - a List collection is split into chunks of elements, down to a chunk size of 1
- This is done using Spliterators
- For ArrayList, the Spliterator is ArrayListSpliterator
- Data source is split into small data chunks
- Execute
- Data chunks are applied to the Stream Pipeline and the intermediate operations executed in a Common ForkJoin Pool
- Combine
- Combine the executed results into a final result
- Combine phase in Streams API maps to terminal operations
- Uses collect() and reduce() functions
- collect(toList())
Collect() VS Reduce()
| Collect (mutates the result container) | Reduce |
| --- | --- |
| Used as a terminal operation in Streams | Used as a terminal operation in Streams |
| Produces a single result | Produces a single result |
| Result is produced in a mutable fashion | Result is produced in an immutable fashion |
| Feature rich and used for many different use cases | Reduces the computation into a single value |
| Example: collect(Collectors.joining()) | Example: reduce(0.0, Double::sum) |
Identity in reduce()
- Identity is the value that gives you back the same value when it's used in the computation
- Addition: Identity = 0
- 0 + 1 => 1
- 0 + 20 => 20
- Multiplication : Identity = 1
- 1 * 1 => 1
- 1 * 20 => 20
- Addition: Identity = 0
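reduce -> Identity = the initial value of the reduction (in a parallel stream it can be applied once per chunk, so it must be a true identity for the operation)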
Example
import com.learnjava.util.DataSet;
import java.util.List;
import java.util.stream.Collectors;
import static com.learnjava.util.LoggerUtil.log;
/**
 * Side-by-side demo of the two terminal operations that can fold a parallel
 * stream of strings into one string: {@code collect} and {@code reduce}.
 */
public class CollectVsReduce {

    /**
     * Concatenates the strings returned by {@code DataSet.namesList()} using
     * {@code collect} with a joining collector on a parallel stream.
     *
     * @return the concatenation of all list elements, with no separator
     */
    public static String collect() {
        List<String> names = DataSet.namesList();
        return names.parallelStream().collect(Collectors.joining());
    }

    /**
     * Concatenates the strings returned by {@code DataSet.namesList()} using
     * {@code reduce} on a parallel stream, with the empty string as identity.
     *
     * @return the concatenation of all list elements, with no separator
     */
    public static String reduce() {
        List<String> names = DataSet.namesList();
        return names.parallelStream().reduce("", (left, right) -> left + right);
    }

    /**
     * Runs both variants in turn and logs each result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String collected = collect();
        log("collect : " + collected);

        String reduced = reduce();
        log("reduce : " + reduced);
    }
}
Checkout Service using parallelStream
package com.learnjava.service;
import com.learnjava.domain.checkout.Cart;
import com.learnjava.domain.checkout.CartItem;
import com.learnjava.domain.checkout.CheckoutResponse;
import com.learnjava.domain.checkout.CheckoutStatus;
import java.util.List;
import static com.learnjava.util.CommonUtil.*;
import static com.learnjava.util.LoggerUtil.log;
import static java.util.stream.Collectors.summingDouble;
import static java.util.stream.Collectors.toList;
/**
 * Validates the items of a {@link Cart} in parallel and, when none of them is
 * flagged by the {@link PriceValidatorService}, computes the cart's final rate.
 */
public class CheckoutService {

    private final PriceValidatorService priceValidatorService;

    /**
     * @param priceValidatorService validator consulted once per cart item
     */
    public CheckoutService(PriceValidatorService priceValidatorService) {
        this.priceValidatorService = priceValidatorService;
    }

    /**
     * Checks every cart item with the price validator on a parallel stream.
     * Each item's {@code expired} flag is set to the validator's verdict.
     *
     * @param cart the cart to check out
     * @return a {@code FAILURE} response carrying the flagged items when at
     *         least one item is flagged, otherwise a {@code SUCCESS} response
     *         carrying the final rate
     */
    public CheckoutResponse checkout(Cart cart) {
        startTimer();
        // parallelStream() instead of stream(): items are validated concurrently.
        List<CartItem> invalidItems = cart.getCartItemList()
                .parallelStream()
                .map(cartItem -> {
                    // The validator reports INVALID items; that verdict is stored as "expired".
                    boolean isInvalid = priceValidatorService.isCartItemInvalid(cartItem);
                    cartItem.setExpired(isInvalid);
                    return cartItem;
                })
                .filter(CartItem::isExpired)
                .collect(toList());
        timeTaken();
        stopWatchReset();

        if (!invalidItems.isEmpty()) {
            log("Checkout Error");
            return new CheckoutResponse(CheckoutStatus.FAILURE, invalidItems);
        }

        // calculateFinalPrice(cart) is the collect-based equivalent.
        double finalRate = calculateFinalPrice_reduce(cart);
        log("Checkout Complete and the final rate is " + finalRate);
        return new CheckoutResponse(CheckoutStatus.SUCCESS, finalRate);
    }

    /**
     * Sums {@code quantity * rate} over all cart items using the
     * {@code summingDouble} collector.
     *
     * @param cart the cart whose items are priced
     * @return the sum of the per-item totals
     */
    private double calculateFinalPrice(Cart cart) {
        return cart.getCartItemList()
                .parallelStream()
                .map(cartItem -> cartItem.getQuantity() * cartItem.getRate())
                .collect(summingDouble(Double::doubleValue));
        // Equivalent alternative: .mapToDouble(Double::doubleValue).sum()
    }

    /**
     * Sums {@code quantity * rate} over all cart items using {@code reduce}.
     *
     * @param cart the cart whose items are priced
     * @return the sum of the per-item totals
     */
    private double calculateFinalPrice_reduce(Cart cart) {
        return cart.getCartItemList()
                .parallelStream()
                .map(cartItem -> cartItem.getQuantity() * cartItem.getRate())
                // Identity for addition is 0 (it would be 1 for multiplication).
                // Same as .reduce(0.0, (x, y) -> x + y)
                .reduce(0.0, Double::sum);
    }
}
Testing using JUnit 5
package com.learnjava.service;
import com.learnjava.domain.checkout.Cart;
import com.learnjava.domain.checkout.CheckoutResponse;
import com.learnjava.domain.checkout.CheckoutStatus;
import com.learnjava.util.DataSet;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ForkJoinPool;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * JUnit 5 tests that run {@link CheckoutService#checkout} against carts of
 * various sizes built by {@code DataSet.createCart}.
 */
class CheckoutServiceTest {

    PriceValidatorService priceValidatorService = new PriceValidatorService();
    CheckoutService checkoutService = new CheckoutService(priceValidatorService);

    /** Prints the parallelism of the common ForkJoinPool (informational, no assertion). */
    @Test
    void parallelism() {
        // By default: parallelism = no of cores - 1
        System.out.println("parallelism : " + ForkJoinPool.getCommonPoolParallelism());
    }

    /** Prints the number of processors visible to the JVM (informational, no assertion). */
    @Test
    void no_of_cores() {
        System.out.println("no of cores : " + Runtime.getRuntime().availableProcessors());
    }

    /** A 6-item cart is expected to check out successfully with a positive final rate. */
    @Test
    void checkout_6_items() {
        //given
        Cart cart = DataSet.createCart(6);

        //when
        CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

        //then
        assertEquals(CheckoutStatus.SUCCESS, checkoutResponse.getCheckoutStatus());
        assertTrue(checkoutResponse.getFinalRate() > 0);
    }

    /** A 12-item cart is expected to fail checkout. */
    @Test
    void checkout_12_items() {
        //given
        // The pool size can also be raised with the JVM flag
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=100
        Cart cart = DataSet.createCart(12);
        System.out.println("size of the cart : " + cart.getCartItemList().size());

        //when
        CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

        //then
        assertEquals(CheckoutStatus.FAILURE, checkoutResponse.getCheckoutStatus());
    }

    /**
     * Runs checkout of a 1005-item cart with the common-pool parallelism
     * property set to 1000, and expects a failure response.
     * The property is JVM-wide, so its previous value is restored afterwards
     * to avoid leaking into other tests.
     */
    @Test
    void checkout_modify_parallelism() {
        //given
        // Equivalent JVM flag: -Djava.util.concurrent.ForkJoinPool.common.parallelism=100
        final String parallelismKey = "java.util.concurrent.ForkJoinPool.common.parallelism";
        final String previousValue = System.getProperty(parallelismKey);
        // NOTE(review): the common pool reads this property when it is first
        // initialized, so it may have no effect if the pool was already used.
        System.setProperty(parallelismKey, "1000");
        try {
            Cart cart = DataSet.createCart(1005);

            //when
            CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

            //then
            assertEquals(CheckoutStatus.FAILURE, checkoutResponse.getCheckoutStatus());
        } finally {
            if (previousValue == null) {
                System.clearProperty(parallelismKey);
            } else {
                System.setProperty(parallelismKey, previousValue);
            }
        }
    }

    /** A 25-item cart is expected to fail checkout. */
    @Test
    void checkout_25_items() {
        //given
        Cart cart = DataSet.createCart(25);

        //when
        CheckoutResponse checkoutResponse = checkoutService.checkout(cart);

        //then
        assertEquals(CheckoutStatus.FAILURE, checkoutResponse.getCheckoutStatus());
    }
}