问题描述
最近在调研Airflow demo相关的问题和解决方案, 主要问题有:
- Dags中任务启动时,参数如何传递
- Task任务之间的依赖关系,返回值如何被其他task使用
- 运行docker程序
- Http API请求实现
具体说明
Dags中任务启动时,参数如何传递
Airflow中可以使用Variables定义变量来传递参数,该变量为全局变量:先通过命令行设置变量,再在DAG代码中用Variable.get读取
1 2 3 4 5 6
# Shell (Airflow 1.x CLI): create or update a global Airflow Variable.
#   airflow variables --set keyName value
# For the lookup below the key must be `message`, e.g.:
#   airflow variables --set message hello

from airflow.models import Variable

# Read the Variable from DAG code; the key must match the one set via the CLI.
message = Variable.get('message')
|
Task任务之间的依赖关系,返回值如何被其他task使用
任务的返回值会通过XCom传递给后面的task使用;在下游任务中调用kwargs['task_instance'].xcom_pull(task_ids='run_task')即可获取run_task任务的返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
# Demo DAG: hand a task's return value to downstream tasks through XCom.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'description': 'Use of the Xcom',
    # Fixed: the operator argument is spelled `depends_on_past`
    # (the original key `depend_on_past` was a typo).
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

# schedule_interval=None: the DAG is not scheduled and must be triggered.
dag = DAG(
    'xcom_demo',
    default_args=default_args,
    schedule_interval=None,
)


def run_this_func(**kwargs):
    """Read the global Variable ``message``, print it and return it.

    The returned value is what downstream tasks fetch with
    ``xcom_pull(task_ids='run_task')``.
    """
    message = Variable.get('message')
    print('message------>', message)
    return message


run_task = PythonOperator(
    task_id='run_task',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)


def print_hello(**context):
    """Return the value produced by ``run_task``, pulled from XCom."""
    before_data = context['task_instance'].xcom_pull(task_ids='run_task')
    return before_data


hello_operator = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    provide_context=True,
    dag=dag,
)


def three(**kwargs):
    """Return a tuple of the values of ``run_task`` and ``hello_task``."""
    task_instance = kwargs['task_instance']
    first_data = task_instance.xcom_pull(task_ids='run_task')
    two_data = task_instance.xcom_pull(task_ids='hello_task')
    return first_data, two_data


last_operator = PythonOperator(
    task_id='last_task',
    python_callable=three,
    provide_context=True,
    dag=dag,
)

# Execution order: run_task -> hello_task -> last_task.
run_task >> hello_operator >> last_operator

if __name__ == "__main__":
    dag.cli()
|
运行docker程序
使用操作器(Operator)DockerOperator完成docker容器的运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
# Demo DAG: run a Docker container between two shell tasks.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator

