|
| 1 | + |
| 2 | +from datetime import datetime, timedelta |
| 3 | + |
| 4 | +from airflow import DAG |
| 5 | + |
| 6 | +from airflow.operators.bash import BashOperator |
| 7 | +from airflow.operators.dummy import DummyOperator |
| 8 | +from airflow.operators.python import BranchPythonOperator |
| 9 | +from airflow.operators.python_operator import PythonOperator |
| 10 | +from airflow.utils.trigger_rule import TriggerRule |
| 11 | + |
| 12 | + |
| 13 | +import sys, os |
| 14 | +sys.path.append(os.getcwd()) |
| 15 | + |
| 16 | +from MLproject.titanic import * |
| 17 | +from utils.slack_alert import SlackAlert |
| 18 | + |
| 19 | +titanic = TitanicMain() |
| 20 | +slack = SlackAlert("#your-channel", "your-token") |
| 21 | + |
| 22 | +def print_result(**kwargs): |
| 23 | + r = kwargs["task_instance"].xcom_pull(key='result_msg') |
| 24 | + print("message : ", r) |
| 25 | + |
| 26 | +default_args = { |
| 27 | + 'owner': 'owner-name', |
| 28 | + 'depends_on_past': False, |
| 29 | + 'email': ['your-email@g.com], |
| 30 | + 'email_on_failure': False, |
| 31 | + 'email_on_retry': False, |
| 32 | + 'retries': 1, |
| 33 | + 'retry_delay': timedelta(minutes=30), |
| 34 | +} |
| 35 | + |
| 36 | +dag_args = dict( |
| 37 | + dag_id="tutorial-slack-ml-op", |
| 38 | + default_args=default_args, |
| 39 | + description='tutorial DAG ml with slack', |
| 40 | + schedule_interval=timedelta(minutes=50), |
| 41 | + start_date=datetime(2022, 2, 1), |
| 42 | + tags=['example-sj'], |
| 43 | + on_success_callback=slack.success_msg, |
| 44 | + on_failure_callback=slack.fail_msg |
| 45 | +) |
| 46 | + |
| 47 | +with DAG( **dag_args ) as dag: |
| 48 | + start = BashOperator( |
| 49 | + task_id='start', |
| 50 | + bash_command='echo "start!"', |
| 51 | + ) |
| 52 | + |
| 53 | + prepro_task = PythonOperator( |
| 54 | + task_id='preprocessing', |
| 55 | + python_callable=titanic.prepro_data, |
| 56 | + op_kwargs={'f_name': "train"} |
| 57 | + ) |
| 58 | + |
| 59 | + modeling_task = PythonOperator( |
| 60 | + task_id='modeling', |
| 61 | + python_callable=titanic.run_modeling, |
| 62 | + op_kwargs={'n_estimator': 100, 'flag' : True} |
| 63 | + ) |
| 64 | + |
| 65 | + msg = PythonOperator( |
| 66 | + task_id='msg', |
| 67 | + python_callable=print_result |
| 68 | + ) |
| 69 | + |
| 70 | + complete = BashOperator( |
| 71 | + task_id='complete_bash', |
| 72 | + bash_command='echo "complete~!"', |
| 73 | + ) |
| 74 | + |
| 75 | + start >> prepro_task >> modeling_task >> msg >> complete |
0 commit comments