Trong Airflow, quản lý và điều khiển task là một phần cốt lõi của việc xây dựng và vận hành workflow. Lịch trình task giúp xác định thời điểm các tác vụ sẽ chạy, trong khi điều khiển task giúp bạn quản lý sự phụ thuộc giữa các tác vụ, bảo đảm chúng chạy theo thứ tự hợp lý. Phần này sẽ giải thích chi tiết về cách cấu hình lịch trình cho DAG và cách thiết lập các quy tắc điều khiển sự phụ thuộc giữa các task.
Lịch Trình Chạy DAG Trong Airflow
Một DAG (Directed Acyclic Graph) trong Airflow là tập hợp các task được định nghĩa để thực hiện một quy trình. Lịch trình của DAG giúp xác định thời gian mà DAG sẽ được kích hoạt.
Cấu Hình Schedule Interval
schedule_interval
là tham số quan trọng giúp xác định khoảng thời gian chạy của DAG. Nó có thể được định nghĩa bằng các biểu thức cron, chuỗi mô tả hoặc các định nghĩa thời gian cố định. Ví dụ:
@daily
: Chạy DAG mỗi ngày.
@hourly
: Chạy DAG mỗi giờ.
@once
: Chạy DAG một lần duy nhất khi được kích hoạt.
- Cron expression: Bạn có thể định nghĩa lịch trình chi tiết bằng cách sử dụng biểu thức cron. Ví dụ, để chạy DAG mỗi ngày lúc 7 giờ sáng:
from airflow import DAG
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG(
dag_id='daily_dag_example',
default_args=default_args,
schedule_interval='0 7 * * *', # Chạy vào 7 giờ sáng mỗi ngày
) as dag:
# Các task sẽ được định nghĩa ở đây
pass
start_date và end_date
start_date
là thời gian mà DAG sẽ bắt đầu được lên lịch. Airflow sử dụng thông số này để xác định thời gian đầu tiên DAG được kích hoạt, và nó cần phải là thời điểm trong quá khứ hoặc hiện tại. Ví dụ:
start_date=datetime(2023, 1, 1)
DAG sẽ bắt đầu chạy vào ngày 1 tháng 1 năm 2023.
Bạn cũng có thể chỉ định end_date
để giới hạn thời gian mà DAG sẽ ngừng chạy. Nếu không định nghĩa end_date
, DAG sẽ tiếp tục chạy theo lịch trình định sẵn.
catchup
Mặc định, Airflow sẽ tự động “bắt kịp” các lần chạy DAG bị bỏ lỡ từ thời điểm start_date
. Nếu bạn không muốn chạy các lần bị lỡ, bạn có thể tắt tính năng này bằng cách thiết lập catchup=False
:
with DAG(
dag_id='my_dag',
default_args=default_args,
schedule_interval='@daily',
catchup=False, # Không bắt kịp các lần chạy bị lỡ
) as dag:
# Các task sẽ được định nghĩa ở đây
pass
Điều Khiển Sự Phụ Thuộc Giữa Các Task
Khi tạo DAG, bạn có thể thiết lập sự phụ thuộc giữa các task để đảm bảo chúng chạy theo thứ tự mong muốn. Điều này có thể được thực hiện thông qua việc sử dụng toán tử (>>
hoặc <<
) hoặc các phương thức như set_upstream()
và set_downstream()
.
Sử Dụng Toán Tử >> và <<
Toán tử >>
giúp bạn thiết lập task này phải chạy trước task khác, trong khi <<
thiết lập task phải chạy sau. Ví dụ:
from airflow.operators.dummy import DummyOperator
task_1 = DummyOperator(task_id='task_1', dag=dag)
task_2 = DummyOperator(task_id='task_2', dag=dag)
task_3 = DummyOperator(task_id='task_3', dag=dag)
task_1 >> task_2 # task_1 chạy trước task_2
task_2 >> task_3 # task_2 chạy trước task_3
Kết quả là task_1
sẽ chạy trước, tiếp theo là task_2
, và cuối cùng là task_3
.
Sử Dụng Phương Thức set_upstream và set_downstream
Bạn cũng có thể thiết lập phụ thuộc bằng cách sử dụng các phương thức set_upstream()
và set_downstream()
. Ví dụ:
task_2.set_upstream(task_1) # task_1 chạy trước task_2
task_2.set_downstream(task_3) # task_2 chạy trước task_3
Hai phương pháp này tương đương với việc sử dụng toán tử >>
và <<
.
Điều Khiển Phụ Thuộc Linh Hoạt Với Trigger Rules
Trigger rules xác định khi nào một task nên chạy dựa trên trạng thái của các task trước đó. Mặc định, một task sẽ chỉ chạy khi tất cả các task trước đó đã thành công (TriggerRule.ALL_SUCCESS). Tuy nhiên, bạn có thể thay đổi quy tắc này để kiểm soát linh hoạt hơn.
Một số trigger rules phổ biến:
TriggerRule.ALL_SUCCESS
: Chạy khi tất cả các task trước đó thành công (mặc định).
TriggerRule.ALL_FAILED
: Chạy khi tất cả các task trước đó thất bại.
TriggerRule.ONE_SUCCESS
: Chạy khi ít nhất một task trước đó thành công.
TriggerRule.ONE_FAILED
: Chạy khi ít nhất một task trước đó thất bại.
TriggerRule.ALL_DONE
: Chạy bất kể task trước đó thành công hay thất bại.
Ví dụ, sử dụng TriggerRule.ALL_DONE
:
from airflow.utils.trigger_rule import TriggerRule
task_3 = DummyOperator(
task_id='task_3',
dag=dag,
trigger_rule=TriggerRule.ALL_DONE, # task_3 sẽ chạy dù task_1 hay task_2 có thành công hay không
)
Sử Dụng Branching Để Điều Khiển Luồng Task
Airflow cung cấp BranchPythonOperator
để cho phép bạn chia nhánh trong DAG, dựa trên các điều kiện. Điều này rất hữu ích khi bạn muốn chọn một nhánh để chạy dựa trên kết quả của các task trước đó.
Ví dụ, bạn có thể sử dụng BranchPythonOperator
để chọn một trong hai nhánh:
from airflow.operators.python import BranchPythonOperator
def choose_branch(**kwargs):
# Lựa chọn nhánh dựa trên điều kiện
return 'task_2' if some_condition else 'task_3'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch,
provide_context=True,
dag=dag,
)
branch_task >> task_2
branch_task >> task_3
Trong ví dụ này, DAG sẽ chạy task_2
hoặc task_3
dựa trên điều kiện của hàm choose_branch
.
Lịch trình và điều khiển task trong Airflow là những công cụ mạnh mẽ giúp bạn quản lý luồng công việc phức tạp. Sự linh hoạt trong việc thiết lập lịch trình và điều khiển task cho phép bạn xây dựng các workflow hiệu quả và dễ dàng điều chỉnh khi cần.