Data Engineering(Pipeline, Hadoop...)/Airflow

Airflow + Python 스크립트 활용 DAG 구성

dtstory 2022. 10. 21. 20:00

도커 컨테이너 환경에서 Airflow 를 구축하고, 진행하기를 원한다면 이전 포스팅을 참고하도록 하자.

2022.09.21 - [Cloud Computing/GCP] - GCP + Docker + Airflow 를 이용한 데이터 파이프라인 구축 1

 

GCP + Docker + Airflow 를 이용한 데이터 파이프라인 구축 1

# [컨테이너 이름] 을 지정한 후, linux 커맨드에서 아래 명령어를 실행 -v 명령을 이용해, 폴더 마운트 (복수 허용) docker run -it -p 8080:8080 \ -v /home/airflow/dags:/opt/airflow/dags \ -v /home/airflow..

dtstory.tistory.com

 

우선 필요 패키지 및 실행파일(.py) 를 무작정 설치, 작성해보자. 코드 설명은 다음 포스팅에서 상세하게 다룰 예정이다.  (컨테이너 내부라면, docker exec 명령으로 컨테이너 안으로 접속)

# shasum 실행을 위한 패키지 설치

apt-get install libdigest-sha-perl



이제 , dags 폴더로 이동해서, .py 파일 2개를 만들 것이다. 
동일하게 작성해서 실행을 확인하고 사용자에 맞게 바꾸면 된다.

# python script1 작성 (preprocessing.py 파일)

import pandas as pd
import re

def pre_process1():
    data=pd.read_csv("../data/ip_files/iris.csv")
    data['Species'] = data['Species'].str.replace('setosa',' ')
    data.to_csv("../data/ip_files/iris_1.csv")
    
def pre_process2() :
    data=pd.read_csv("../data/ip_files/iris_1.csv")
    data['Petal.Width'] = data['Petal.Width'] *100
    data.to_csv("../data/op_files/iris_2.csv", index=False)

 


# python script2 작성 (dags_test.py 파일)

# -*- config: utf-8 -*-

"""
Created on 22.10.17 
@author : dtstory
"""

from airflow import DAG
from datetime import datetime,timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

from pre_processing import pre_process1
from pre_processing import pre_process2


default_args = {"owner":"admin", "start_date":datetime(2022,10,16)}
with DAG(dag_id="workflow_test", default_args=default_args, schedule_interval="@daily") as dag :
    check_file = BashOperator(
        task_id="check_file",
        bash_command="shasum ../data/ip_files/iris.csv",
        retries = 2,
        retry_delay=timedelta(seconds=15))
    
    proc1 = PythonOperator(
        task_id = "pre_process1",
        python_callable = pre_process1
        )
    
    proc2 = PythonOperator(
        task_id = "pre_process2",
        python_callable = pre_process2
        )

    check_file >> proc1 >> proc2

 

자 이제, 파이썬 스크립트 작성은 모두 끝났다. 이제, 에어플로우 웹 url로 접속해보자.

 

# url 접속

[호스트ip]:[포트] 
예시) 123.123.123.123:8080

접속하면 workflow_test가 생성된 것을 확인할 수 있다.

 

 

# 실행버튼 클릭

# Trigger 버튼 클릭

 

# 프로젝트명 클릭 (실행을 했기 때문에 회색 토글이 푸른색 토글로 변경되었을 것이다.)

 

# Grapth View 클릭

 

# workflow 흐름1

check_file 작업은 수행이 완료되었고, pre_process1 작업이 계획되어 있는 상태

 

 

# workflow 흐름2

모든 작업이 수행이 성공적으로 완료되었다. 만약 스크립트상 코드에러가 발생한 경우, 빨간색(failed) 작업으로 표시된다.

그런 경우, 해당 work(작업) 을 누르면 아래와 같은 그림이 나오는데 log를 눌러 에러 로그를 확인할 수 있다.

 

이상, 무작정 따라해서 에어플로우로 작업(데이터전처리) 파이프라인을 만들어 보았다. 다음 포스팅에선, 해당 python으로 작성된 코드들에 대한 상세한 설명을 다뤄보고자 한다. 

728x90