default_args = {
    'owner': 'airflow',
    'description': 'Use of the DockerOperator',
    # Fixed: the operator argument is spelled `depends_on_past`
    # (the original key `depend_on_past` was a typo).
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

image = 'docker.api:0.1.0'
# Bind mount in "host_path:container_path" form.
volumes = ['/home/user/data:/data']
# NOTE(review): `cd ... && ...` relies on a shell interpreting the command;
# if the image does not run commands through a shell this may need to be
# wrapped as `sh -c "cd /data/ && ./run.sh"` -- confirm against the image.
run_commend = 'cd /data/ && ./run.sh'

with DAG(
    'docker_demo',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date',
    )

    t2 = DockerOperator(
        task_id='dpt_docker',
        image=image,
        auto_remove=True,
        command=run_commend,
        force_pull=True,
        volumes=volumes,
    )

    t3 = BashOperator(
        task_id='print_hello',
        bash_command='echo "hello world"',
    )

    # Execution order: print date -> run container -> print greeting.
    t1 >> t2 >> t3

if __name__ == "__main__":
    dag.cli()
|
Http API请求实现
使用操作器(Operator)SimpleHttpOperator完成http api请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
# Demo DAG: call HTTP APIs with SimpleHttpOperator, log in to obtain a token,
# store it in an Airflow Variable and reuse it in a later request.
import json
import os
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'description': 'Use of the SimpleHttpOperator',
    # Fixed: the operator argument is spelled `depends_on_past`
    # (the original key `depend_on_past` was a typo).
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

http_add = 'http://127.0.0.1:8888'
api = '/person/'
# Fixed: the original host `1027.0.0.1` is not a valid IPv4 address.
url = 'http://127.0.0.1:8889'
login_api = '/user/login/'
get_task_api = '/task/'

# Define the two HTTP connections through AIRFLOW_CONN_<CONN_ID> environment
# variables; they are referenced below as `http_test` and `test_http`.
os.environ['AIRFLOW_CONN_HTTP_TEST'] = http_add
os.environ['AIRFLOW_CONN_TEST_HTTP'] = url

# Module-level placeholder returned by get_data(). get_http_data() does not
# modify it; the real token is shared through the Airflow Variable `token`.
token = ''


def _status_ok(response):
    """Return True when the HTTP response status code is 200."""
    return response.status_code == 200


def get_http_data(**context):
    """Extract the login token from the ``post_login`` response and store it.

    Pulls the response body of ``post_login`` from XCom, parses it as JSON,
    reads ``['data']['token']``, saves it in the Airflow Variable ``token``
    and returns it.
    """
    login_response = context['task_instance'].xcom_pull(task_ids='post_login')
    login_token = json.loads(login_response)['data']['token']
    Variable.set('token', login_token)
    return login_token


def get_data(**context):
    """Sleep for 10 seconds, then return the module-level ``token``."""
    time.sleep(10)
    return token


with DAG(
    'http_api_demo',
    default_args=default_args,
    schedule_interval="5 * * * *",
    catchup=False,
) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date',
    )

    t2 = SimpleHttpOperator(
        task_id='get_person',
        http_conn_id='http_test',
        method='GET',
        headers={"Content-Type": "application/json"},
        endpoint=api,
        xcom_push=True,
        response_check=_status_ok,
    )

    t3 = SimpleHttpOperator(
        task_id='post_login',
        http_conn_id='test_http',
        method='POST',
        headers={
            "X-Requested-With": 'XMLHttpRequest',
            "Accept": "application/json",
            "Content-Type": "application/json; charset=UTF-8",
        },
        endpoint=login_api,
        xcom_push=True,
        response_check=_status_ok,
        # NOTE(review): credentials are hard-coded for the demo; keep real
        # credentials in an Airflow Connection or Variable instead.
        data=json.dumps({'username': 'admin', 'password': 'admin'}),
    )

    t4 = SimpleHttpOperator(
        task_id='get_task',
        http_conn_id='test_http',
        method='GET',
        headers={
            "X-Requested-With": 'XMLHttpRequest',
            "Accept": "application/json",
            "Content-Type": "application/json; charset=UTF-8",
            # This lookup runs every time the file is parsed. The default
            # keeps parsing from failing before `data_task` has stored a
            # token for the first time.
            "Authorization": 'jwt {}'.format(
                Variable.get('token', default_var='')
            ),
        },
        endpoint=get_task_api,
        xcom_push=True,
        response_check=_status_ok,
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    t5 = PythonOperator(
        task_id='data_task',
        python_callable=get_http_data,
        provide_context=True,
    )

    t6 = PythonOperator(
        task_id='test_data',
        python_callable=get_data,
        provide_context=True,
    )

    t7 = PythonOperator(
        task_id='sleep_data',
        python_callable=get_data,
        provide_context=True,
    )

    # t5 and t6 both follow the login; t4 is downstream of both of them.
    t1 >> t2 >> t3 >> [t5, t6] >> t4 >> t7

if __name__ == "__main__":
    dag.cli()
|