问题描述
最近在调研 Airflow demo 相关的问题和解决方案，主要问题有：
- DAG 中任务启动时，参数如何传递
- Task 之间的依赖关系，以及返回值如何被其他 Task 使用
- 如何运行 Docker 程序
- 如何实现 HTTP API 请求
具体说明
DAG 中任务启动时，参数如何传递
Airflow 中可以使用 Variable 定义变量来传递参数。Variable 是全局变量，任何 DAG 中的任务都可以读取：先通过命令行设置变量，再在任务代码中用 `Variable.get` 按同一个键名读取。
# Set the variable from the shell first. The key must be the same one that is
# read below ('message'); the original example set 'keyName' instead, so the
# read would fail. This is the Airflow 1.x CLI syntax; on Airflow 2.x use
# `airflow variables set message value`.
#
#   airflow variables --set message value
from airflow.models import Variable

# Variable.get raises KeyError when the key is missing and no default_var is
# given, so the CLI step above must run first.
message = Variable.get('message')
Task 之间的依赖关系，以及返回值如何被其他 Task 使用
Task 的返回值会通过 XCom 传递给下游 Task 使用：在下游任务中调用 `kwargs['task_instance'].xcom_pull(task_ids='run_task')` 即可获取 run_task 任务的返回值。任务之间的执行顺序用 `>>` 声明，例如 `run_task >> hello_operator >> last_operator`。
"""XCom demo: pass task return values to downstream tasks.

run_task reads the 'message' Variable and returns it. hello_task pulls that
value from XCom and returns it. last_task pulls the values of both earlier
tasks. Written against the Airflow 1.x API (``python_operator`` module,
``provide_context=True``).
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    # Correct key is 'depends_on_past' (was misspelled 'depend_on_past',
    # which is not an operator argument).
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'xcom_demo',
    default_args=default_args,
    # 'description' is a DAG argument, not an operator argument, so it is
    # passed here instead of through default_args.
    description='Use of the Xcom',
    schedule_interval=None,
)


def run_this_func(**kwargs):
    """Read the 'message' Variable, print it, and return it.

    The return value is what downstream tasks receive via xcom_pull.
    """
    message = Variable.get('message')
    print('message------>', message)
    return message


run_task = PythonOperator(
    task_id='run_task',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)


def print_hello(**context):
    """Pull run_task's return value from XCom and return it unchanged."""
    before_data = context['task_instance'].xcom_pull(task_ids='run_task')
    return before_data


hello_operator = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    provide_context=True,
    dag=dag,
)


def three(**kwargs):
    """Return a (run_task value, hello_task value) tuple pulled from XCom."""
    ti = kwargs['task_instance']
    first_data = ti.xcom_pull(task_ids='run_task')
    second_data = ti.xcom_pull(task_ids='hello_task')
    return first_data, second_data


last_operator = PythonOperator(
    task_id='last_task',
    python_callable=three,
    provide_context=True,
    dag=dag,
)

# Execution order: run_task, then hello_task, then last_task.
run_task >> hello_operator >> last_operator

if __name__ == "__main__":
    dag.cli()
运行 Docker 程序
使用 DockerOperator（它是一个 Operator，而不是 Executor）在 Docker 容器中运行程序：
"""DockerOperator demo: print the date, run a script in a container, echo.

Task order: print_current_date >> dpt_docker >> print_hello.
Written against the Airflow 1.x API (``airflow.operators.*_operator``).
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator

default_args = {
    'owner': 'airflow',
    # Correct key is 'depends_on_past' (was misspelled 'depend_on_past').
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

image = 'docker.api:0.1.0'
# host_path:container_path -- /home/user/data appears as /data in the container.
volumes = ['/home/user/data:/data']
# Docker does not run the container command through a shell, so a compound
# command such as "cd ... && ..." has to be wrapped in one explicitly.
# NOTE(review): assumes the image provides `sh`.
run_command = 'sh -c "cd /data/ && ./run.sh"'

with DAG('docker_demo',
         default_args=default_args,
         # A DAG argument; it has no effect inside default_args.
         description='Use of the DockerOperator',
         schedule_interval=None,
         catchup=False) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date',
    )

    t2 = DockerOperator(
        task_id='dpt_docker',
        image=image,
        auto_remove=True,
        command=run_command,
        # Pull the image before every run. NOTE(review): if the image only
        # exists locally (never pushed to a registry) the pull presumably
        # fails -- set this to False in that case.
        force_pull=True,
        volumes=volumes,
    )

    t3 = BashOperator(
        task_id='print_hello',
        bash_command='echo "hello world"',
    )

    t1 >> t2 >> t3

if __name__ == "__main__":
    dag.cli()
HTTP API 请求实现
使用 SimpleHttpOperator 发起 HTTP API 请求。下面的示例先登录获取 token，再把 token 放到后续请求的 Authorization 请求头中：
"""SimpleHttpOperator demo: GET a resource, log in, reuse the login token.

