AWS Airflow 정리
AirFlow 구조
- 구조
- Scheduler : AirFlow의 DAG와 작업들을 모니터링하고 실행 순서와 상태 관리
- Worker : Airflow의 작업을 실행하는 공간
- Metadata Database : Airflow에서 실행할 작업에 관한 정보들을 저장
- Webserver : AirFlow의 User Interface 제공
- DAG Directory : Airflow에서 실행할 작업들을 파이프라인 형태로 저장
→ Airflow는 Scheduler가 DAG Directory의 작업을 가져와서 Worker에서 실행하는 형태
Doker를 사용하는 경우 docker-compose로 6개의 컨테이너를 한 번에 생성해야 한다.
- Airflow 이미지가 각각 Scheduler, Worker, Webserver, flower 컨테이너로 실행되어야 한다.
- 추가로 PostgreSQL과 redis 컨테이너도 필요하다.
- PostgreSQL은 Airflow의 설정 정보, DAG와 Task들의 메타 정보 등을 저장한다.
- redis는 메시지 브로커로서 Scheduler에서 Worker로 메시지를 전달하는 역할을 수행한다.
Operator
-
Airflow의 Operator 이해하기
-
PythonOperator
: 파이썬 코드(.py)를 실행하기 위한 Operator
일반적으로 자동화 스크립트를 파이썬으로 작성하는 경우가 많다
이러한 파이썬 코드들은 해당 Operator를 통해 실행할 수 있다.
예시
delete_user_demographic = PythonOperator( task_id="teardown__delete_user_demographic", python_callable=delete_user_demographic,#호출할 파이썬 개체(함수,클래스) do_xcom_push=True, dag=dag, )
-
AWSAthenaOperator
: Use the AWSAthenaOperation to run a query in Amazon Athena.
예시
# Using a task-decorated function to create a CSV file in S3 add_sample_data_to_s3 = add_sample_data_to_s3() create_table = AWSAthenaOperator( task_id='setup__create_table', query=QUERY_CREATE_TABLE, database=ATHENA_DATABASE, output_location=f's3://{S3_BUCKET}/{S3_KEY}', sleep_time=30, max_tries=None, aws_conn_id=AWS_CONN_ID, ) read_table = AWSAthenaOperator( task_id='query__read_table', query=QUERY_READ_TABLE, database=ATHENA_DATABASE, output_location=f's3://{S3_BUCKET}/{S3_KEY}', sleep_time=30, max_tries=None, aws_conn_id=AWS_CONN_ID, ) get_read_state = AthenaSensor( task_id='query__get_read_state', query_execution_id="", max_retries=None, sleep_time=10, aws_conn_id=AWS_CONN_ID, ) # Using a task-decorated function to read the results from S3 read_results_from_s3 = read_results_from_s3( "" ) drop_table = AWSAthenaOperator( task_id='teardown__drop_table', query=QUERY_DROP_TABLE, database=ATHENA_DATABASE, output_location=f's3://{S3_BUCKET}/{S3_KEY}', sleep_time=30, max_tries=None, aws_conn_id=AWS_CONN_ID, ) # Using a task-decorated function to delete the S3 file we created earlier remove_sample_data_from_s3 = remove_sample_data_from_s3() ( add_sample_data_to_s3 >> create_table >> read_table >> get_read_state >> read_results_from_s3 >> drop_table >> remove_sample_data_from_s3 )
Sensor
Sensor는 어떤 결과를 만족하는지 외부 이벤트를 주기적으로 체크할때 사용한다.
특정 조건을 만족할때까지 기다리거. 해당 조건을 만족하면 이후 Task를 진행한다.
-
Airflow의 Sensor 이해하기
❗참고로 _sensor로 끝나는 Sensor들은 대부분 Defrecated 되었다. 대신 이름에서 _sensor를 제외한 Sensor를 사용하면 된다.
Sensor에 공통적으로 **Kwargs라는 Keyword Arguments를 전달하는 부분이 있다. 해당 부분은 DAG를 정의할 때 정의했던 default_args가 전달된다고 이해하면 된다.
- Mode
- poke 모드 : Sensor가 Task를 감지할 때 까지 pool의 slot을 계속 점유하면서 일정 시간 간격으로 Sensor를 실행하며, 작업이 완료될 때까지 이러한 행동을 반복합니다.
- reschedule 모드 : Sensor가 Task를 감지하지 못하면 다시 작업을 예약한다. 예약한 작업 시간이 되기 전까지는 sensor의 동작을 일시 중지하기 때문에 pool의 slot을 차지하지 않는다.
- Tip
- poke, reschedule 모드 둘 다 execute메소드 내 무한루프로 감싸져 있고 내부에서 poke 메소드를 호출
- poke 모드는 sensor 작업이 완료되지 않았다면 주어진 인터벌만큼 sleep을 함 (=pool의 slot을 계속 차지)
- reschedule 모드는 sensor 작업이 완료되지 않았다면 다음 스케줄을 등록한 후, 무한루프를 빠져나가고 다음 인터벌에서 다시 execute 메소드를 호출
- reschedule 모드의 동작 코드는 이상하게 짜여져 있음
- custom sensor operator를 만든다면 poke모드가 더 쉽고 원하는대로 할 수 있음
-
AthenaSensor
: Asks for the state of the Query until it reaches a failure state or success state.
if the query fails, the task will fail.
-
HTTPSensor
: Wait for an API to be available. API request가 성공하는 것을 보장해야 할 때 유용한 Sensor.
def response_check(response, task_instance): # The task_instance is injected, so you can pull data form xcom # Other context variables such as dag, ds, execution_date are also available. xcom_data = task_instance.xcom_pull(task_ids="pushing_task") # In practice you would do something more sensible with this data.. print(xcom_data) return True HttpSensor(task_id="my_http_sensor", ..., response_check=response_check)
Parameters
• http_conn_id (str) – The http connection to run the sensor against • method (str) – The HTTP request method to use • endpoint (str) – The relative part of the full url • request_params (dict[str, Any] | None) – The parameters to be added to the GET url • headers (dict[str, Any] | None) – The HTTP headers to be added to the GET request • response_check (Callable[…, bool] | None) – A check against the ‘requests’ response object. The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. It should return True for ‘pass’ and False otherwise. • extra_options (dict[str, Any] | None) – Extra options for the ‘requests’ library, see the ‘requests’ documentation (options to modify timeout, ssl, etc.) • tcp_keep_alive (bool) – Enable TCP Keep Alive for the connection. • tcp_keep_alive_idle (int) –The TCP Keep Alive Idle parameter (corresponds to
socket.TCP_KEEPIDLE
). • tcp_keep_alive_count (int) – The TCP Keep Alive count parameter (corresponds tosocket.TCP_KEEPCNT
) • tcp_keep_alive_interval (int) – The TCP Keep Alive interval parameter (corresponds tosocket.TCP_KEEPINTVL
)
Sensor를 사용하기 위한 가장 효과적인 방법
Sensorr는 구현하기 쉽지만, 최고의 Airflow 경험을 흭득하기 위해선 몇 가지 기억해야 하는 사항이 있다.다음의 팁을 따르면, Sensor를 사용함으로써 발생할 수 있는 성능 이슈를 피할 수 있다.
- 언제나 의미있는 timeout 파라미터를 Sensor에 설정하자. timeout 파라미터의 기본 값은 7일이다. 7일은 Sensor가 실행되는 시간치고는 긴 편 이므로 Sensor를 정의 할 땐, 사용 목적과 Sensor가 얼마나 기다리게 될 지 등등을 모두 고려해서 Sensor의 Timeout을 적절하게 정의해야한다.
- 긴 시간동안 실행될 거 같은 Sensor라면, reschedule 방식을 사용해서 Sensor가 Worker 슬롯을 지속적으로 점유하지 않도록 하자. 이렇게 하면 Sensor들이 가용한 Worker 슬롯을 모두 점거해버리는 Deadlock 현상을 피할 수 있다. 다만, 다음의 경우는 예외이다.
- poke_interval 이 약 5분보다도 짧다면, poke 방식을 활용하자. 이런 상황에서 reschedule 방식을 사용하는 것은 스케쥴러의 부하를 줄 수 있다.
- 사용 사례에 적절한 poke_interval 값을 설정하자. 예를 들어, 기다려야 하는 시간이 30분이라는 걸 이미 알고 있을때, 기본값인 30초마다 조건을 확인할 필요는 없는 것처럼 말이다.
- ❓Sensor Deadlock
-
실행 중인 Task의 조건이 True가 될 때까지 다른 Task들이 대기하게 되므로, 모든 슬롯이 데드락 상태가 된다.
DAG
-
코드 구조
DAG는 파이썬 코드로 정의한다.
파이썬 코드의 구조와 순서
- | 구분 | 내용 |
- | — | — |
- | 라이브러리 임포트 | DAG와 워크플로우 구성에 필요한 라이브러리 선언 |
- | 공통 변수 정의 | DAG 구성에 사용하기 위해 공통으로 사용하는 변수 정의, 변경이 자주 발생하는 경우 Variables 기능 활용 |
- | DAG 공통 속성값 정의 | DAG를 정의하는데 필요한 공통 속성 값 정의 |
- | DAG 정의 | DAG를 선언하고 공통 속성값 전달 |
- | Task 정의 | DAG에 포함 될 각 작업(Task) 정의
- Operator, Sensor, Hook 등을 사용 | | Task 배열 | 각 Task들의 순서들을 나열 «, » 같은 Shift 연산자를 사용 set_upstream, set_downstream 함수도 사용 가능 |
DAG 정의
파이썬 코드에서 DAG는 전역 변수로 정의해야 한다.
from airflow import DAG dag = DAG(dag_id="dag_global")
아래와 같이 로컬 변수로 정의하면 AirFlow에서 인식하지 못한다.
from airflow import DAG def test_func(): dag = DAG(dag_id="dag_local")
DAG 공통 속성값의 정의
DAG를 정의할때 모든 Task에 공통적으로 적용할 값들은 default_args로 지정하여 사용할 수 있다.
파이썬의 Dictionary 타입이다.
args = { "owner": "airflow", "depends_on_past": True, "wait_for_downstream": False, "retries": 3, "retry_delay": timedelta(minutes=5), "provide_context": True, "on_failure_callback": sm.task_failure_callback, }
-
dependent_on_past
: 이전 날짜의 task 인스턴스 중에서 동일한 task 인스턴스가 실패한 경우 실행하지 않고 대기
-
wait_for_downstream
: 이전 날짜의 task 인스턴스 중 하나라도 실패한 경우에는 해당 DAG는 실행되지 않고 대기
- retries : 작업이 실패하기 전에 수행 할 수 있는 재시도 횟수
- retry_delay : 재시도 사이의 지연 시간
- provide_context : jinja templete 사용 여부
dag = DAG( dag_id = "process_user_demographic", default_args=args, catchup=False, tags=["mixpanel"], start_date=datetime(2023, 2, 15, tzinfo=kst), # schedule_interval='0 14 * * *', schedule_interval=None, is_paused_upon_creation=True, user_defined_filters={ "loadsJson": lambda s: json.loads(s), "dumpsJson": lambda s: json.dumps(s), }, )
-
catchup
: 이전 DAG가 오래 걸려서 다음 DAG 시작 시간을 초과한 경우
True면 이전 DAG 종료 이후 다음 DAG 실행
False면 이전 DAG 실행 중 다음 DAG실행
-
Task 정의
: Task는 Operator, Sensor, Hook 등을 사용할 수 있다.
- 지정한 작업을 수행하는 Operator를 대부분 사용한다.
- Sensor는 어떤 결과를 만족하는지 주기적으로 체크할때 사용한다.
- Hook은 DB나 서비스 같은 외부 시스템과 통신하기 위한 인터페이스를 제공하며 연결 상태를 유지한다.
-
Document of Task in airflow
정의할 때 전달 인자로 어느 DAG에 포함되는지 지정해야한다.
from airflow import DAG from airflow.operator.dummy_operator import DummyOperator from datetime import datatime dag = DAG(dag_id = "test_dag",start_date=datetime(2022,4,17)) operator = DummyOperator( task_id = "dummy_test", dag = dag)#위에 정의한 dag라는 DAG에 포함
-
Task 배열
정의한 Task들의 실행순서를 정의하는 과정이다.
Task 객체의 set_downstream(정방향), set_upstream(반대방향) 함수를 사용할 수 있다.
그러나 일반적으로 » 와 « 연산자를 사용하여 순서를 정의한다 (Airflow 1.8 이후 지원)
다음과 같은 단순한 작업은 task1 » task2 » task3 으로 정의하면 된다.
다음과 같이 분기 되는 경우 task1 » [task2, task3] » task4 로 정의한다.
만약, 아래와 같은 작업이 있는 경우 task1 » task2 » task4 « task3으로 정의할 수 있으나 가독성이 좋지 않아 권하지 않는다.
)
이런 경우 앞뒤에 DummyOperator를 추가해서 정방향으로 만드는 것이 가독성에 도움이 된다.
코드로 표현하면 다음과 같다.
start >> task3 >> task4 >> end start >> task1 >> task2 >>task4 >> end #아래와 같이 표현 가능하다. start >> [task3, [task1,task2]] >> task4 >> end
- Backfill and Catchup
-
Catchup
DAG이 오래 걸려서 다음 DAG 시작 시간보다 오래 걸릴 경우, 예정대로 다음 DAG를 실핼할지 결정하는 옵션으로 True면 이전 DAG가 종료 된 이후 다음 DAG를 실행한다. (기본 값)
False면 이전 DAG가 실행 중이더라도 예정대로 다음 DAG를 실행한다.
-
Backfill
Airflow의 start_date(lofical_date)에 관계 없이 지정한 과거 기간에 대한 DAG를 실행할 수 있다.
과거의 데이터를 생성해서 채워 넣거나, 과거 특정 시점의 데이터 생성을 요구 받은 경우 유용하다. 이러한 경우 Jinja Template의 ds를 통해 Airflow에서 실행되는 작업들의 날짜를 제어해야 한다.
예를 들어 SQL 쿼리에 CURRENT_DATE()를 사용하면 모든 과거 작업에 현재 날짜로 쿼릴 될 것이라서 의미가 없다.
❗WebUI에서는 지원하지 않고 CLI를 통해 사용할 수 잀다.
-
-
Xcom
Cross Communication의 약자로 airflow task간 데이터를 주고 받을 일이 있는데 이 부분을 해결하기 위해서 나왔다. Task에서 연산이나 필요한 데이터가 각각의 Task마다 변경이 될 때가 있는데 이런 경우에 데이터를 처리하기 위해 주로 사용된다. Airflow는 Task Instance간 데이터를 공유하지 않기 때문에 XCom을 이용하여 데이터를 주고 받아야한다. XCom은 DAG Run 내에서만 존재하고 다른 DAG Run과는 공유하지 않는다. Dataframe과 같은 많은 양의 데이터는 지원하지 않는다. PythonOperator를 사용하면 return값은 자동적으로 XCom 변수로 등록된다.
Airflow의 task간에 변수나 데이터 전달을 위해 통신하는 기능이다. 해당 기능을 통해 다른 Task에서 이전 Task의 값을 사용할 수 있다. Work가 한 개인 단독 환경에서는 DAG 코드 내부에서 공유가 가능하다. 그러나 Worker가 여러개인 Airflow의 분산 환경에서는 서로 다른 Worker에서 Task가 실행 될 수 있기 때문에 Xcom을 사용한다.
Variable과 비슷하지만 Xcom은 특정 DAG 내부에서만 공유되는 특징이 있다. 여러 DAG에서 공유해서 사용하려면 Variable을 사용해야한다.
❗Operator를 생성할 때 provide_context 옵션이 True로 되어 있어야한다.
PythonOperator에서는 return 값이 자동으로 Xcom에 push 된다.
Push와 Pull 사용
: Xcom에 값을 넣는 Push(xcom_push)와 가져오는 Pull(xcom_pull)을 사용한다.
PythonOperator가 아니거다 PythonOperator에서 리턴 값이 아닌 다른 값을 전달할 때 사용한다.
git
📚 참조
AirFlow의 DAG을 파이썬 패키지로 구성하기 (1)
[Airflow] Connection을 이용하여 외부 서비스와 연동하기
airflow.providers.http.sensors.http — apache-airflow-providers-http Documentation