on
Modern Java in Action - Ch.7
참고: 책 - Modern Java in Action
책 Modern Java in Action을 읽고 정리합니다. 이번 포스트에서는 Ch 7.1 ~ Ch 7.4의 내용을 읽고 정리합니다.
Ch 7. 병렬 데이터 처리와 성능
7.1 병렬 스트림
- 7.1.1 순차 스트림을 병렬 스트림으로 변환하기
- 7.1.2 스트림 성능 측정
- 7.1.3 병렬 스트림의 올바른 사용법
- 7.1.4 병렬 스트림 효과적으로 사용하기
7.2 포크/조인 프레임워크
- 7.2.1 RecursiveTask 활용
- 7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법
- 7.2.3 작업 훔치기
7.3 Splitator 인터페이스
- 7.3.1 분할 과정
- 7.3.2 커스텀 Spliterator 구현하기
Java 7이 등장하기 전… 데이터 컬렉션을 병렬로 처리하기가 어려웠습니다.
- 데이터를 -> 서브 파트로 분할하고
- -> 분할된 서브 파트를 각가의 스레드로 할당해야 합니다. 스레드로 할당한 다음엔
- -> 레이스 컨디션(race condition)이 발생하지 않도록 적절한 동기화를 추가해야 하며
- -> 마지막으로 부분 결과를 합쳐야 합니다.
Java 7은… 더 쉽게 병렬화를 수행하면서 에러를 최소화 할 수 있도록 Fork/Join Framework 기능을 제공합니다. 7장에서는 스트림으로 데이터 컬렉션 관련 동작을 얼마나 쉽게 병렬로 실행할 수 있는지를 다룹니다.
7.1 병렬 스트림
컬렉션에 parallelStream
을 호출하면 병렬 스트림이 생성됩니다. 병렬 스트림은 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림입니다.
따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세서가 각각의 청크를 처리하도록 할당할 수 있습니다.
ex. 숫자 n을 인수로 받아서 1부터 n까지의 모든 숫자의 합계를 반환하는 메서드를 구현하는 예제입니다.
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1) //무한 자연수 스트림 생성
.limit(n) //주어진 크기로 스트림 제한
.reduce(0L, Long::sum); //모든 숫자를 더하는 스트림 리듀싱 연산
}
public long iterativeSum(long n) {
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
return result;
}
- 전통적인 자바의 반복문일 경우 n이 커진다면 이 연산을 병렬로 처리하는 것이 좋습니다.
- 무엇을 건드려야 할지?
- 결과 변수는 어떻게 동기화해야 할지?
- 몇 개의 스레드를 사용해야 할지?
- 숫자는 어떻게 생성할지?
- 생성된 숫자는 누가 더할지?
- 등의 고민은 병렬 스트림을 통해 해결할 수 있습니다.
7.1.1 순차 스트림을 병렬 스트림으로 변환하기
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() //스트림을 병렬 스트림으로 변환
.reduce(0L, Long::sum);
}
- 순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산(숫자 합계 계산)이 병렬로 처리됩니다.
- 사실 순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않습니다. 내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 boolean flag가 설정됩니다.
- 반대로 sequential을 호출하면 병렬 스트림을 순차 스트림으로 바꿀 수 있습니다.
parallel
과sequential
두 메서드를 이용해서 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어할 수 있습니다.
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
- 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미칩니다.
7.1.2 스트림 성능 측정
- 자바 마이크로벤치마크 하니스(Java Microbenchmark Harness - JMH)라는 라이브러리를 이용해 작은 벤치마크를 구현합니다.
- JMH를 이용하면 간단하고, 어노테이션 기반 방식을 지원하며, 안정적으로 자바 프로그램이나 JVM을 대상으로 하는 다른 언어용 벤치마크를 구현할 수 있습니다.
<!--핵심 JMH 구현을 포함-->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.35</version>
</dependency>
<!--JAR 파일을 만드는데 도움을 주는 어노테이션 프로세서를 포함-->
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.35</version>
</dependency>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<finalName>benchmarks</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
ex. n개의 숫자를 더하는 함수의 성능을 측정하는 예제입니다.
sequentialSum
- 순차적 스트림 사용
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime) //벤치마크 대상 메서드를 실행하는데 걸린 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) //벤치마크 결과를 밀리초 단위로 출력
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xms4G"}) //4Gb의 힙 공간을 제공한 환경에서 두 번 벤치마크를 수행해 결과의 신뢰성 확보
@Measurement(iterations = 20)
@Warmup(iterations = 3)
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long seqeuntialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N)
.reduce(0L, Long::sum);
}
@TearDown(Level.Invocation) //매 번 벤치마크를 실행한 다음에는 가비지 컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
- 벤치마크가 가능한 가비지 컬렉터(GC)의 영향을 받지 않도록 힙의 크기를 충분하게 설정합니다.
- 벤치마크가 끝날 때마다 GC가 실행되도록 강제합니다.
- 코드를 실행할 때 JMH 명령은 핫스팟이 코드를 최적화할 수 있도록 20번을 실행하며 벤치마크를 준비한 다음 20번을 더 실행해 최종 결과를 계산합니다.(기본적으로 40회 반복 실행)
iterativeSum
- 기본 for문 사용
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 2, jvmArgs = { "-Xms4G", "-Xmx4G" })
@Measurement(iterations = 20)
@Warmup(iterations = 3)
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
@TearDown(Level.Invocation) //매 번 벤치마크를 실행한 다음에는 가비지 컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
- 전통적인 for반복문을 사용하는 방법이 더 저수준으로 동작할 뿐 아니라 기본값 박싱/언박싱 필요가 없어서 더 빠를 것으로 예상할 수 있습니다.
- 속도를 비교해보면 순차적 스트림을 사용하는 버전에 비해 빠르다는 것을 확인할 수 있습니다.
parallelSum
- 병렬 스트림 사용
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 2, jvmArgs = { "-Xms4G", "-Xmx4G" })
@Measurement(iterations = 20)
@Warmup(iterations = 3)
public class ParallelStreamBenchmark {
@Benchmark
public long parallelSum() {
return Stream.iterate(1L, i -> i + 1).limit(N).parallel().reduce(0L, Long::sum);
}
@TearDown(Level.Invocation) //매 번 벤치마크를 실행한 다음에는 가비지 컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
- 각 성능 비교
- sequentialSum(순차 스트림): 66.872ms/op
- iterativeSum(for 반복문): 3.407ms/op
- parallelSum(병렬 스트림): 67.832ms/op
rangedSum
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 2, jvmArgs = { "-Xms4G", "-Xmx4G" })
@Measurement(iterations = 20)
@Warmup(iterations = 3)
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long rangedSum() {
return LongStream.rangeClosed(1, N).reduce(0L, Long::sum);
}
@TearDown(Level.Invocation)
public void tearDown() {
System.gc();
}
}
- 더 특화된 메서드 사용
- 멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면
LongStream.rangeClosed()
를 사용할 수 있습니다.
- 멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면
LongStream.rangeClosed
를 사용하면…- 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤드가 사라집니다.
- 쉽게 청크로 분할할 수 있는 숫자 범위를 생성합니다. (1 ~ 20을 1 ~ 5, 6 ~ 10, 11 ~ 15, 16 ~ 20 등으로 분할)
- 특화된 스트림 메서드는 순차 스트림에 비해 빠른 처리 속도를 보이는데 특화되지 않은 스트림을 처리할 떄는 오토박싱, 언박싱 등의 오버헤드를 수반하기 때문입니다.
parallelRangedSum
- 새로운 버전에 병렬 스트림 적용
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 2, jvmArgs = { "-Xms4G", "-Xmx4G" })
@Measurement(iterations = 20)
@Warmup(iterations = 3)
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum);
}
@TearDown(Level.Invocation)
public void tearDown() {
System.gc();
}
}
- 결과를 통해 병렬 리듀싱 연산이 가장 빠른 성능을 갖는 것을 볼 수 있습니다.
- 이를 통해 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있으며 함수형 프로그래밍을 올바로 사용하면 반복적으로 코드를 실행하는 방법에 비해 최신 멀티 코어 CPU가 제공하는 병렬 실행의 힘을 단순하게 직접적으로 얻을 수 있음을 알 수 있습니다.
- 그러나 병렬화를 이용하려면 스트림으로 재귀적으로 분할해야 하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 합니다.
- 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직합니다. 또한 항상 병렬화를 올바르게 사용하고 있는지 확인해야 합니다.
7.1.3 병렬 스트림의 올바른 사용법
스트림을 병렬화해서 코드 실행 속도를 빠르게 하고 싶으면 항상 병렬화를 올바르게 사용하고 있는지 확인해야 합니다. 병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어납니다.
ex. n까지의 자연수를 더하면서 공유된 누적자를 바꾸는 프로그램을 구현한 예제 입니다.
public class ParallelStreams {
public static long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}
public static class Accumulator {
private long total = 0;
public void add(long value) {
total += value;
}
}
}
- 해당 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있으므로 병렬로 실행하면 안됩니다. 특히 total을 접근할 때마다(다수의 스레드에서 동시에 데이터에 접근하는) 데이터 레이스 문제가 생깁니다.
- 동기화로 문제를 해결하다보면 -> 결국 병렬화라는 특성이 없어집니다.
public class ParallelStreams {
public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
}
public class ParallelStreamsHarness {
public static void main(String[] args) {
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs" );
}
}
- 위 코드를 실행하면서 각 실행 결과를 출력했을 때 다음과 같은 결과를 얻을 수 있습니다.
- 이 때 올바른 결과값(
50000005000000
)이 나오지 않는걸 알 수 있는데 여러 스레드에서 동시에 누적자, 즉 total += value를 실행하면서 이런 문제가 발생합니다. - 결국 여러 스레드에서 공유하는 객체의 상태를 바꾸는 forEach 블록 내부에서 add 메서드를 호출하면서 이 같은 문제가 발생합니다.
- 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피해야 한다는 사실을 알 수 있습니다.
7.1.4 병렬 스트림 효과적으로 사용하기
어떤 상황에서 병렬 스트림을 사용할 것인지 약간의 수량적 힌트를 정하는 것이 도움이 될 때도 있습니다. 아래 기준들을 통해 알아보도록 하겠습니다.
확신이 서지 않으면 직접 측정하라.
- 언제나 병렬 스트림이 순차 스트림보다 빠른 것은 아니며 더욱이 병렬 스트림의 수행 과정은 투명하지 않을 때가 많습니다.
- 따라서 순차 스트림과 병렬 스트림 중 어떤 것이 좋을 지 모르겠다면 적절한 벤치마크로 직접 성능을 측정하는 것이 바람직합니다.
박싱을 주의하라.
- 자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소입니다.
- Java 8은 박싱 동작을 피할 수 있도록 기본형 특화 스트림(IntStream, LongStream, DoubleStream)을 제공합니다. 따라서 되도록이면 기본형 특화 스트림을 사용하는 것이 좋습니다.
순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.
- 특히
limit
이나findFirst
처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 합니다. - 정렬된 스트림에
unordered
를 호출하면 비정렬된 스트림을 얻을 수 있습니다. 스트림에 N개 요소가 있을 때 요소의 순서가 상관없다면(ex. 소스가 list) 비정렬된 스트림에limit
을 호출하는 것이 더 효율적입니다.
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라.
- 처리해야 할 요소 수가 N이고 하나의 요소를 처리하는데 드는 비용을 Q라 하면 전체 스트림 파이프라인 처리 비용을 N * Q 로 예상할 수 있습니다.
- Q가 높아진다는 것은 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미합니다.
소량의 데이터에서는 병렬 스트림이 도움 되지 않는다.
- 소량의 데이터를 처리하는 상황에서는 병렬화 과정에서 생기는 부가 비용을 상쇄할 수 있을 만큼의 이득을 얻지 못하기 때문입니다.
스트림을 구성하는 자료구조가 적절한지 확인하라.
- ex.
ArrayList
를LinkedList
보다 효율적으로 분할할 수 있는데,LinkedList
는 분할하려면 모든 요소를 탐색해야 하지만ArrayList
는 요소를 탐색하지 않고도 리스트를 분할할 수 있기 때문입니다. - ex.
range
팩토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있습니다. - ex. 커스텀
Spliterator
를 구현해서 분해 과정을 완벽하게 제어할 수 있습니다.
스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
- ex.
SIZED
스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있습니다. - 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없게 됩니다.
최종 연산의 병합 과정 비용을 살펴보라.
- 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻은 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있습니다.
참고: 스트림 소스의 병렬화 친밀도(분해성)
ArrayList
-> 훌륭함LinkedList
-> 나쁨IntStream.range
-> 훌륭함Stream.iterate
-> 나쁨HashSet
-> 좋음TreeSet
-> 좋음
7.2 포크/조인 프레임워크
포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었습니다.
포크/조인 프레임워크에서는 서브 태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService
인터페이스를 구현합니다.
7.2.1 Recursive Task 활용
스레드 풀을 이용하려면 RecursiveTask<R>
의 서브클래스를 만들어야 합니다.
여기서 R
은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때는 RecursiveAction
형식입니다. RecursiveTask
를 정의하려면 추상 메서드 compute
를 구현해야 합니다.
protected abstract R compute();
compute
메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의합니다.- 따라서 대부분의
compute
메서드 구현은 다음과 같은 Pseudo-code(의사코드) 형식을 유지합니다.
if (태스크가 충분히 작거나 더 이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
재귀적인 태스크 분할 과정 (포크/조인 과정)
- 이 알고리즘은 분할 후 정복(divide-and-conquer) 알고리즘의 병렬화 버전입니다.
예제를 통해 포크/조인 프레임워크를 사용하는 방법을 확인해보기
ex. 포크/조인 프레임워크를 이용해서 범위의 숫자를 더하는 예제입니다.(long[]으로 이루어진 숫자 배열 사용)
public class ParallelStreamsHarness {
public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();
}
import static modernjavainaction.chap07.ParallelStreamsHarness.FORK_JOIN_POOL;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
//RecursiveTask를 상속받아 포크/조인 프레임워크에서 사용할 태스크를 생성
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
//이 값 이하의 서브태스크는 더 이상 분할할 수 없음
public static final long THRESHOLD = 10_000;
private final long[] numbers; //더할 숫자 배열
private final int start; //이 서브태스크에서 처리할 배열의 초기 위치
private final int end; //최종 위치
//메인 태스크를 생성할 때 사용할 공개 생성자
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
//메인 태스크의 서브태스크를 재귀적으로 만들 때 사용할 비공개 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
//RecursiveTask의 추상 메서드 오버라이드
@Override
protected Long compute() {
//태스크에서 더할 배열의 길이
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially(); //기준값과 같거나 작으면 순차적으로 결과를 계산
}
//배열의 첫 번째 절반을 더하도록 서브 태스크를 생성
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork(); //ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행
//배열의 나머지 절반을 더하도록 서브 태스크를 생성
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute(); //두 번째 서브태스크를 동기 실행(이때 추가로 분할이 일어날 수 있음)
Long leftResult = leftTask.join(); //첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없으면 기다림
return leftResult + rightResult; //두 서브 태스크의 결과를 조합한 값이 이 태스크의 결과
}
//더 분할할 수 없을 때 서브태스크의 결과를 계산하는 단순한 알고리즘
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
/*
위 코드는 n까지의 자연수 덧셈 작업을 병렬로 수행하는 방법을 더 직관적으로 보여줍니다.
ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘겨줄 수 있습니다.
*/
public static long forkJoinSum(long n) {
//LongStream으로 n까지의 자연수를 포함하는 배열을 생성
long[] numbers = LongStream.rangeClosed(1, n).toArray();
//생성된 배열을 ForkJoinSumCalculator의 생성자로 전달해서 ForkJoinTask를 생성
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
//생성한 태스크를 새로운 ForkJoinPool의 invoke 메서드로 전달
return FORK_JOIN_POOL.invoke(task);
//ForkJoinPool에서 실행되는 마지막 invoke 메서드의 반환값은 ForkJoinSumCalculator에서 정의한 태스크의 결과가 됩니다.
}
}
- 일반적으로는 애플리케이션에서는 둘 이상의 ForkJoinPool을 사용하지 않습니다.
- 즉, 소프트웨어의 필요한 곳에서 언제든 가져다 쓸 수 있도록 ForkJoinPool을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장합니다.
- ForkJoinPool을 만들면서 인수가 없는 디폴트 생성자로 이용했는데, 이는 JVM에서 이용할 수 있는 모든 프로세서가 자유롭게 풀에 접근할 수 있음을 의미합니다.
Runtime.availableProcessors
의 반환값으로 풀에 사용할 스레드 수를 결정합니다.- 이 때
availableProcessors
는 실제 프로세서 외에 하이퍼스레딩과 관련된 가상 프로세서도 개수에 포함됩니다.
ForkJoinSumCalculator 실행
ForkJoinSumCalculator
를 ->ForkJoinPool
로 전달하면 풀의 스레드가ForkJoinSumCalculator
의compute
메서드를 실행하면서 작업을 수행합니다.compute
메서드는 병렬로 실행할 수 있을만큼 태스크의 크기가 충분히 작아졌는지 확인하며, 아직 태스크의 크기가 크다고 판단되면 숫자 배열을 반으로 분할해서 두 개의 새로운ForkJoinSumCalculator
로 할당합니다.- 그러면 다시
ForkJoinPool
이 새로 생성된ForkJoinSumCalculator
를 실행합니다. - 이 과정이 재귀적으로 반복되면서 주어진 조건(ex. 덧셈 수행할 항목이 만 개 이하)을 만족할 때까지 태스크 분할을 반복합니다.
- 이제 각 서브태스크는 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문합니다.
- 즉, 각 서브 태스크의 부분 결과를 합쳐서 태스크의 최종 결과를 계산합니다.
7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법
join
메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킵니다.- 따라서 두 서브태스크가 모두 시작된 다음에
join
을 호출해야 합니다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리는 일이 발생하며 원래 순차 알고리즘보다 느리고 복잡한 프로그램이 될 수 있습니다.
- 따라서 두 서브태스크가 모두 시작된 다음에
RecursiveTask
내에서는ForkJoinPool
의invoke
메서드를 사용하지 말아야 합니다.- 대신
compute
나fork
메서드를 직접 호출할 수 있습니다. 순차 코드에서 병렬 계산을 시작할 때만invoke
를 사용합니다.
- 대신
- 서브태스크에
fork
메서드를 호출해서ForkJoinPool
의 일정을 조절할 수 있습니다.- 왼쪽 작업과 오른쪽 작업 모두에
fork
메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는fork
를 호출하는 것보다는compute
를 호출하는 것이 효율적입니다. - 그러면 두 서브태스크의 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있습니다.
- 왼쪽 작업과 오른쪽 작업 모두에
- 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅하기 어렵습니다.
fork
라 불리는 다른 스레드에서compute
를 호출하므로 IDE의 디버깅하는 스택 트레이스가 도움이 되지 않기 때문입니다.
- 병렬 스트림에서 살펴본 것처럼 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠를 거라는 생각을 버려야 합니다.
- 병렬 처리로 성능을 개선하려면 태스크를 여러 독립적인 서브 태스크로 분할할 수 있어야 하는데 각 서브태스크의 실행시간은 새로운 태스크를 포킹하는데 드는 시간보다 길어야 합니다.
- 성능을 측정할 때도 여러 번 프로그램을 실행한 결과를 측정해야 합니다.
- 또한 컴파일러 최적화는 병렬 버전보다는 순차 버전에 집중될 수 있다는 사실도 기억해야 합니다.
7.2.3 작업 훔치기
이론적으로는 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 CPU 코어에서 태스크를 실행할 것이고 크기가 같은 각각의 태스크는 같은 시간에 종료될 것이라고 생각할 수 있습니다. 그러나 현실에서는 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있습니다. 분할 기법이 효율적이지 않았기 때문일 수도 있고 아니면 예기치 않게 디스크 접근 속도가 저하되었거나 외부 서비스와 협력하는 과정에서 지연이 생길 수 있기 때문입니다.
포크/조인 프레임워크에서는 작업 훔치기
라는 기법으로 이 문제를 해결합니다.
- 작업 훔치기 기법에서는
ForkJoinPool
의 모든 스레드를 거의 공정하게 분할합니다. - 각각의 스레드는 자신에게 할당된 태스크를 포함된 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리합니다.
- 이 때 한 스레드는 다른 스레드보다 자신에게 할당된 태스크를 더 빨리 처리할 수 있습니다. 즉, 다른 스레드는 바쁘게 일하고 있는데 한 스레드는 할 일이 다 떨어진 상황입니다.
- 이 때 할 일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드의 큐의 꼬리(tail)에서 작업을 훔쳐옵니다.
- 모든 태스크가 작업을 끝낼 때까지, 즉 모든 큐가 빌 때까지 이 과정을 반복합니다.
- 따라서 태스크의 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있습니다.
- 풀에 있는 작업자 스레드의 태스크를 재분배하고 균형을 맞출 때 작업 훔치기 알고리즘을 사용합니다.
- 작업자의 큐에 있는 태스크를 두 개의 서브 태스크로 분할했을 때 둘 중 하나의 태스크를 다른 유휴 작업자가 훔쳐갈 수 있습니다.
- 그리고 주어진 태스크를 순차 실행할 단계가 될 때까지 이 과정을 재귀적으로 반복합니다.
7.3 Spliterator 인터페이스
Spliterator
는 분할할 수 있는 반복자라는 의미로 Iterator
처럼 소스의 요소 탐색 기능을 제공한다는 점은 같지만 병렬 작업에 특화되어 있습니다.
Java 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator
구현을 제공합니다. 컬렉션은 spliterator
라는 메서드를 제공하는 Splitator
인터페이스를 구현합니다.
Spliterator 인터페이스
public interface Spliterator<T> { // T: 탐색하는 요소의 형식
// tryAdvance: Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환 (Iterator 처럼)
boolean tryAdvance(Consumer<? super T> action);
// trySplit: Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드
Spliterator<T> trySplit();
// estimateSize: 탐색해야 할 요소 수 정보를 제공
long estimateSize();
int characteristics();
}
7.3.1 분할 과정
- 1단계: 첫 번째
Spliterator
에trySplit()
을 호출하면 두 번째Spliterator
가 생성됩니다. - 2단계: 두 개의
Spliterator
에trySplit()
을 다시 호출하면 네 개의Spliterator
가 생성됩니다. 이처럼trySplit()
의 결과가 null이 될 때까지 이 과정을 반복합니다.
- 3단계:
trySplit()
이 null을 반환했다는 것은 더 이상 자료구조를 분할할 수 없음을 의미합니다. - 4단계:
Spliterator
에 호출한 모든trySplit()
의 결과가 null이면 재귀 분할 과정이 종료됩니다. - 이 분할 과정은
characteristics
메서드로 정의하는Spliterator
특성에 영향을 받습니다.
Spliterator 특성
int characteristics();
Spliterator
는characteristics
라는 추상 메서드도 정의하는데 이 메서드는Spliterator
자체의 특성 집합을 포함하는 int를 반환합니다.Spliterator
의 특성은 다음과 같습니다.ORDERED
: 리스트처럼 요소에 정해진 순서가 있으므로Spliterator
는 요소를 탐색하고 분할할 때 이 순서에 유의해야 합니다.DISTINCT
: x, y 두 요소를 방문했을 때x.equals(y)
는 항상 false를 반환합니다.SORTED
: 탐색된 요소는 미리 정의된 정렬 순서를 따릅니다.SIZED
: 크기가 알려진 소스(ex. set)로Spliterator
를 생성했으므로estimatedSized()
는 정확한 값을 반환합니다.NON-NULL
: 탐색하는 모든 요소는 null이 아닙니다.IMMUTABLE
: 이Spliterator
의 소스는 불변입니다. 즉, 요소를 탐색하는 동안 요소를 추가하거나, 삭제하거나, 고칠 수 없습니다.CONCURRENT
: 동기화 없이Spliterator
의 소스를 여러 스레드에서 동시에 고칠 수 있습니다.SUBSIZED
: 이Spliterator
그리고 분할되는 모든Spliterator
는SIZED
특성을 갖습니다.
7.3.2 커스텀 Spliterator 구현하기
ex. 문자열의 단어 수를 계산하는 메서드를 구현하는 예제입니다.
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class WordCount {
public static final String SENTENCE =
" Nel mezzo del cammin di nostra vita "
+ "mi ritrovai in una selva oscura"
+ " che la dritta via era smarrita ";
public static void main(String[] args) {
System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
}
public static int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
//문자열의 모든 문자를 하나씩 탐색
for (char c : s.toCharArray()) {
if (Character.isWhitespace(c)) {
lastSpace = true;
}
else {
//문자를 하나씩 탐색하다 공백 문자를 만나면 지금까지 탐색한 문자로 간주하여(공백 문자는 제외) 단어 수를 증가
if (lastSpace) {
counter++;
}
lastSpace = Character.isWhitespace(c);
}
}
return counter;
}
}
결과
- 단어 사이에 공백이 여러 개일 때도 반복 구현 작동을 확인하기 위해 문장에 임의로 공백을 추가했습니다.
- 반복형 대신 함수형을 이용하면 직접 스레드를 동기화하지 않고도 병렬 스트림으로 작업을 병렬화할 수 있습니다.
함수형으로 단어 수를 세는 메서드 재구현하기
- String을 스트림으로 변환해야 하는데 스트림은 int, long, double 기본형만 제공하므로
Stream<Character>
를 사용합니다.
Stream<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);
- 스트림에 리듀싱 연산을 실행하면서 단어 수를 계산할 수 있습니다.
- 이 때 지금까지 발견한 단어 수를 계산하는 int 변수와 마지막 문자의 공백여부를 기억하는 Boolean 변수 등이 필요합니다.
- 이들 변수 상태를 캡슐화하는
WordCounter
를 만듭니다.
private static class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
//문자열의 문자를 하나씩 탐색
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
//문자를 하나씩 탐색하다가 공백 문자를 만나면 지금까지 탐색한 문자를 단어로 간주, 단어 수 증가
return lastSpace ? new WordCounter(counter + 1, false) : this;
}
}
//두 WordCounter의 counter값을 더함
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); // 마지막 공백은 신경 안씀
}
public int getCounter() {
return counter;
}
}
accumulate()
메서드는WordCounter
의 상태를 어떻게 바꿀 것인지, 또는WordCounter
는 불변 클래스이므로 새로운WordCounter
클래스를 어떤 상태로 생성할 것인지 정의합니다.- 스트림을 탐색하면서 새로운 문자를 찾을 때마다
accumulate()
메서드를 호출합니다. - 새로운 비공백 문자를 탐색한 다음에 마지막 문자가 공백이면
counter
를 증가시킵니다.
- 스트림을 탐색하면서 새로운 문자를 찾을 때마다
combine()
메서드는 문자열 서브 스트림을 처리한 WordCounter의 결과를 합칩니다.WordCounter
의 내부counter
값을 서로 합칩니다.
private static int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);
return wordCounter.getCounter();
}
- 문자 스트림의 리듀싱 연산을 직관적으로 구현합니다.
public static void main(String[] args) {
System.out.println("Found " + countWords(SENTENCE) + " words");
}
결과
- 반복 버전과 같은 결과가 출력됩니다.
WordCounter 병렬로 수행하기
public static void main(String[] args) {
System.out.println("Found " + countWords(SENTENCE) + " words");
}
public static int countWords(String s) {
Stream<Character> stream = IntStream.range(0, s.length()).mapToObj(SENTENCE::charAt).parallel();
Spliterator<Character> spliterator = new WordCounterSpliterator(s);
return countWords(stream);
}
결과
- 단어 수를 계산하는 연산을 병렬 스트림으로 처리하면 원하는 결과가 나오지 않습니다.
- 원래 문자열을 임의의 위치에서 둘로 나누다보니 예상치 못하게 하나의 단어를 둘로 계산하는 상황이 발생할 수 있습니다.
- 즉, 순차 스트림을 병렬 스트림으로 바꿀 때 스트림 분할 위치에 따라 잘못된 결과가 나올 수 있습니다.
- ✅ 문자열을 임의의 위치에서 분할하지 말고 단어가 끝나는 위치에서만 분할하는 방법으로 이 문제를 해결할 수 있습니다.
- -> 이를 위해 단어 끝에서 문자열을 분할하는 문자
Spliterator
를 구현한 다음에 병렬 스트림으로 전달합니다.
- -> 이를 위해 단어 끝에서 문자열을 분할하는 문자
private static class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
private WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
//현재 문자를 소비
action.accept(string.charAt(currentChar++));
//소비할 문자가 남아있으면 true 반환
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
//파싱할 문자열을 순차 처리할 수 있을만큼 충분히 작아졌음을 알리는 null 반환
if (currentSize < 10) {
return null;
}
//파싱할 문자열의 중간을 분할 위치로 설정
for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
//다음 공백이 나올 때까지 분할 위치를 뒤로 이동시킴
if (Character.isWhitespace(string.charAt(splitPos))) {
//처음부터 분할 위치까지 문자열을 파싱할 새로운 WordCounterSpliterator를 생성
Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos));
//이 WordCounterSpliterator의 시작 위치를 분할 위치로 설정
currentChar = splitPos;
//공백을 찾았고 문자열을 분리했으므로 루프를 종료
return spliterator;
}
}
return null;
}
//탐색해야 할 요소의 개수
@Override
public long estimateSize() {
//파싱할 문자열 전체 길이 - 현재 반복중인 위치
return string.length() - currentChar;
}
@Override
public int characteristics() {
/*
ORDERED: 문자열의 문자 등장 순서가 유의미함
SIZED: estimatedSize 메서드의 반환값이 정확함
SUBSIZED: trySplit으로 생성된 Spliterator도 정확한 크기를 가짐
NONNULL: 문자열에는 null 문자가 존재하지 않음
IMMUTABLE: 문자열 자체가 불변 클래스이므로 문자열을 파싱하면서 속성이 추가되지 않음
*/
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}
- 분석 대상 문자열로
Spliterator
를 생성한 다음에 현재 탐색 중인 문자를 가리키는 인덱스를 이용해서 모든 문자를 반복 탐색합니다. tryAdvance()
메서드는 문자열에서 현재 인덱스에 해당하는 문자를Consumer
에 제공한 다음에 인덱스를 증가시킵니다.- 인수로 전달된
Consumer
는 스트림을 탐색하면서 적용해야 하는 함수 집합이 작업을 처리할 수 있도록 소비한 문자를 전달하는 자바 내부 클래스입니다.
- 인수로 전달된
trySplit()
메서드는 이전compute()
메서드처럼 우선 분할 동작을 중단할 한계를 설정해야 하는데 여기서는 아주 작은 한계값(10개)를 사용했지만 실전에서는 너무 많은 태스크를 만들지 않도록 더 높은 한계값을 설정해야 합니다.
결과 - WordCounterSpliterator 활용
public static void main(String[] args) {
System.out.println("Found " + countWords(SENTENCE) + " words");
}
public static int countWords(String s) {
Spliterator<Character> spliterator = new WordCounterSpliterator(s);
//true는 병렬 스트림 생성 여부를 지시
Stream<Character> stream = StreamSupport.stream(spliterator, true);
return countWords(stream);
}
- 예상대로 같은 출력 결과가 나온 것을 알 수 있습니다.
- 지금까지
Spliterator
에서 어떻게 자료구조 분할 과정을 제어할 수 있는지 살펴봤는데Spliterator
는 첫 번째 탐색 시점, 첫 번째 분할 시점, 또는 첫 번째 예상 크기 요청 시점에 요소의 소스를 바인딩할 수 있습니다.- 이와 같은 동작을
늦은 바인딩 Spliterator
라고 부릅니다.
- 이와 같은 동작을