🎯 Quản lý lịch trình và điều khiển task trong Airflow

Trong Apache Airflow, task là đơn vị xử lý nhỏ nhất trong một quy trình. Việc lên lịch chạy (scheduling)thiết lập sự phụ thuộc giữa các task đóng vai trò quan trọng để đảm bảo quy trình được thực thi đúng logic, đúng thời gian, và đúng thứ tự.


📅 Lên lịch chạy DAG (DAG Scheduling)

✅ DAG là gì?

DAG (Directed Acyclic Graph – đồ thị có hướng không chu trình) trong Airflow là tập hợp các task được sắp xếp theo thứ tự thực thi cụ thể. Lịch trình của DAG quyết định khi nào các task trong DAG sẽ được khởi chạy.


⚙️ Cấu hình schedule_interval

Thuộc tính schedule_interval là tham số then chốt để xác định khoảng thời gian giữa các lần chạy của DAG. Bạn có thể khai báo nó bằng nhiều cách khác nhau:

Cách khai báoÝ nghĩa
'@once'DAG chỉ chạy một lần duy nhất khi được kích hoạt.
'@daily'DAG chạy mỗi ngày một lần, vào 00:00 (nửa đêm).
'@hourly'DAG chạy mỗi giờ một lần.
'@weekly'DAG chạy vào 00:00 mỗi Chủ Nhật.
'0 7 * * *'Dạng biểu thức cron, DAG sẽ chạy 7:00 sáng mỗi ngày.

Bạn có thể sử dụng bất kỳ biểu thức cron nào để linh hoạt hóa lịch trình chạy.


🕓 start_dateend_date

  • start_date: Chỉ định thời điểm bắt đầu đầu tiên mà DAG sẽ được lên lịch để chạy.
  • end_date: (tùy chọn) Chỉ định thời điểm mà DAG ngừng chạy. Nếu không khai báo end_date, DAG sẽ tiếp tục chạy vô hạn theo lịch.

Ví dụ:

dag = DAG(
    'example_dag',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    end_date=datetime(2023, 12, 31),
    catchup=False,
)

⏪ Thuộc tính catchup

  • Mặc định (catchup=True), nếu Airflow bị dừng hoặc DAG chưa được trigger, nó sẽ tự động "bắt kịp" bằng cách chạy tất cả các instance bị lỡ kể từ start_date.
  • Nếu không muốn hành vi này (rất hữu ích cho DAG chỉ cần chạy hiện tại), bạn nên tắt:
catchup=False

🔁 Điều khiển sự phụ thuộc giữa các task (Task Dependencies)

Airflow cho phép bạn xác định quan hệ giữa các task bằng nhiều cách:


✅ Cách 1: Sử dụng toán tử >><<

  • task_1 >> task_2: task_2 chỉ chạy sau khi task_1 hoàn tất.
  • task_1 << task_2: tương đương, chỉ là viết ngược lại.

Ví dụ:

task_1 >> task_2 >> task_3

task_1 chạy trước, sau đó task_2, rồi task_3.


✅ Cách 2: Dùng set_upstream()set_downstream()

Phương pháp này tương tự nhưng có thể rõ ràng hơn khi thiết lập từng quan hệ:

task_2.set_upstream(task_1)     # task_1 → task_2
task_3.set_downstream(task_2)   # task_2 → task_3

Hữu ích trong những trường hợp bạn cần xử lý task theo logic phức tạp hơn hoặc không muốn dùng toán tử >>.


🌿 Điều khiển luồng bằng nhánh rẽ – Branching

Khi luồng công việc yêu cầu chọn một trong nhiều nhánh để chạy dựa trên điều kiện logic, bạn có thể sử dụng BranchPythonOperator.


✅ Ví dụ phân nhánh với BranchPythonOperator

from airflow.operators.python import BranchPythonOperator

def choose_branch(**kwargs):
    # Điều kiện để chọn nhánh
    return 'task_2' if some_condition() else 'task_3'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag
)

branch_task >> task_2
branch_task >> task_3

Trong ví dụ này:

  • choose_branch() sẽ quyết định chọn task_2 hoặc task_3.
  • Chỉ một nhánh sẽ được thực thi, nhánh còn lại sẽ bị bỏ qua (skipped).

💡 Kết luận

Quản lý lịch trình và điều khiển task là hai khía cạnh cốt lõi trong Airflow:

  • schedule_interval, start_date, catchup giúp bạn kiểm soát khi nào DAG chạy.
  • Các toán tử như >>, set_upstream()BranchPythonOperator giúp điều khiển thứ tự và điều kiện chạy của các task.

Sự linh hoạt này cho phép bạn xây dựng workflow phức tạp, chính xác và dễ bảo trì trong các hệ thống tự động hóa dữ liệu.