Task order:
    print_current_date >> get_person >> post_login
    >> [data_task, test_data] >> get_task >> sleep_data

data_task extracts the token from post_login's JSON response. get_task sends
it in its Authorization header. Written against the Airflow 1.x API
(``xcom_push=True``, ``provide_context=True``).
"""
import json
import os
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    # Correct key is 'depends_on_past' (was misspelled 'depend_on_past').
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 3),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

http_add = 'http://127.0.0.1:8888'
api = '/person/'
# Was 'http://1027.0.0.1:8889', which is not a valid IPv4 address.
url = 'http://127.0.0.1:8889'
login_api = '/user/login/'
get_task_api = '/task/'
# Connections defined through environment variables: connection id
# 'http_test' maps to AIRFLOW_CONN_HTTP_TEST, 'test_http' to
# AIRFLOW_CONN_TEST_HTTP.
os.environ['AIRFLOW_CONN_HTTP_TEST'] = http_add
os.environ['AIRFLOW_CONN_TEST_HTTP'] = url


def _check_ok(response):
    """Accept a response only when its HTTP status code is 200."""
    return response.status_code == 200


def get_http_data(**context):
    """Extract the token from post_login's response, store it, and return it.

    The token is saved in the 'token' Variable. It is also returned, so it
    lands in this task's XCom, where get_task reads it.

    Raises:
        ValueError: if post_login pushed no response body to XCom.
    """
    token_data = context['task_instance'].xcom_pull(task_ids='post_login')
    if not token_data:
        raise ValueError('post_login returned no response body')
    token = json.loads(token_data)['data']['token']
    Variable.set('token', token)
    return token


def get_data(**context):
    """Wait 10 seconds, then return the stored 'token' Variable ('' if unset).

    Module-level globals are not a way to share data between tasks. The old
    global `token` was never reassigned (get_http_data bound a local name),
    so this always returned ''. The Variable is therefore read at run time.
    """
    time.sleep(10)
    return Variable.get('token', default_var='')


with DAG('http_api_demo',
         default_args=default_args,
         # A DAG argument; it has no effect inside default_args.
         description='Use of the SimpleHttpOperator',
         schedule_interval="5 * * * *",
         catchup=False) as dag:
    t1 = BashOperator(
        task_id='print_current_date',
        bash_command='date',
    )

    t2 = SimpleHttpOperator(
        task_id='get_person',
        http_conn_id='http_test',
        method='GET',
        headers={"Content-Type": "application/json"},
        endpoint=api,
        xcom_push=True,
        response_check=_check_ok,
    )

    t3 = SimpleHttpOperator(
        task_id='post_login',
        http_conn_id='test_http',
        method='POST',
        headers={
            "X-Requested-With": 'XMLHttpRequest',
            "Accept": "application/json",
            "Content-Type": "application/json; charset=UTF-8",
        },
        endpoint=login_api,
        xcom_push=True,
        response_check=_check_ok,
        data=json.dumps({'username': 'admin', 'password': 'admin'}),
    )

    t4 = SimpleHttpOperator(
        task_id='get_task',
        http_conn_id='test_http',
        method='GET',
        headers={
            "X-Requested-With": 'XMLHttpRequest',
            "Accept": "application/json",
            "Content-Type": "application/json; charset=UTF-8",
            # Rendered at run time from this run's data_task XCom.
            # NOTE(review): requires `headers` to be a templated field of
            # SimpleHttpOperator -- confirm for your Airflow version.
            # The previous Variable.get('token') ran at DAG-parse time. It
            # raised KeyError (breaking the DAG import) until the Variable
            # existed, and otherwise sent a token from an earlier run.
            "Authorization": "jwt {{ ti.xcom_pull(task_ids='data_task') }}",
        },
        endpoint=get_task_api,
        xcom_push=True,
        response_check=_check_ok,
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    t5 = PythonOperator(
        task_id='data_task',
        python_callable=get_http_data,
        provide_context=True,
    )
    # test_data runs in parallel with data_task, so it may still see the
    # token stored by a previous run (or '').
    t6 = PythonOperator(
        task_id='test_data',
        python_callable=get_data,
        provide_context=True,
    )
    t7 = PythonOperator(
        task_id='sleep_data',
        python_callable=get_data,
        provide_context=True,
    )

    t1 >> t2 >> t3 >> [t5, t6] >> t4 >> t7

if __name__ == "__main__":
    dag.cli()