MLOps

Airflow 기초 사용법 및 DockerOperator

kimjy 2022. 8. 10. 22:29

사내에서 데이터파이프라인을 담당하시던 동료분이 에어플로우 세미나를 진행하신 적이 있었습니다. 그분은 워낙 내공이 있으신 분이기도 하고, 발표도 잘하는 분이셔서 에어플로우 파악에 많은 도움이 되었습니다. 그 때의 기억을 바탕으로 Airflow 기초 사용법을 간단히 정리하고, 제가 업무에 사용하였던 DockerOperator 사용 방법도 간략히 정리하도록 하겠습니다.

 

먼저 airflow 설치 방법은 아래와 같습니다.

pip install apache-airflow

airflow db init

airflow users create \
      --role Admin \
      --username admin \
      --email admin \
      --firstname admin \
      --lastname admin \
      --password admin

 그 후  airflow webserver와 scheduler를 띄우면 세팅은 끝납니다.

airflow webserver --port 8080

airflow scheduler

이제 에어플로우 태스크 설정을 위해서 DAG를 작성하면 될 것 같습니다.

 

실제 업무에서는요..

모델 서빙 시스템은 데이터 수집-저장-추론-추론 결과 저장으로 이루어지는 태스크들의 파이프라인 구조로 이루어져있어서, 어떻게 보면 간단한 태스크로 이루어져 있습니다. 딥러닝 모델의 특성 상 매우 많은 라이브러리(예를 들어 PyTorch, OpenCV)가 필요했고, 이를 위해서 코드를 도커 이미지로 생성하여 배포를 수행했습니다. 또 딥러닝 연산이 이루어지기 때문에 DockerOperator가 GPU를 지원하는 지를 확인해야 했습니다.

다행이도 DockerOperator가 GPU를 지원하고, 사용방법도 간단하게 되어있어서 큰 어려움이 없었던 것 같습니다.

Airflow 기본 패키지에서는 DockerOperator가 포함되어있지 않으므로, DockerOperator를 사용할 수 있도록 패키지를 설치합니다.

pip install apache-airflow-providers-docker

 그 후 DAG 파일을 작성합니다. DAG 파일은 기존 튜토리얼의 DAG파일과 거의 동일합니다. 다만 DockerOperator를 사용하는 것과, GPU 사용을 위해서 DockerOperator의 인자로 device_requests를 지정하는 것이 다릅니다. DAG파일은 아래와 같이 작성했습니다.

import docker

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.docker_operator import DockerOperator

with DAG(
    'nvidia_docker_test',
    default_args={
        'depends_on_past': False,
        'email': ['kimjy.par@gmail.com'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retry': 3,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple nvidia gpu test',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021,8,10,22),
    catchup=False,
    tags=['nvidia'],
) as dag:
    t0 = BashOperator(
        task_id='bash_test',
        bash_command='echo hello bash operator'
    )

    t1 = DockerOperator(
        task_id='simple_docker_test',
        image='ubuntu:18.04',
        api_version='auto',
        auto_remove=True,
        command='echo hello docker operator',
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge'
    )

    t2 = DockerOperator(
        task_id='nvidia_docker_test',
        image='ubuntu:18.04',
        api_version='auto',
        auto_remove=True,
        command='nvidia-smi',
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge',
        device_requests=[
            docker.types.DeviceRequest(device_ids=["0"], capabilities=[['gpu']])
        ]
    )

    t0>>t1>>t2

계속 작성 중...

'MLOps' 카테고리의 다른 글

모델 배포 및 모니터링 전략  (0) 2022.08.08
NVIDIA Docker 설치  (0) 2022.08.08
MLFlow 원격 모델 저장소에서 pytorch 모델 로딩 시 에러  (0) 2022.07.20
MLFlow  (0) 2022.07.04