Welcome to Vigges Developer Community - Open, Learning, Share
Welcome To Ask or Share your Answers For Others

airflow - airflow command line parameter for an ad hoc run?

We have just started using Airflow for scheduling.

One of my scripts runs daily. It uses the template parameter {{ ds_nodash }} to get the date.

I now have to rerun the load for a single past date. How can I provide an input parameter that overrides ds_nodash?

I have:
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} "

I would like to run it as:
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade 20190601 "

Code snippet below:

import os
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 19),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('create-data-set-job', default_args=default_args)
projct_dr='/home/airflow/projects/'

trade_acx_ld="/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh" 
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} " 


t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

if os.path.exists(trade_acx_ld):
    t2 = BashOperator(
        task_id='Dataset_create',
        bash_command=trade_acx_ld_cmd,
        dag=dag
    )
else:
    # Report the missing script path rather than the full command string
    raise Exception("Cannot locate {0}".format(trade_acx_ld))

t2.set_upstream(t1)
asked by user3858193, translated from Stack Overflow


1 Answer


You can manually trigger the DAG using airflow trigger_dag.

{{ ds_nodash }} is derived from execution_date, so if you trigger the DAG with an older execution date, {{ ds_nodash }} will render that older date instead of today's date.
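To make the relationship concrete, here is a minimal sketch in plain Python of how the rendered value follows the execution date. It is not Airflow's own code; the helper name ds_nodash_for is hypothetical and only mimics the YYYYMMDD format that {{ ds_nodash }} produces.

```python
from datetime import datetime


def ds_nodash_for(execution_date):
    """Format a date as YYYYMMDD, the shape {{ ds_nodash }} renders to."""
    return execution_date.strftime("%Y%m%d")


# Hypothetical illustration: the value depends on the run's execution date,
# not on the wall-clock date at which the task actually runs.
rerun_date = datetime(2019, 6, 1)
command = (
    "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh "
    "trade_acx.csv l1_gcb_trxn trade {0}".format(ds_nodash_for(rerun_date))
)
print(command)
```

With an execution date of 2019-06-01, the command ends in the date the question asks for.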

You can pass the execution date to the trigger_dag command with -e, as follows (substitute your own DAG id; the DAG in the question is named create-data-set-job).

airflow trigger_dag gcs-file-load-job -e "2019-01-01"
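If you would rather pass an explicit value than change the execution date, one possible alternative is the trigger conf: trigger_dag accepts a JSON payload via -c, and templates can read it through dag_run.conf. The sketch below is an assumption-laden illustration, not something from the answer above: the key name load_date and the helper resolve_load_date are hypothetical, and the template string should be checked against your Airflow version.

```python
# Assumed Jinja expression for bash_command: use the conf value when one was
# passed at trigger time, otherwise fall back to ds_nodash.
TEMPLATED_DATE = (
    "{{ dag_run.conf['load_date'] "
    "if dag_run and dag_run.conf and 'load_date' in dag_run.conf "
    "else ds_nodash }}"
)


def resolve_load_date(conf, ds_nodash):
    """Plain-Python equivalent of the fallback logic in TEMPLATED_DATE."""
    if conf and conf.get("load_date"):
        return str(conf["load_date"])
    return ds_nodash


# Scheduled run: no conf is supplied, so the regular date is used.
print(resolve_load_date(None, "20190620"))
# Ad hoc rerun: the supplied value takes precedence.
print(resolve_load_date({"load_date": "20190601"}, "20190620"))
```

Under these assumptions, an ad hoc rerun would be triggered with something like: airflow trigger_dag create-data-set-job -c '{"load_date": "20190601"}'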
