[Apache Spark] Partition 개수와 크기 정하기

Jaemun Jung
8 min readMay 2, 2020

Spark Patition의 개수와 크기는 어떻게 정하는 것이 좋을까?
판단에 기초가 될만한 기본적인 룰들과 파티션 사이즈를 기반으로 개수를 구하는 공식을 통해 알아보자.

(Partition 세팅 전에 고려해보아야 할 Executor 세팅은 [Apache Spark] Executor 사이즈와 개수 정하기에 정리했습니다.)

Spark Patition의 개수를 정하기 위한 정확한 룰과 매커니즘이 있는 것은 아니다. 경험적으로 최적의 설정값을 찾아서 사용해야 한다.

Spark의 transformation을 narrow transformation과 wide transformation으로 나누는데, 이 중 wide transformation이 실행될 때 partition의 개수를 명시적으로 바꿀 수 있다.

https://databricks.com/glossary/what-are-transformations

파티션 개수가 따로 지정되어 있지 않다면, spark.default.parallelism 설정값을 이용하게 된다.

spark.default.parallelism

이 기본값은 실행환경에 따라 달라진다. YARN Cluster Mode에서 이 값은 [Executor 개수 x Core 개수]이다. 이는 파티션 개수로 사용해야하는 최소값이며, 최적값은 아니다.

얼마나 많은 파티션을 써야 하는가?

파티션에 관한 몇가지 팁들

  • 각 Executor는 각 Core당 하나의 Task를 실행할 수 있으며, 한 Partition이 하나의 Task와 연계된다.
Executor JVM내의 partition당 하나의 task와 연결 후 reducer 개수만큼 output file 생성 (from https://0x0fff.com/spark-architecture-shuffle/)
  • 일반적으로 파티션의 개수를 늘리는 것은 오버헤드가 너무 많아지는 수준이 되기 전까지는 성능을 높여준다.
  • 총 코어 개수보다 적은 파티션을 쓰면 일부 CPU가 쉬게 되므로 최소한 총코어 개수 이상의 파티션을 사용해야 한다.
  • 파티션 개수를 늘리는 것은 각 Executor에서 스파크가 한 번에 처리하는 양이 적어지므로 메모리 부족 오류를 줄이는데 도움을 준다.
  • 파티션이 부족한 것보다는 차라리 조금 더 많은 것이 낫다.
    MapReduce에서 보수적으로 task의 수를 늘려나가는 것과는 가이드가 다른데, 이는 MapReduce는 각 task의 스타트업 오버헤드가 큰 반면 스파크는 그렇지 않기 때문이다.

파티션이 너무 많아도 문제

부족한 것보다는 낫지만, 당연히 너무 많아도 문제다. 그 이유는..

  • 스파크 드라이버가 모든 파티션의 메타데이터를 보관해야 한다.
  • Driver memory errors & Driver overhead errors를 유발할 수 있다.
  • 모든 파티션을 Scheduling 하기 위해 드는 시간은 공짜가 아니다.
  • 작은 사이즈의 파일들을 생성하기 위한 I/O가 많이 발생하며, 이 시간 또한 오래 걸릴 수 있다.(특히 block store에서)
from Holden Karaw - Understanding Spark tuning with autotuning

파티션 개수를 구하는 공식

파티션 사이즈 기준으로 역으로 파티션의 개수를 구하는 공식을 고려해볼 수 있다.
Spark Executor가 연산에 쓸 수 있는 공간(M)은 캐시된 데이터의 양(R)에 따라 M에서 M-R 사이가 된다. 따라서

연산에 쓸 수 있는 메모리(M) < (spark.executor.memory - overhead) * spark.memory.fraction

(참고 : spark.memory.fraction의 default값= 0.6)

만약 캐시된 데이터(R)가 있다면

연산에 쓸 수 있는 메모리(M-R) < (spark.executor.memory - overhead) * spark.memory.fraction * (1- spark.memory.storagefraction)

(참고 : spark.memory.storagefraction의 default값 = 0.5)

태스크들 간에 공간이 동일하게 나눠진다고 가정하면

태스크당 메모리 = 연산에 쓸 수 있는 메모리 / spark.executor.cores

따라서 파티션 개수를 구하는 공식은

파티션 개수 = 셔플 단계에서의 크기 / 태스크당 메모리

바로 위의 ‘셔플 단계에서의 크기’를 정확하게 구하기는 힘들다. 다만 heuristic하게 가장 가깝게 추정할 수 있는 공식은

메모리 내의 셔플 크기 = Shuffle Write * Shuffle spill (memory) / Shuffle spill (disk)

음.. 식으로만 보니까 더 복잡한 것 같다.
실제 예시를 들어보면,

Spark UI Environment의 Properties

executor memory가 8g, executor memory overhead는 2g (2048MiB) 인 경우
(지금보니 executor memory overhead가 조금 크게 잡혀있는 것 같다. executor memory overhead의 default 산식인 executorMemory * 0.10, with minimum of 384 대로 두어도 될 듯 하다.)

(spark.executor.memory — overhead) * spark.memory.fraction 는 (8g-2g)*0.6가 된다. 따라서

연산에 쓸 수 있는 메모리(M) < 3.6g

그럼 태스크당 메모리는

0.9g = 3.6g / 4(spark.executor.cores)

파티션 개수는 = 셔플 단계에서의 크기 / 태스크당 메모리

잠깐 셔플 단계에서의 크기를 아래 Spark Stage를 참고해서 구해보면..

셔플 단계에서의 크기 = Shuffle Write * Shuffle spill (memory) / Shuffle spill (disk)

114 * 5 / 0.2 = 2,850

shuffle spill된 예시를 stack overflow에서 찾아왔다

그럼 파티션 개수는 = 2850 / 0.9 = 3166

위 공식들을 하나의 식으로 표현하면 아래와 같다.

https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2ddd

결론은

주의할 점은 위의 공식은 모든 레코드가 메모리로 읽어 들일 때 동일한 비율로 확장된다는 가정에 기초하고 있는데, 이 가정은 모든 상황에 항상 맞는 것은 아니다.
따라서 기본적으로는 위의 공식을 참고할 수 있지만, 절대적인 최적값을 보장하는 것은 아니다. 결국 각 환경과 데이터 특성에 따라 성능 향상이 더 이상 이루어지지 않을 때까지 파티션 개수를 늘리고 줄여보는 것 이상의 확실한 대안은 없다.

더 알아보기 좋은 글

https://www.slideshare.net/RachelWarren4/spark-autotuning-talk-final-110910491

Reference

Holden Karau, High Performance Spark, JPub, pp.312, 2017

https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-2

--

--