[Apache Spark] RDD 재사용을 위한 persist, cache, checkpointing

Jaemun Jung
6 min readJun 20, 2020

--

Reusing RDDs

스파크는 RDD 재사용을 위해 몇가지 옵션을 제공한다. (persistence, caching, checkpointing)
이러한 재사용 옵션을 사용하기 위해서는 당연히 비용이 수반되며, 무조건 영속화를 해두는 것이 능사는 아니다. 큰 규모의 데이터세트는 영속화 비용도 크게 높아지므로 오히려 재연산을 유도하는 것이 나을 수도 있다.
그러면 어떤 케이스에서 RDD 재사용을 통해 퍼포먼스를 향상시킬 수 있을지 알아보자.

  1. 반복적인 연산
    동일한 데이터세트에 대해 반복적으로 연산을 수행한다면 이 데이터세트를 영속화해두는 것이 좋다. 매번 연산할 때마다 데이터세트가 메모리 내에 존재하고 있는 것이 보장되므로 성능 향상을 기대할 수 있다.
  2. 동일 RDD에 대해 여러 번의 액션 호출
  3. 각 파티션의 연산 비용이 너무 큰 경우
    여러번 사용하지 않더라도, 중간 결과를 저장하여 실패 시의 비용을 줄일 수 있다. 일반적으로 좁은 트랜스포메이션은 넓은 트랜스포메이션보다는 빠르지만, 파티션별 모델 훈련이나 매우 컬럼이 많은 행으로 작업하는 등 일부 좁은 트랜스포메이션이 클러스터의 executor의 처리량보다 더 큰 GC 오버헤드나 메모리 부담을 만들어낸다면 체크포인팅이나 off_heap 영속화가 빛을 발할 수 있다.

재연산 비용 효율성 판단

메모리 영속화(memory persist)는 스파크의 Flagship과 같은 대표적인 튜닝 기능이지만 공짜는 아니다!:
- 메모리 공간이 필요하며
- 직렬화와 역직렬화를 위한 시간도 필요하다.

  • 메모리 영속화든 연산이든 모두 spark executor JVM 안에서 이루어진다. 따라서 메모리를 많이 차지하는 메모리 영속화는 메모리 오류의 위험도를 올릴 수 있다. 혹은 재연산보다 가비지 컬렉션 비용이 더 높은 결과를 가져올 수도 있다.
  • 디스크 영속화나 checkpointing은 읽기와 쓰기 비용이 높아지므로 맵리듀스의 단점을 그대로 가지게 된다.
  • 일반적으로 연산 규모가 클러스터나 작업 규모에 비해 크다면 재연산보다 RDD의 재사용이 더 가치있다.
  • 작업이 GC나 메모리 부족 오류로 실패한다면, 특히 클러스터에 다른 잡들도 많다면, 체크포인팅이나 off_heap 영속화가 도움이 될 수 있다. (하지만 이미 cache를 사용하고 있다면 이 호출을 없애는 것도 고려해보자)

Types of Reuse: Cache, Persist, Checkpoint, Shffule files

재사용 방식들에 대해서 조금 더 자세히 알아보자.

  • cache(= inmemory persist와 동일)와 persist는 RDD를 스파크 잡 동안 유지하므로 재연산을 피하거나 긴 계보를 가진 RDD를 끊는 데 유용하다.
  • checkpointing은 잡의 중간 결과를 저장함으로써 실패에 따른 고비용 재연산을 막는데 사용할 수 있다.

persist and cache

RDD를 영속화(persist)하면 재사용을 위해 RDD를 실제 데이터로 구체화한다. (보통 executor의 메모리에 저장) 또한 스파크는 영속화된 RDD의 유실에 대비해 그 계보 또한 저장한다.

persist 함수는 RDD를 저장하는 StorageLevel을 인자로 받는다.

http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

위 표의 Storage Level은 아래 다섯가지의 속성의 조합으로 만들어졌다: userDisk, useMemory, useOffHeap, deserialized, replication

  • userDisk : 메모리에 들어가지 않는 파티션은 디스크에 기록된다. 디스크 IO 비용도 크므로 이 옵션을 사용하는 경우는 재연산 비용이 큰 경우가 될 것이다. (재연산의 비용이 크지 않은 경우 일부 파티션을 재연산하는 것이 더 빠를 수도 있다.)
  • useMemory : 메모리에 저장되거나 직접 디스크에 기록된다.
  • useOffHeap : Tachyon과 같은 executor 밖의 외부 시스템에 저장. 메모리이슈가 심각하거나 클러스터가 혼잡한 편이고 파티션이 자꾸 메모리에서 제거된다면 고려해볼만 하다.
  • deserialized : 직렬화되지 않은 Java 객체로 저장된다. MEMORY_ONLY_SER 처럼 suffix로 _SER이 붙는 옵션들은 직렬화를 사용한다.
    (RDD가 메모리에 들어가기 너무 크다면 우선 MEMORY_ONLY_SER 옵션으로 직렬화를 시도해본다. RDD에 빨리 접근하도록 유지하면서도 저장에 필요한 메모리는 줄어든다.)
  • replication : 영속화 데이터의 복사본 개수를 정수로 지정. default = 1.

Checkpointing

RDD를 HDFS나 S3같은 외부 저장 시스템에 쓰며, 영속화와 달리 RDD의 계보를 잃어버린다. RDD가 스파크 외부에 저장되므로, 스파크 애플리케이션이 종료된 이후에도 데이터가 유효하며, 실행 시 RDD의 평가를 강제한다.
일반적으로 IO비용이 더 비싼 연산을 필요로 하므로 영속화보다 느릴 수 있으나, 스파크의 메모리를 전혀 쓰지 않으며, 스파크의 worker node에 문제가 생기더라도 재연산이 필요치 않다는 장점이 있다.

Checkpointing은 실패나 재연산의 비용에 대한 우려가 클 때 사용할 수 있다.
러프하게 정리해보면,
중복 작업으로 느린 Job에는 persist를 사용하고,
중간 실패가 우려되는 Job에는 checkpointing
을 고려해볼 수 있겠다.

LRU Caching

더 이상 쓰이지 않는 RDD라고 해서 자동으로 영속화가 해제되지는 않는다.
드라이버에서 명시적으로 unpersist 함수가 호출되거나, 메모리 공간의 압박으로 축출(evict)되기 전까지는 메모리에 남아있게 된다.

스파크는 executor가 메모리 부족을 겪을 때 뺄 파티션을 결정하기 위해 LRU(Least Recently Used) caching 방식을 사용하여 쓰인지 가장 오래된 데이터를 뺀다.

영속화된 RDD를 빼고 공간을 확보하고 싶다면 unpersist를 사용하자.

Shuffle files

persist나 checkpoint를 호출하는 것과 관계없이 스파크는 셔플하는 동안 디스크에 데이터를 쓴다. shuffle files라 부르며, mapper에 의해 정렬된 각 입력 파티션의 모든 레코드를 포함한다. 보통 application이 실행되는 동안 worker node의 로컬 디렉토리에 남아있으며, 드라이버프로그램이 셔플된 RDD를 재사용한다면 해당 지점까지는 재연산을 피할 수 있다.

Reference

Holden Karau, High Performance Spark, JPub, pp.123~134, 2017

--

--