Java Streams
Core Understanding
A Stream in Java is a sequence of elements supporting sequential and parallel aggregate operations. Streams provide a high-level abstraction for processing sequences of data in a declarative way.
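As a quick illustration, here is a minimal sketch of that declarative style (the list of names is invented for the example):
import java.util.List;

public class StreamIntro {
    public static void main(String[] args) {
        List<String> names = List.of("Alice", "Bob", "Charlie", "Dave");

        // Declarative: describe what to compute, not how to iterate
        long longNames = names.stream()
                .filter(name -> name.length() > 3)
                .count();

        System.out.println(longNames); // 3
    }
}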
Stream Types
- Sequential Streams
  - Regular stream processing
  - Maintains element order
  - Single-threaded execution
- Parallel Streams
  - Concurrent processing
  - Potentially unordered
  - Uses ForkJoinPool
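A minimal sketch of the difference (illustrative numbers; summing is order-independent, so both variants agree):
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StreamTypesDemo {
    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 1_000)
                .boxed()
                .collect(Collectors.toList());

        // Sequential: single thread, encounter order preserved
        long sequentialSum = numbers.stream().mapToLong(Integer::longValue).sum();

        // Parallel: work is split across the common ForkJoinPool
        long parallelSum = numbers.parallelStream().mapToLong(Integer::longValue).sum();

        System.out.println(sequentialSum == parallelSum); // true
    }
}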
Stream Operations
- Intermediate Operations
  - filter(Predicate<T>): Filters elements based on a predicate
  - map(Function<T,R>): Transforms elements
  - flatMap(Function<T,Stream<R>>): One-to-many transformation
  - sorted(): Sorts elements
  - distinct(): Removes duplicates
  - peek(Consumer<T>): Debug operations
- Terminal Operations
  - collect(Collector<T,A,R>): Mutable reduction
  - reduce(BinaryOperator<T>): Immutable reduction
  - forEach(Consumer<T>): Iteration
  - count(): Count elements
  - anyMatch(Predicate<T>): Check condition
  - findFirst(): Find first element
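A short sketch chaining several of these operations in one pipeline (the word list is illustrative):
import java.util.List;
import java.util.stream.Collectors;

public class OperationsDemo {
    public static void main(String[] args) {
        List<String> words = List.of("stream", "map", "filter", "map", "collect");

        List<String> result = words.stream()
                .distinct()                      // intermediate: drop the duplicate "map"
                .filter(w -> w.length() > 3)     // intermediate: keep words longer than 3 chars
                .map(String::toUpperCase)        // intermediate: transform each element
                .sorted()                        // intermediate: natural ordering
                .collect(Collectors.toList());   // terminal: materialize the result

        System.out.println(result); // [COLLECT, FILTER, STREAM]
    }
}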
Important Collectors
// Key Collectors to know
Collectors.toList() // Convert stream to List
Collectors.toSet() // Convert stream to Set
Collectors.toMap() // Convert stream to Map
Collectors.groupingBy() // Group elements
Collectors.partitioningBy() // Partition elements
Collectors.joining() // Join strings
Collectors.summarizingDouble() // Statistics for doubles
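A brief sketch exercising a few of these collectors (the Employee record and the sample data are invented for illustration):
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CollectorsDemo {
    record Employee(String name, String department, double salary) {}

    public static void main(String[] args) {
        List<Employee> employees = List.of(
                new Employee("Ann", "IT", 90_000),
                new Employee("Bob", "IT", 60_000),
                new Employee("Cid", "HR", 55_000));

        // groupingBy: department -> employees in that department
        Map<String, List<Employee>> byDept = employees.stream()
                .collect(Collectors.groupingBy(Employee::department));

        // partitioningBy: split into "high earners" and the rest
        Map<Boolean, List<Employee>> partitioned = employees.stream()
                .collect(Collectors.partitioningBy(e -> e.salary() > 70_000));

        // joining: comma-separated names
        String names = employees.stream()
                .map(Employee::name)
                .collect(Collectors.joining(", "));

        System.out.println(byDept.keySet());              // [HR, IT] (map order unspecified)
        System.out.println(partitioned.get(true).size()); // 1
        System.out.println(names);                        // Ann, Bob, Cid
    }
}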
Special Stream Types
- IntStream
IntStream.range(1, 100) // Create number range
IntStream.of(1, 2, 3) // Create from values
list.stream().mapToInt(Integer::intValue) // Convert to IntStream
- LongStream
LongStream.rangeClosed(1, 100) // Inclusive range
Arrays.stream(array).mapToLong(Long::longValue) // Convert a Long[] array to LongStream
- DoubleStream
DoubleStream.generate(Math::random) // Infinite stream
list.stream().mapToDouble(Double::doubleValue) // Convert List<Double> to DoubleStream
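A short sketch of why the primitive specializations matter: they expose numeric terminal operations such as sum() and summaryStatistics() without boxing each element:
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

public class PrimitiveStreamDemo {
    public static void main(String[] args) {
        // Statistics over 1..100 without creating Integer objects
        IntSummaryStatistics stats = IntStream.rangeClosed(1, 100).summaryStatistics();

        System.out.println(stats.getSum());     // 5050
        System.out.println(stats.getAverage()); // 50.5
        System.out.println(stats.getMax());     // 100
    }
}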
❌ Bad Example
public class OrderProcessor {
    public double calculateTotal(List<Order> orders) {
        // Bad practices:
        double total = 0; // Mutable state
        for (Order order : orders) {
            if (order.getStatus() == Status.COMPLETED) {
                for (OrderItem item : order.getItems()) {
                    total += item.getPrice(); // Multiple iterations
                }
            }
        }
        return total;
    }
}
Why it's bad:
- Uses mutable state
- Multiple nested loops
- Mixed responsibilities
- Not composable
- Hard to parallelize
✅ Good Example
Let's fix this:
public class OrderProcessor {
    public OrderSummary processOrders(List<Order> orders) {
        return orders.stream()
                .filter(order -> order.getStatus() == Status.COMPLETED)
                .flatMap(order -> order.getItems().stream())
                .collect(Collectors.teeing(
                        Collectors.summarizingDouble(OrderItem::getPrice),
                        Collectors.counting(),
                        (stats, count) -> new OrderSummary(
                                stats.getSum(),
                                stats.getAverage(),
                                count
                        )
                ));
    }

    public Map<Customer, List<Order>> groupOrdersByCustomer(List<Order> orders) {
        return orders.stream()
                .collect(Collectors.groupingBy(
                        Order::getCustomer,
                        Collectors.mapping(
                                Function.identity(),
                                Collectors.toList()
                        )
                ));
    }
}
Why it's good:
- Declarative style
- Single responsibility
- Composable operations
- No mutable state
- Easy to understand and maintain
Best Practices
- Use Appropriate Stream Type
// For primitives, use specialized streams
IntStream.range(0, 100)
        .filter(n -> n % 2 == 0)
        .sum();
- Leverage Collectors for Complex Operations
public class OrderAnalyzer {
    public Map<Customer, List<Order>> groupOrdersByCustomer(List<Order> orders) {
        return orders.stream()
                .collect(Collectors.groupingBy(
                        Order::getCustomer,
                        Collectors.mapping(
                                Function.identity(),
                                Collectors.toList()
                        )
                ));
    }
}
- Use Custom Collectors for Specific Needs
// Requires Guava's com.google.common.collect.ImmutableList
public class CustomCollectors {
    public static <T> Collector<T, ?, ImmutableList<T>> toImmutableList() {
        return Collector.of(
                ImmutableList::builder,            // supplier: new mutable builder
                ImmutableList.Builder::add,        // accumulator: add one element
                (b1, b2) -> b1.addAll(b2.build()), // combiner: merge builders (parallel streams)
                ImmutableList.Builder::build       // finisher: produce the immutable list
        );
    }
}
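A possible usage of this collector (a sketch assuming Guava is on the classpath and the CustomCollectors class above is available):
import com.google.common.collect.ImmutableList;
import java.util.List;

public class CustomCollectorUsage {
    public static void main(String[] args) {
        // Collect directly into Guava's ImmutableList via the custom collector
        ImmutableList<String> upper = List.of("a", "b", "c").stream()
                .map(String::toUpperCase)
                .collect(CustomCollectors.toImmutableList());

        System.out.println(upper); // [A, B, C]
    }
}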
Use Cases
- Data Transformation Pipelines
public List<ProductDTO> transformProducts(List<Product> products) {
    return products.stream()
            .filter(Product::isActive)
            .map(this::enrichProduct)
            .map(productMapper::toDTO)
            .collect(Collectors.toList());
}
- Complex Aggregations
public Map<Category, DoubleSummaryStatistics> analyzeCategories(List<Product> products) {
    return products.stream()
            .collect(Collectors.groupingBy(
                    Product::getCategory,
                    Collectors.summarizingDouble(Product::getPrice)
            ));
}
- Parallel Processing
public class DataProcessor {
    public <T> List<T> processLargeDataset(List<T> data, Predicate<T> filter) {
        return data.parallelStream()
                .filter(filter)
                .collect(Collectors.toList());
    }
}
Anti-patterns to Avoid
- Stream Over-use
// Bad
list.stream().forEach(System.out::println);
// Good
list.forEach(System.out::println);
- Stateful Operations
// Bad - shared mutable state
List<String> collected = new ArrayList<>();
stream.forEach(collected::add);
// Good - use collectors
List<String> collected = stream.collect(Collectors.toList());
- Multiple Terminal Operations
// Bad - stream is closed after first terminal operation
Stream<String> stream = list.stream();
long count = stream.count();
List<String> collected = stream.collect(Collectors.toList()); // Throws exception
// Good - create new stream for each terminal operation
long count = list.stream().count();
List<String> collected = list.stream().collect(Collectors.toList());
Interview Questions & Answers
Q1: "How would you optimize stream operations for large datasets?" Answer:
public class StreamOptimizer {
    // threshold is assumed to be a comparable amount field (e.g. BigDecimal) on this class
    public List<Transaction> processLargeTransactions(List<Transaction> transactions) {
        return transactions.parallelStream()
                .filter(tx -> tx.getAmount().compareTo(threshold) > 0)
                .sorted(Comparator.comparing(Transaction::getAmount))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    // Consider data size for parallelization; small inputs rarely benefit
    public <T> Stream<T> createAppropriateStream(List<T> data) {
        return data.size() > 10000 ? data.parallelStream() : data.stream();
    }
}
Q2: "What's the difference between intermediate and terminal operations?" Answer:
- Intermediate operations (like map, filter) are lazy and return a new Stream
- Terminal operations (like collect, reduce) are eager and produce a result
- Intermediate operations won't execute until a terminal operation is called, as the sketch below demonstrates
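A minimal sketch of that laziness (peek is used here only to reveal when elements are actually processed):
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazinessDemo {
    public static void main(String[] args) {
        Stream<String> pipeline = List.of("a", "bb", "ccc").stream()
                .peek(s -> System.out.println("processing " + s)) // intermediate, lazy
                .filter(s -> s.length() > 1);                      // intermediate, lazy

        System.out.println("Nothing processed yet");

        // Terminal operation: only now do elements flow through peek and filter
        List<String> result = pipeline.collect(Collectors.toList());
        System.out.println(result); // [bb, ccc]
    }
}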
Q3: "How do you handle exceptions in streams?" Answer:
public class StreamExceptionHandler {

    // Functional interface whose apply method is allowed to throw a checked exception
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    private static <T, R> Function<T, Optional<R>> wrap(ThrowingFunction<T, R> function) {
        return t -> {
            try {
                return Optional.of(function.apply(t));
            } catch (Exception e) {
                log.error("Error processing: {}", t, e); // log: e.g. a static SLF4J Logger on this class
                return Optional.empty();
            }
        };
    }

    // Usage
    public List<ProcessedData> processData(List<RawData> data) {
        return data.stream()
                .map(wrap(this::processItem))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }
}