도커 컨테이너 환경에서 Airflow 를 구축하고, 진행하기를 원한다면 이전 포스팅을 참고하도록 하자.
2022.09.21 - [Cloud Computing/GCP] - GCP + Docker + Airflow 를 이용한 데이터 파이프라인 구축 1으
우선 필요 패키지 및 실행파일(.py) 를 무작정 설치, 작성해보자. 코드 설명은 다음 포스팅에서 상세하게 다룰 예정이다. (컨테이너 내부라면, docker exec 명령으로 컨테이너 안으로 접속)
# shasum 실행을 위한 패키지 설치
apt-get install libdigest-sha-perl
이제 , dags 폴더로 이동해서, .py 파일 2개를 만들 것이다.
동일하게 작성해서 실행을 확인하고 사용자에 맞게 바꾸면 된다.
# python script1 작성 (preprocessing.py 파일)
import pandas as pd
import re
def pre_process1():
data=pd.read_csv("../data/ip_files/iris.csv")
data['Species'] = data['Species'].str.replace('setosa',' ')
data.to_csv("../data/ip_files/iris_1.csv")
def pre_process2() :
data=pd.read_csv("../data/ip_files/iris_1.csv")
data['Petal.Width'] = data['Petal.Width'] *100
data.to_csv("../data/op_files/iris_2.csv", index=False)
# python script2 작성 (dags_test.py 파일)
# -*- config: utf-8 -*-
"""
Created on 22.10.17
@author : dtstory
"""
from airflow import DAG
from datetime import datetime,timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from pre_processing import pre_process1
from pre_processing import pre_process2
default_args = {"owner":"admin", "start_date":datetime(2022,10,16)}
with DAG(dag_id="workflow_test", default_args=default_args, schedule_interval="@daily") as dag :
check_file = BashOperator(
task_id="check_file",
bash_command="shasum ../data/ip_files/iris.csv",
retries = 2,
retry_delay=timedelta(seconds=15))
proc1 = PythonOperator(
task_id = "pre_process1",
python_callable = pre_process1
)
proc2 = PythonOperator(
task_id = "pre_process2",
python_callable = pre_process2
)
check_file >> proc1 >> proc2
자 이제, 파이썬 스크립트 작성은 모두 끝났다. 이제, 에어플로우 웹 url로 접속해보자.
# url 접속
[호스트ip]:[포트]
예시) 123.123.123.123:8080
접속하면 workflow_test가 생성된 것을 확인할 수 있다.
# 실행버튼 클릭
# Trigger 버튼 클릭
# 프로젝트명 클릭 (실행을 했기 때문에 회색 토글이 푸른색 토글로 변경되었을 것이다.)
# Grapth View 클릭
# workflow 흐름1
check_file 작업은 수행이 완료되었고, pre_process1 작업이 계획되어 있는 상태
# workflow 흐름2
모든 작업이 수행이 성공적으로 완료되었다. 만약 스크립트상 코드에러가 발생한 경우, 빨간색(failed) 작업으로 표시된다.
그런 경우, 해당 work(작업) 을 누르면 아래와 같은 그림이 나오는데 log를 눌러 에러 로그를 확인할 수 있다.
이상, 무작정 따라해서 에어플로우로 작업(데이터전처리) 파이프라인을 만들어 보았다. 다음 포스팅에선, 해당 python으로 작성된 코드들에 대한 상세한 설명을 다뤄보고자 한다.
'Data Engineering(Pipeline, Hadoop...) > Airflow' 카테고리의 다른 글
[Airflow] 워크플로우 - R Script 실행 (0) | 2023.01.02 |
---|---|
[Airflow] Schedule Timezone을 한국시간(KST) 으로 설정하기 (0) | 2022.12.09 |
[Airflow] Dags 관리(예제 Dags 삭제 및 Dags 경로 설정) (0) | 2022.11.24 |
[Airflow] Devops 환경 구축 (R+Python+Airflow+ssh) (0) | 2022.11.23 |