Stack Overflow Asked by alltej on November 24, 2021
I have a manually triggered dag. It takes a parameters like:
{"id_list":"3,5,1"}
In the DAG, I create the operators dynamically based on this list of integers:
for id in id_list:
task = create_task(id)
I need to initialize the id_list
based on the parameter values of id_list
.
How can I initialize that list since I cannot reference that parameter directly when not in a templated field? This is how I want to see it in the Graph View where the process tasks are based on the id_list
params.
I have seen examples of dynamically created tasks but they are not really dynamic in the sense that the list values are hard-coded. The tasks are created dynamically based on the list of hard-code values, if that makes sense.
First, create a fixed number of tasks to execute. This example is using PythonOperator. In the python_callable
, if the index
is less than the length of the param_list
then execute else raise AirflowSkipException
def execute(index, account_ids):
param_list = account_ids.split(',')
if index < len(param_list):
print(f"execute task index {index}")
else:
raise AirflowSkipException
def create_task(task_id, index):
return PythonOperator(task_id=task_id,
python_callable=execute,
op_kwargs={
"index": index,
"account_ids": "{{ dag_run.conf['account_ids'] }}"}
)
record_size_limit = 5
ACCOUNT_LIST = [None] * record_size_limit
for idx in range(record_size_limit):
task = create_task(f"task_{idx}", idx)
task
Trigger DAG and pass this as parameters:
Graph View:
Answered by alltej on November 24, 2021
A DAG
and its tasks must be resolved prior to being available for use; this includes the webserver, scheduler, everywhere. The webserver is actually a perfect example why: how would you render the process to the user?
The only dynamic components of a process are the parameters that are available during template rendering. In most cases I've seen people use a PythonOperator
to loop over the input and perform some action N
times to solve the same issue.
Answered by joebeeson on November 24, 2021
Get help from others!
Recent Answers
Recent Questions
© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP