
🌊 Java Streams

Overview 🎯

Java Streams provide a declarative approach to processing sequences of elements. Introduced in Java 8, streams enable functional-style operations on collections and other data sources. They support both sequential and parallel execution of operations.

Real-World Analogy

Think of streams like an assembly line in a factory:

  • Source: Raw materials (collection or data source)
  • Intermediate Operations: Workers performing different tasks (filter, map, sort)
  • Terminal Operation: Final product assembly (collect, reduce, count)
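The three stages of the analogy map directly onto a stream pipeline. Here is a minimal sketch. The class name AssemblyLine and the sample data are made up for illustration, and String.isBlank requires Java 11 or later:

```java
import java.util.List;
import java.util.stream.Collectors;

class AssemblyLine {
    // Source -> intermediate operations -> terminal operation
    static List<String> run(List<String> rawMaterials) {
        return rawMaterials.stream()               // source: the collection
                .filter(s -> !s.isBlank())         // intermediate: drop empty items
                .map(String::trim)                 // intermediate: clean up each item
                .sorted()                          // intermediate: put items in order
                .collect(Collectors.toList());     // terminal: assemble the final product
    }
}
```

For example, AssemblyLine.run(List.of(" bolt", "", "axle ", "gear")) returns [axle, bolt, gear]. None of the intermediate steps run until collect is called.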

Key Concepts 🔑

Core Components

  1. Stream Creation

    • From Collections
    • From Arrays
    • From values
    • Infinite/finite streams
  2. Operation Types

    • Intermediate Operations (lazy)
      • filter
      • map
      • flatMap
      • sorted
      • peek
    • Terminal Operations (eager)
      • collect
      • reduce
      • forEach
      • count
      • findFirst/findAny
  3. Stream Characteristics

    • Sequential vs Parallel
    • Ordered vs Unordered
    • Sized vs Unsized
    • Distinct vs Non-distinct
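The creation paths and characteristics listed above can be tried directly. This is a small sketch; the class and method names are hypothetical, and the characteristics are read from each stream's underlying Spliterator:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StreamCreation {
    // From a collection
    static List<Integer> fromCollection() {
        return List.of(3, 1, 2).stream().collect(Collectors.toList());
    }

    // From an array (a primitive int[] yields an IntStream)
    static int sumFromArray() {
        int[] values = {1, 2, 3, 4};
        return Arrays.stream(values).sum();
    }

    // From individual values
    static List<String> fromValues() {
        return Stream.of("a", "b", "c").collect(Collectors.toList());
    }

    // An infinite stream, made finite with limit()
    static List<Integer> powersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2).limit(n).collect(Collectors.toList());
    }

    // Ordered vs unordered: a list has an encounter order, a HashSet does not
    static boolean listIsOrdered() {
        return new ArrayList<>(List.of(1, 2, 3)).spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    static boolean hashSetIsOrdered() {
        return new HashSet<>(List.of(1, 2, 3)).spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    // Sized and distinct: an int range knows its length and never repeats a value
    static boolean rangeIsSizedAndDistinct() {
        Spliterator.OfInt s = IntStream.range(0, 10).spliterator();
        return s.hasCharacteristics(Spliterator.SIZED) && s.hasCharacteristics(Spliterator.DISTINCT);
    }
}
```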

Implementation Examples 💻

Basic Stream Operations

import java.util.stream.*;
import java.util.*;

public class StreamBasics {
    public List<Integer> processNumbers(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n > 0)               // Keep positive numbers
                .map(n -> n * 2)                  // Double each number
                .sorted()                         // Sort the results
                .collect(Collectors.toList());    // Collect to list
    }

    public double calculateAverage(List<Integer> numbers) {
        return numbers.stream()
                .mapToInt(Integer::intValue)
                .average()
                .orElse(0.0);
    }

    public List<String> filterAndUpperCase(List<String> strings) {
        return strings.stream()
                .filter(s -> !s.isEmpty())
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }
}

Advanced Stream Operations

import java.util.stream.*;
import java.util.*;
import java.util.function.Function;

public class AdvancedStreams {
    public <T> Map<T, Long> frequencyCount(List<T> items) {
        return items.stream()
                .collect(Collectors.groupingBy(
                        Function.identity(),
                        Collectors.counting()
                ));
    }

    public List<String> flatMapExample(List<List<String>> nestedLists) {
        return nestedLists.stream()
                .flatMap(List::stream)
                .distinct()
                .sorted()
                .collect(Collectors.toList());
    }

    public Map<Boolean, List<Integer>> partitionByPrime(List<Integer> numbers) {
        return numbers.stream()
                .collect(Collectors.partitioningBy(this::isPrime));
    }

    private boolean isPrime(int number) {
        if (number <= 1) return false;
        return IntStream.rangeClosed(2, (int) Math.sqrt(number))
                .noneMatch(i -> number % i == 0);
    }
}
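The groupingBy and partitioningBy collectors used above also accept a downstream collector, and groupingBy can take a map factory. This is a short sketch; the class name CollectorDemo and the sample data are illustrative only. A TreeMap is used here so the keys come back sorted:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class CollectorDemo {
    // Same grouping as frequencyCount, but the TreeMap factory keeps keys in sorted order
    static Map<String, Long> sortedFrequencies(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    // partitioningBy with a downstream collector: count the elements on each side
    static Map<Boolean, Long> countEvenOdd(List<Integer> numbers) {
        return numbers.stream()
                .collect(Collectors.partitioningBy(n -> n % 2 == 0, Collectors.counting()));
    }
}
```

The partitioned map always contains both keys, true and false, even when one side is empty.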

Parallel Stream Processing

import java.util.stream.*;
import java.util.*;
import java.util.function.Predicate;

public class ParallelStreams {
    public double parallelSum(List<Double> numbers) {
        // Floating-point addition is not strictly associative, so the last digits
        // of a parallel sum may differ from those of a sequential sum
        return numbers.parallelStream()
                .reduce(0.0, Double::sum);
    }

    public <T> List<T> parallelSort(List<T> items, Comparator<? super T> comparator) {
        return items.parallelStream()
                .sorted(comparator)
                .collect(Collectors.toList());
    }

    public <T> List<T> parallelFilter(List<T> items, Predicate<? super T> predicate) {
        return items.parallelStream()
                .filter(predicate)
                .collect(Collectors.toList());
    }
}
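parallelSum uses the two-argument reduce, where the result has the same type as the elements. When the result type differs, the three-argument form needs a combiner that merges the partial results of separate chunks, and the identity must be a true identity for that combiner because every chunk starts from it. A minimal sketch (ParallelReduce is a hypothetical name):

```java
import java.util.List;

class ParallelReduce {
    // reduce(identity, accumulator, combiner)
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .reduce(0,                                          // identity: neutral for addition
                        (partial, word) -> partial + word.length(), // fold one element into a chunk's result
                        Integer::sum);                              // merge results from different chunks
    }
}
```

For this particular job, words.parallelStream().mapToInt(String::length).sum() is simpler; the three-argument form is the general tool when no such shortcut exists.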

Best Practices 🌟

Stream Creation and Usage

  1. Choose Appropriate Stream Type
// Good: use specialized streams for primitives
IntStream.range(0, 100).sum();

// Bad: Stream<Integer> boxes every value, only to unbox it again
Stream.iterate(0, i -> i + 1)
        .limit(100)
        .mapToInt(Integer::intValue)
        .sum();
  2. Parallel Stream Considerations
// Good: large dataset, independent operations
List<Integer> numbers = IntStream.range(0, 1_000_000).boxed()
        .collect(Collectors.toList()); // stands in for any large list
numbers.parallelStream()
        .filter(n -> n > 1000)
        .collect(Collectors.toList());

// Bad: small dataset or dependent operations
List<String> smallList = Arrays.asList("a", "b", "c");
smallList.parallelStream() // Splitting and thread coordination cost more than the work itself
        .map(String::toUpperCase)
        .collect(Collectors.toList());
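Specialized primitive streams also provide numeric terminal operations that Stream<Integer> lacks, such as sum and summaryStatistics. A small sketch, with PrimitiveStreams as a hypothetical class name:

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.stream.IntStream;

class PrimitiveStreams {
    // Sums 1..n entirely on int values; no Integer objects are created
    static int sumTo(int n) {
        return IntStream.rangeClosed(1, n).sum();
    }

    // Unboxes each Integer once, then computes count, sum, min, max and average in one pass
    static IntSummaryStatistics stats(List<Integer> values) {
        return values.stream()
                .mapToInt(Integer::intValue)
                .summaryStatistics();
    }
}
```

sumTo(100) returns 5050, and stats(List.of(4, 8, 15, 16, 23, 42)) reports a count of 6, a sum of 108 and an average of 18.0.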

Testing

  1. Unit Testing Streams
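// Assumes JUnit 5 (Jupiter)
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

class StreamOperationsTest {
    @Test
    void testStreamOperations() {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        List<Integer> expected = Arrays.asList(4, 8); // evens 2 and 4, doubled

        List<Integer> result = input.stream()
                .filter(n -> n % 2 == 0)
                .map(n -> n * 2)
                .collect(Collectors.toList());

        assertEquals(expected, result);
    }
}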
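  2. Performance Testing
@Test
void testStreamPerformance() {
    List<Integer> numbers = IntStream.range(0, 1_000_000).boxed()
            .collect(Collectors.toList()); // stands in for any large list

    long start = System.nanoTime();
    numbers.stream()
            .filter(n -> n > 0)
            .count();
    long streamTime = System.nanoTime() - start;

    start = System.nanoTime();
    numbers.parallelStream()
            .filter(n -> n > 0)
            .count();
    long parallelTime = System.nanoTime() - start;

    // Compare times, but treat them as rough hints: single runs include JIT
    // warm-up and GC noise. A benchmark harness such as JMH gives reliable numbers.
}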

Common Pitfalls 🚨

  1. Stream Reuse
// Wrong: Stream reuse
Stream<String> stream = list.stream();
stream.forEach(System.out::println);
stream.count(); // IllegalStateException

// Correct: Create new stream
list.stream().forEach(System.out::println);
list.stream().count();
  2. Side Effects
// Wrong: side effects in a parallel stream
List<String> results = Collections.synchronizedList(new ArrayList<>());
list.parallelStream().forEach(results::add); // Thread-safe only thanks to synchronizedList; order not guaranteed

// Correct: let collect build the result (it keeps encounter order)
List<String> collected = list.parallelStream()
        .collect(Collectors.toList());
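Both pitfalls have standard remedies. For reuse, a Supplier can hold the pipeline definition and hand out a fresh stream on each call. For ordering, collect and forEachOrdered respect encounter order even on a parallel stream. A sketch with hypothetical names and sample data:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PitfallFixes {
    // Pitfall 1: the Supplier stores the pipeline definition; each get() builds a new stream
    static String summary(List<String> words) {
        Supplier<Stream<String>> longWords = () -> words.stream().filter(w -> w.length() > 3);
        long count = longWords.get().count();                                   // first terminal operation
        int longest = longWords.get().mapToInt(String::length).max().orElse(0); // second, on a new stream
        return "count=" + count + ", longest=" + longest;
    }

    // Pitfall 2: collect keeps encounter order, even for a parallel stream
    static List<Integer> parallelCollect(int n) {
        return IntStream.range(0, n).parallel().boxed().collect(Collectors.toList());
    }

    // forEachOrdered runs the action in encounter order, one element at a time,
    // so a plain ArrayList is safe here (at the cost of most of the parallel speedup)
    static List<Integer> parallelForEachOrdered(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.range(0, n).parallel().boxed().forEachOrdered(out::add);
        return out;
    }
}
```

For example, summary(List.of("tree", "a", "stream", "abc")) returns "count=2, longest=6".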

Use Cases 🎯

1. Data Processing Pipeline

public class DataProcessor {
    public List<CustomerDTO> processCustomers(List<Customer> customers) {
        return customers.stream()
                .filter(Customer::isActive)                 // keep active customers
                .map(this::enrichCustomerData)              // convert to enriched DTOs
                .sorted(Comparator.comparing(CustomerDTO::getScore).reversed()) // highest score first
                .limit(100)                                 // keep the top 100
                .collect(Collectors.toList());
    }

    // calculateScore and getCustomerMetrics are helper methods not shown here
    private CustomerDTO enrichCustomerData(Customer customer) {
        return new CustomerDTO(
                customer.getId(),
                customer.getName(),
                calculateScore(customer),
                getCustomerMetrics(customer)
        );
    }
}
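The DataProcessor example depends on types and helpers that are not shown. Below is a self-contained sketch of the same filter, enrich, sort and limit shape. The Customer and CustomerDTO records and the scoring rule are hypothetical stand-ins, not the original types; records require Java 16 or later:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class CustomerPipeline {
    // Hypothetical stand-ins for the Customer and CustomerDTO types
    record Customer(long id, String name, boolean active, int orders) {}
    record CustomerDTO(long id, String name, double score) {}

    static List<CustomerDTO> topActive(List<Customer> customers, int limit) {
        return customers.stream()
                .filter(Customer::active)                                  // keep active customers only
                .map(c -> new CustomerDTO(c.id(), c.name(), score(c)))     // enrich each one into a DTO
                .sorted(Comparator.comparingDouble(CustomerDTO::score).reversed()) // highest score first
                .limit(limit)                                              // keep the top N
                .collect(Collectors.toList());
    }

    // Made-up scoring rule, for illustration only
    static double score(Customer c) {
        return c.orders() * 1.5;
    }
}
```

Because limit comes after sorted, the whole filtered set is sorted before the top N are taken; that is required here, since the best scores can appear anywhere in the input.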

2. Report Generation

public class ReportGenerator {
    public Map<String, DoubleSummaryStatistics> generateSalesReport(
            List<SalesTransaction> transactions) {
        return transactions.stream()
                .collect(Collectors.groupingBy(
                        SalesTransaction::getProduct,
                        Collectors.summarizingDouble(SalesTransaction::getAmount)
                ));
    }
}
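To see what the report contains, here is a runnable sketch with a hypothetical Sale record standing in for SalesTransaction (Java 16+ for records). Each DoubleSummaryStatistics value carries the count, sum, min, max and average for one product:

```java
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SalesReportDemo {
    // Hypothetical stand-in for SalesTransaction
    record Sale(String product, double amount) {}

    static Map<String, DoubleSummaryStatistics> report(List<Sale> sales) {
        return sales.stream()
                .collect(Collectors.groupingBy(
                        Sale::product,                              // one entry per product
                        Collectors.summarizingDouble(Sale::amount)  // statistics over that product's amounts
                ));
    }
}
```

With sales of ("pen", 2.0), ("pen", 4.0) and ("ink", 10.0), the "pen" entry reports a count of 2, a sum of 6.0 and a maximum of 4.0.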

3. Real-time Data Analysis

public class DataAnalyzer {
    // Collectors.teeing requires Java 12 or later
    public Analysis analyzeSensorData(Stream<SensorReading> readings) {
        return readings
                .filter(reading -> reading.getQuality() >= 0.8)
                .collect(Collectors.teeing(
                        Collectors.averagingDouble(SensorReading::getValue),
                        Collectors.summarizingDouble(SensorReading::getValue),
                        (avg, stats) -> new Analysis(avg, stats) // stats.getAverage() also holds the mean
                ));
    }
}
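teeing sends every element to two collectors and merges their two results with a function. A self-contained sketch computing a minimum and a maximum in a single pass; TeeingDemo and the Range record are hypothetical names, and the code needs Java 16+ because of the record:

```java
import java.util.List;
import java.util.stream.Collectors;

class TeeingDemo {
    record Range(double min, double max) {}

    static Range minMax(List<Double> values) {
        return values.stream()
                .collect(Collectors.teeing(
                        Collectors.minBy(Double::compare),   // first collector: smallest value
                        Collectors.maxBy(Double::compare),   // second collector: largest value
                        // merger: both results are Optionals, empty if the input was empty
                        (min, max) -> new Range(min.orElse(Double.NaN), max.orElse(Double.NaN))));
    }
}
```

minMax(List.of(3.5, -1.0, 7.25)) yields a Range with min -1.0 and max 7.25.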

Deep Dive Topics 🔍

Stream Internals

  1. Lazy Evaluation
Stream<Integer> stream = numbers.stream()
        .filter(n -> {
            System.out.println("Filtering: " + n);
            return n > 0;
        })
        .map(n -> {
            System.out.println("Mapping: " + n);
            return n * 2;
        });
// Nothing is printed until a terminal operation is invoked
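  2. Spliterator Mechanism
public class CustomSpliterator<T> implements Spliterator<T> {
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        // Process a single element; return false when no elements remain
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        // Split off a portion for parallel processing; return null if not splittable
        return null;
    }

    @Override
    public long estimateSize() { return 0; } // Remaining elements (required)

    @Override
    public int characteristics() { return ORDERED; } // e.g. ORDERED, SIZED (required)
}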
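Both internals can be observed directly. LazyTrace records the order in which filter and map run: nothing happens while the pipeline is built, and each element travels through the whole pipeline before the next one starts. ArraySpliterator is a minimal array-backed Spliterator that splits itself in half for parallel work and is turned into a stream with StreamSupport.stream. Both class names are hypothetical, and this is a sketch rather than a production implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyTrace {
    static List<String> trace(List<Integer> numbers) {
        List<String> log = new ArrayList<>();
        Stream<Integer> pipeline = numbers.stream()
                .filter(n -> { log.add("filter " + n); return n > 0; })
                .map(n -> { log.add("map " + n); return n * 2; });
        log.add("pipeline built");             // no filter/map entries logged yet
        pipeline.collect(Collectors.toList()); // the terminal operation triggers the work
        return log;
    }
}

class ArraySpliterator<T> implements Spliterator<T> {
    private final T[] array;
    private int index;      // next element to process
    private final int end;  // exclusive upper bound

    ArraySpliterator(T[] array, int from, int to) {
        this.array = array;
        this.index = from;
        this.end = to;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (index >= end) return false; // nothing left
        action.accept(array[index++]);  // process exactly one element
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        int mid = (index + end) >>> 1;
        if (mid <= index) return null;  // too small to split
        // ORDERED contract: hand off the prefix, keep the second half here
        Spliterator<T> prefix = new ArraySpliterator<>(array, index, mid);
        index = mid;
        return prefix;
    }

    @Override
    public long estimateSize() {
        return end - index;             // exact, since the bounds are known
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED;
    }

    // Wraps the spliterator in a Stream; 'parallel' selects the execution mode
    static <E> List<E> toList(E[] array, boolean parallel) {
        return StreamSupport.stream(new ArraySpliterator<>(array, 0, array.length), parallel)
                .collect(Collectors.toList());
    }
}
```

LazyTrace.trace(List.of(1, -2, 3)) returns [pipeline built, filter 1, map 1, filter -2, filter 3, map 3]. Because the spliterator reports ORDERED and splits off prefixes, a parallel collect still returns the elements in array order.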

Performance Optimization

  1. Short-circuiting Operations
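// Preferred: anyMatch stops at the first match and states the intent directly
boolean hasNegative = numbers.stream()
        .anyMatch(n -> n < 0);

// Equivalent but wordier: because streams are lazy, filter + findFirst also
// stops at the first match; it just expresses the same check indirectly
boolean hasNegativeToo = numbers.stream()
        .filter(n -> n < 0)
        .findFirst()
        .isPresent();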
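  2. Stream Sizing
// Good: the source is SIZED, so the pipeline knows its exact size
// (this mainly lets toArray() allocate once and keeps parallel splits balanced)
IntStream.range(0, 1000)
        .boxed()
        .collect(Collectors.toList());

// Worse: iterate() is unsized, so limit() cannot report an exact size,
// and the source splits poorly in parallel
Stream.iterate(0, i -> i + 1)
        .limit(1000)
        .collect(Collectors.toList());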
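Both claims in this section can be checked in code. A peek counter shows how many elements each short-circuiting form examines, and getExactSizeIfKnown() reports whether a pipeline knows its size (it returns -1 when it does not). PerformanceChecks is a hypothetical name, and peek is used here purely as an instrumentation hook:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PerformanceChecks {
    // How many elements does anyMatch examine before answering?
    static int visitedByAnyMatch(List<Integer> numbers) {
        AtomicInteger visited = new AtomicInteger();
        numbers.stream()
                .peek(n -> visited.incrementAndGet())
                .anyMatch(n -> n < 0);
        return visited.get();
    }

    // And filter + findFirst?
    static int visitedByFilterFindFirst(List<Integer> numbers) {
        AtomicInteger visited = new AtomicInteger();
        numbers.stream()
                .peek(n -> visited.incrementAndGet())
                .filter(n -> n < 0)
                .findFirst();
        return visited.get();
    }

    // A SIZED source keeps its exact size through boxed()
    static long rangeSize() {
        return IntStream.range(0, 1000).boxed().spliterator().getExactSizeIfKnown();
    }

    // iterate() is unsized, and limit() does not make it sized
    static long iterateLimitSize() {
        return Stream.iterate(0, i -> i + 1).limit(1000).spliterator().getExactSizeIfKnown();
    }
}
```

For the list [5, -1, 7, 8, 9], both forms stop after examining 2 elements. rangeSize() returns 1000, while iterateLimitSize() returns -1.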

Additional Resources 📚

Official Documentation

Books and Articles

  • "Modern Java in Action" by Raoul-Gabriel Urma, Mario Fusco, and Alan Mycroft
  • "Java 8 in Action" by Raoul-Gabriel Urma, Mario Fusco, and Alan Mycroft