[Airflow] Execution Date 사용하기
Airflow의 Execution Date을 한번 정리하자.
가장 기본적이고 간단한 건데, 아.. 왜이렇게 헷갈릴까?
특히 우리 서비스는 UTC, IST, KST의 3개 시간대를 고려해야하는 조금 특이한 경우라서 난이도가 좀 더 올라가게 되었다. (UTC +00:00, IST +05:30, KST +09:00) 서버시간은 UTC고 내 몸은 한국에 있지만 우리 서비스는 인도에서 수행되고.. 몇시간 더하면 날짜가 바뀌고.. 머릿속으로 계산하다보면 항상 헷갈린다.
전통적인 RDBMS ETL 툴인 Data-Stage나 TeraStream이나, Apahce Oozie와 Airflow, Informatica ETL 등의 workflow tool을 써보았지만 Airflow의 날짜 파라미터 세팅 부분이 유독 좀 헷갈리는 부분이 있기는 하다.
수행 시 날짜 파라미터를 명시적으로 받기보다 Execution Date 기준으로 산출되기 때문인데, 이 부분을 한번 정리해보자.
기초 정보
Execution Date?
일반적으로 일자별 배치는 하루가 마감된 새벽에 수행된다. 예를 들어, 2020–04–20 하루 24시간의 데이터를 집계하기 위한 Job은 2020–04–21 새벽에 수행된다.
즉 2020–04–20 00:00 ~ 2020–04–21 00:00의 데이터를 집계한다는 것의 의미는 ‘2020–04–21’의 데이터가 아닌 ‘2020-04–20’ 의 데이터를 집계하는 것이므로, 날짜 파라미터를 left-bound하여 사용한다고 한다.
Airflow의 Execution Date은 현재 수행하는 일자로부터 left-bound된 값이 제공된다.
예시를 들어보면
- 2020–04–21 01:00에 수행되는, 2020–04–20 하루동안의 데이터 집계 목적의 스케쥴을 예시로 들자. (2020–04–20 00:00~2020–04–20 23:59)
step_foo_bar = EmrAddStepsOperator(
task_id='step_foo_bar',
aws_conn_id='aws_default',
steps=add_step('step_foo_bar', 'CONTINUE', 'dbname', 'step_foo_bar', "{{ ds }}", 'FAIR', True),
dag=dag,
trigger_rule=TriggerRule.ALL_DONE
)
- 위 Operator 내부의 날짜 변수로 {{ ds }} 라는 변수를 활용하고 있다. {{ ds }}는 Execution Date과 동일하게 계산된 날짜 변수이다.
- 이 경우 일자 정보의 구성은 아래와 같다.
Job is running on 2020–04–21
Execution Date = 2020–04–20
{{ ds }} = 2020–04–20
서버 시간과 수행하고자 하는 날짜가 다르게 스케쥴 된 경우
위의 예시는 UTC와 KST, IST등의 일자 정보가 다른 경우를 고려할 경우가 없는 경우이다.
UTC 일자와 IST 일자가 다른 경우. 즉, 서버 시간과 집계하고자 하는 일자가 다른 경우를 한번 예시로 들어보자. Job은 IST 기준 05:00에 수행된다.
- UTC 2020–04–20 11:30
- IST 2020–04–21 05:00
내가 집계하고 싶은 데이터는 IST 기준의 2020–04–20의 데이터이다.
그런데 Execution Date은 2020–04–20으로부터 left-bound 되어 2020–04–19로 설정되기 때문에, Execution Date 을 그대로 받아오는 {{ds}}가 아닌 {{tomorrow_ds}}를 사용해야 2020–04–20이라는 원하는 일자 파라미터를 활용할 수 있다.
위 부분이 시간대를 고려할 때 실수하기 쉽다.
UTC 2020–04–20 11:30 — IST 2020–04–21 05:00 까지의 Job과,
UTC 2020–04–21 00:00 — IST 2020–04–21 05:30 부터의 Job이 날짜 파라미터가 달라져야 하기 때문이다.
몇시간만 스케쥴 조정하자~ 하고 별 생각없이 조정하다가 놓치기 쉬운 부분이다. (최소한 나는 그런 적이 있다…)
그 외 우리를 헷갈리게 만드는 정보들
- Base date : 일단 위 시간 설정 창에서 Base date은 크게 신경쓰지 말자. 그냥 DAG 히스토리를 조회할 때만 사용되는 기준일값이다. 보통 workflow에서 ‘Base date’이라는 명칭을 수행일자로 사용하는 경우가 종종 있기때문에 여기서 1차 혼선이 발생한다. ㅎㅎ
- 세번째 박스인 Run: trig_2020–04–30T … 값은 JOB의 ID이다. 여기서 보이는 2020–04–30T… 시간은 해당 Dag가 트리거 된 시간을 나타낸다. 그러나 반드시 이 Job ID가 수행 시간을 나타낸다고 보장하는 것은 아니다. Manual로 수동 생성하는 Job에서는 이 ID를 아무 의미없는 ‘ABCD 1234’ 로 정할 수도 있기 때문이다. 말 그대로 그냥 각 Job의 고유값이라고 이해하면 좋다.
- 가장 우측에 있는 [GO] 버튼은 누르는 순간 마치 뭔가 JOB이 수행 될 것 같은 분위기를 풍기지만 사실은 Run에서 선택한 history를 [조회]만 하는 기능이라는 충격적인 사실!
(아… Airflow와 같은 좋은 툴을 오픈 소스로 제공한 Maxime과 같은 엔지니어에게 정말 감사하지만.. 개발자가 만든 UI란…)
예전에 수행되었던 Job을 재수행 하려면
기존에 수행된 적이 있는 Job을 재수행 하는 것이라면 해당 일자의 Dag에서 [Clear] 를 통해서 수행하면 그 날짜에 수행되었던 execution date 기준으로 다시 재수행된다.
그런데 만약 수행된 적이 없는 일자의 Job을 수행하고 싶다면 아래와 같은 방법을 써야한다.
Manual 수행일자를 지정하려면
새로운 run을 돌리기 위해서 Execution Date의 지정이 필요한 경우 :
- Browse > Dag Runs > Create 메뉴
- Dag Id, Excution Date를 지정하고, Run Id도 원하는 대로 임의의 값을 지정한다.
- 위와 같이 설정하면, 날짜 파라미터인 {{ ds }} 등의 값이 설정한 Execution Date을 기준으로 계산되어 제공될 것이다.
간단한 내용인데, UTC 서버 시간과 우리 job이 수행되는 시간을 고려하고, 이에 따른 Execution Date, 날짜 파라미터까지 고려하다보면 헷갈릴 때가 있다..
이건 나만 헷갈리는 건 아닌 거 같다. 팀원들도 일자 넣기 전에 이게 이게 맞는건가..? 하는 대화를 몇번씩은 했으니까.. 우리만 그런가..?;;
한번 정리했으니까 이제 잘 쓸 수 있겠지..ㅎㅎ
Reference
https://stackoverflow.com/questions/39612488/airflow-trigger-dag-execution-date-is-the-next-day-why
https://airflow.apache.org/docs/stable/faq.html#what-s-the-deal-with-start-date