Trong Apache Airflow, task là đơn vị xử lý nhỏ nhất trong một quy trình. Việc lên lịch chạy (scheduling) và thiết lập sự phụ thuộc giữa các task đóng vai trò quan trọng để đảm bảo quy trình được thực thi đúng logic, đúng thời gian, và đúng thứ tự.
DAG (Directed Acyclic Graph – đồ thị có hướng không chu trình) trong Airflow là tập hợp các task được sắp xếp theo thứ tự thực thi cụ thể. Lịch trình của DAG quyết định khi nào các task trong DAG sẽ được khởi chạy.
schedule_interval
Thuộc tính schedule_interval
là tham số then chốt để xác định khoảng thời gian giữa các lần chạy của DAG. Bạn có thể khai báo nó bằng nhiều cách khác nhau:
Cách khai báo | Ý nghĩa |
---|---|
'@once' | DAG chỉ chạy một lần duy nhất khi được kích hoạt. |
'@daily' | DAG chạy mỗi ngày một lần, vào 00:00 (nửa đêm). |
'@hourly' | DAG chạy mỗi giờ một lần. |
'@weekly' | DAG chạy vào 00:00 mỗi Chủ Nhật. |
'0 7 * * *' | Dạng biểu thức cron, DAG sẽ chạy 7:00 sáng mỗi ngày. |
Bạn có thể sử dụng bất kỳ biểu thức cron nào để linh hoạt hóa lịch trình chạy.
start_date
và end_date
start_date
: Chỉ định thời điểm bắt đầu đầu tiên mà DAG sẽ được lên lịch để chạy.end_date
: (tùy chọn) Chỉ định thời điểm mà DAG ngừng chạy. Nếu không khai báo end_date
, DAG sẽ tiếp tục chạy vô hạn theo lịch.Ví dụ:
dag = DAG(
'example_dag',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
end_date=datetime(2023, 12, 31),
catchup=False,
)
catchup
catchup=True
), nếu Airflow bị dừng hoặc DAG chưa được trigger, nó sẽ tự động "bắt kịp" bằng cách chạy tất cả các instance bị lỡ kể từ start_date
.catchup=False
Airflow cho phép bạn xác định quan hệ giữa các task bằng nhiều cách:
>>
và <<
task_1 >> task_2
: task_2 chỉ chạy sau khi task_1 hoàn tất.task_1 << task_2
: tương đương, chỉ là viết ngược lại.Ví dụ:
task_1 >> task_2 >> task_3
→ task_1
chạy trước, sau đó task_2
, rồi task_3
.
set_upstream()
và set_downstream()
Phương pháp này tương tự nhưng có thể rõ ràng hơn khi thiết lập từng quan hệ:
task_2.set_upstream(task_1) # task_1 → task_2
task_3.set_downstream(task_2) # task_2 → task_3
Hữu ích trong những trường hợp bạn cần xử lý task theo logic phức tạp hơn hoặc không muốn dùng toán tử
>>
.
Khi luồng công việc yêu cầu chọn một trong nhiều nhánh để chạy dựa trên điều kiện logic, bạn có thể sử dụng BranchPythonOperator
.
BranchPythonOperator
from airflow.operators.python import BranchPythonOperator
def choose_branch(**kwargs):
# Điều kiện để chọn nhánh
return 'task_2' if some_condition() else 'task_3'
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=choose_branch,
provide_context=True,
dag=dag
)
branch_task >> task_2
branch_task >> task_3
Trong ví dụ này:
choose_branch()
sẽ quyết định chọn task_2
hoặc task_3
.Quản lý lịch trình và điều khiển task là hai khía cạnh cốt lõi trong Airflow:
schedule_interval
, start_date
, catchup
giúp bạn kiểm soát khi nào DAG chạy.>>
, set_upstream()
và BranchPythonOperator
giúp điều khiển thứ tự và điều kiện chạy của các task.Sự linh hoạt này cho phép bạn xây dựng workflow phức tạp, chính xác và dễ bảo trì trong các hệ thống tự động hóa dữ liệu.