技術 約11分で読めます

PostgreSQLでApache Airflow 3.2.0を動かして最初のDAGを実行する

いけさん目次

DEV Communityに Apache Airflow 3.2.0をPostgreSQL込みで入れて最初のDAGを動かす記事 が出ていた。
内容はLinuxサーバー上でAirflowをpipインストールし、standalone と個別プロセス起動の両方を試し、airflow.cfg をPostgreSQL向けに変えてDAGを読み込ませる流れだ。

原典の手順はスクリーンショット多めで追いやすいが、公式ドキュメントと突き合わせると「まず動かす」手順と「PostgreSQLをメタデータDBとして使い続ける」手順が少し混ざっていたので、流れを整理しつつ3.2.0固有の差分を足した。

standaloneはSQLiteで試す入口

Airflow公式のインストールページでは、まず試すだけなら pipx run apache-airflow standaloneuvx apache-airflow standalone が案内されている。
これはSQLite付きの最小構成を自動で起動するモードで、ローカルの動作確認には速い。

pipx run apache-airflow standalone
# または
uvx apache-airflow standalone

ただし公式ドキュメントも、standalone は本番用途ではないと明記している。
PostgreSQLを使う理由は、DAGそのもののデータを入れるためではなく、DAG Run、Task Instance、接続情報、変数、スケジューラ状態などを保持するAirflowのメタデータDBをSQLiteから外すためだ。

flowchart TD
    A[DAGファイル<br/>Pythonコード] --> B[dag-processor<br/>DAGを解析]
    B --> C[metadata DB<br/>PostgreSQL]
    D[scheduler<br/>実行計画を作る] --> C
    D --> E[executor<br/>タスクを実行]
    F[api-server<br/>Web UIとAPI] --> C

この図でPostgreSQLに入るのは、業務データではなくAirflow自身の状態だ。
ETL先のPostgreSQLや分析DBをAirflowから操作する場合は、別途ConnectionやProviderを使う。

pipインストールはconstraintsを必ず合わせる

pipで入れる場合、Airflowはconstraintsファイル込みで入れる。
原典ではPython 3.12向けに以下の形で入れていた。

python -m venv airflow_venv
source airflow_venv/bin/activate

pip install --upgrade pip
pip install "apache-airflow[postgres]==3.2.0" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.12.txt"

原典では apache-airflow[celery] を使っているが、最初のDAGをローカルで動かすだけならCeleryExecutorまでは要らない。
PostgreSQLメタデータDBを使う話なら、まずは postgres extra を入れるほうが目的と依存関係が合う。
CeleryExecutorを使うなら、PostgreSQLとは別にRedisやRabbitMQなどのブローカー設計も入ってくる。

constraintsのPythonバージョンは手元のPythonに合わせる。
Python 3.11なら constraints-3.11.txt、Python 3.12なら constraints-3.12.txt だ。ここがずれると、インストールできても後でProviderの依存関係が崩れやすい。

PostgreSQL側はユーザーとDBを先に作る

Airflow公式のDB設定ページでは、PostgreSQL 13〜17がサポート対象として挙げられている。
例として、Airflow専用のDBとユーザーを作る。

CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;

\c airflow_db
GRANT ALL ON SCHEMA public TO airflow_user;

PostgreSQL 15以降では、public スキーマへの権限も忘れやすい。
airflow db migrate の途中で権限エラーになる場合、DB本体への権限だけでなくスキーマ権限も見る。

Airflow側は airflow.cfg を直接編集してもいいが、環境変数で渡すほうが再現しやすい。

export AIRFLOW_HOME=~/airflow
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow_db"
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=~/workflows

現在値はコマンドで確認できる。

airflow config get-value database sql_alchemy_conn
airflow config get-value core dags_folder

DB接続先を変えたら、メタデータDBをマイグレーションする。

airflow db migrate

ここで「既にSQLiteで standalone を起動済み」の環境だと、~/airflow/airflow.cfg や既存DBファイルが残っていて話がややこしくなる。
PostgreSQL構成を試すなら、最初から AIRFLOW_HOME を分けるか、どの設定が読まれているかを airflow config get-value で確認するのが早い。

Airflow 3系はapi-serverとdag-processorを分けて見る

Airflow 2系の記事を読むと webserver 前提の説明がまだ多い。
Airflow 3系では、UI/API側は airflow api-server、DAG解析は airflow dag-processor として見る。

airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

airflow api-server --port 8080
airflow scheduler
airflow dag-processor
airflow triggerer

原典でも触れられている通り、ユーザー作成コマンドを使うにはFlask AppBuilder系のAuth Managerを入れておく。
ModuleNotFoundError: No module named 'airflow.providers.fab' が出る場合は、FAB providerを入れる。

pip install apache-airflow-providers-fab
airflow db migrate

ただし、これは「Airflow 3系で従来型のユーザー管理を使う」ための話だ。
単にローカルでDAGを試すだけなら、standalone の自動ユーザーで済ませる選択もある。

そもそもDAGとは何か

DAGはDirected Acyclic Graphの略で、日本語にすると有向非巡回グラフだ。
グラフ理論の用語で、要素間に「向き」があり、かつ循環しない構造を指す。

Airflowでは、この構造をワークフローの定義に使っている。
ノードがタスク、エッジがタスク間の依存関係にあたる。
「タスクAが終わったらタスクBを実行する」という順序をグラフで表現していて、循環がないから実行順が一意に決まる。

flowchart LR
    A[CSVダウンロード] --> B[バリデーション]
    B --> C[DBロード]
    B --> D[レポート生成]
    C --> E[Slack通知]
    D --> E

この図だと、CSVダウンロードが終わってからバリデーションが走り、バリデーション後にDBロードとレポート生成が並列で動く。
両方終わったらSlack通知。
もしSlack通知からCSVダウンロードへの矢印を足したら循環ができてしまうので、Airflowはそれを受け付けない。

AirflowのDAGはPythonファイル1つがDAG1つに対応する。
dags_folder.py ファイルを置くと、dag-processor がそれを解析してメタデータDBに登録する。
DAGの中身はあくまでタスクの定義と順序だけで、実際のデータは持たない。

DAGに紐づく概念として、DAG RunとTask Instanceがある。
DAG Runは「あるDAGの1回分の実行」を表すオブジェクトで、スケジュール起動でも手動トリガーでも1つのDAG Runが作られる。
Task Instanceは「あるDAG Runの中の、あるタスクの1回分」だ。
上の図でいえば、1回のDAG Runで5つのTask Instanceが生まれる。
Web UIで見えるグリッドビューは、縦がタスク、横がDAG Runの時系列で、各セルがTask Instanceのステータスを示している。

DAGを定義するときに最低限触るのは dag_idstart_dateschedulecatchup あたりだ。

dag_id はAirflow内でDAGを一意に識別する文字列で、ファイル名とは無関係に自分で決める。
start_date はスケジューラがDAG Runを生成し始める起点の日時で、「この日から5分おきに実行する」のように schedule と組み合わせて使う。
schedule はDAG Runの間隔を指定する。timedelta(minutes=5) のような固定間隔のほかに、cron式やTimetableオブジェクトも受け付ける。
catchupstart_date から現在までの未実行分を遡って埋めるかどうかのフラグだ。デフォルトは True なので、start_date を半年前に設定して catchup=False にし忘れると、起動直後に大量のDAG Runが生成されて面食らう。

タスク間の依存関係はPythonの >> 演算子で書く。

task_a >> task_b >> task_c

これは「A → B → C」の直列実行になる。
並列にしたいときはリストを使う。

task_a >> [task_b, task_c] >> task_d

task_aの後にtask_bとtask_cが並列で走り、両方終わったらtask_dが動く。
set_upstream / set_downstream メソッドでも同じことはできるが、>> のほうが短い。

最初のDAGはairflow.sdkで書く

Airflow 3.2.0のリリースノートでは、DAG authoring向けの安定インターフェースとして airflow.sdk が前面に出ている。
古いサンプルでは from airflow import DAG がよく出てくるが、新しく書くならSDK側に寄せる。

~/workflows/simple_dag.py に置く例。

from datetime import datetime, timedelta

from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG


def say_hello():
    print("Hello from Airflow")


def say_goodbye():
    print("Goodbye from Airflow")


with DAG(
    dag_id="simple_dag",
    start_date=datetime(2026, 1, 1),
    schedule=timedelta(minutes=5),
    catchup=False,
) as dag:
    hello_task = PythonOperator(
        task_id="hi",
        python_callable=say_hello,
    )

    goodbye_task = PythonOperator(
        task_id="bye",
        python_callable=say_goodbye,
    )

    hello_task >> goodbye_task

DAGが読み込まれているかはUIだけでなくCLIでも確認できる。

airflow dags list
airflow dags test simple_dag 2026-01-01

airflow dags list に出ない場合は、まず dags_folderdag-processor のログを見る。
Pythonのimport error、Provider不足、DAGファイルの置き場所違いのどれかで止まっていることが多い。

3.2.0で気にするのは手順より運用境界

Airflow 3.2.0自体の大きな追加は、Asset Partitioning、Multi-Team Deployments、Deadline Alerts、DAG処理可視性の改善、Grid viewの性能改善などだ。
最初のDAGを動かすだけなら、ここを全部理解する必要はない。

一方で、PostgreSQL構成にした時点で見る場所は増える。
公式DB設定ページでは、AirflowはメタデータDBへの接続を多く開くため、本番のPostgreSQL構成ではPGBouncerの利用が推奨されている。
RDS、Cloud SQL、Azure Database for PostgreSQLのようなマネージドPostgreSQLでは、アイドル接続が切られて SSL SYSCALL error: EOF detected になることもあるので、keepalive設定も確認対象になる。

見る場所理由
sql_alchemy_connAirflowが本当にPostgreSQLを見ているか確認する
dags_folderDAGファイルの置き場所と dag-processor の監視対象を合わせる
executorLocalExecutor、CeleryExecutor、KubernetesExecutorで周辺コンポーネントが変わる
load_examplesサンプルDAGを消して、自分のDAGだけ見える状態にする
DB接続数scheduler、api-server、dag-processor、triggererが同じメタデータDBを見る
認証と公開範囲8080番ポートをそのままインターネットに出さない

このブログでは以前、Daguのデフォルト認証なしAPIがRCEにつながる件を CISA KEV追加の重大脆弱性4件 で書いた。
AirflowとDaguは別製品だが、DAGやワークフロー定義を受け取る管理UI/APIを外に出す怖さは同じだ。
AirflowをVPSで動かすなら、まずリバースプロキシ、認証、ファイアウォール、非特権ユーザー実行を先に固める。

PostgreSQL側の性能やカーネル影響まで踏み込むなら、以前書いた Linuxカーネル7.0のPREEMPT_NONE廃止でPostgreSQLスループット半減 のような話も関係してくる。
AirflowのメタデータDBは業務DBほど巨大でなくても、DAG数、Task Instance数、履歴保持期間が増えると普通に効いてくる。

cronで足りなくなるとき

「毎日2時にCSVをダウンロードしてDBにロードする」程度の仕事なら、cronに1行書いてシェルスクリプトを叩けば済む。
Airflowを入れるまでもない。

DAGが効くのは、処理に分岐や合流が出てきたときだ。
CSVダウンロード → バリデーション → DBロード → レポート生成 → Slack通知、のように複数ステップを組んでいて、バリデーションで落ちたらDBロードは走らせたくない、レポート生成だけ失敗したらそこだけ再実行したい、というケースだ。

cronスクリプトでこれをやろうとすると、スクリプト内に ifexit が増えて、どこまで成功したかの判断がログ頼みになる。
途中で失敗した場合に「3ステップ目から再開」したければ、その仕組みを自前で書くことになる。

Airflowはこの部分をDAGの構造として持つ。
タスクごとに成功・失敗・リトライの状態がメタデータDBに記録されるので、失敗したタスクだけWeb UIからクリックして再実行できる。
cronスクリプトで起きがちな「全部最初からやり直す」が要らない。

もうひとつは並列実行だ。
前のセクションのMermaid図で、バリデーション後にDBロードとレポート生成が並列で動く例を出した。
cronスクリプトで並列化しようとすると、& でバックグラウンドに投げて wait する形になるが、片方だけ失敗したときのハンドリングがすぐ複雑になる。
DAGなら依存関係を >> で書くだけで、schedulerが自動で並列実行と合流を管理する。

バックフィルも地味に効く。
「先月分のデータを全部入れ直したい」とき、Airflowなら airflow dags backfill simple_dag --start-date 2026-04-01 --end-date 2026-04-30 で指定期間分のDAG Runをまとめて生成できる。
cronスクリプトだと日付パラメータを受けるように書いておいて、forループで回す自前の実装になる。

逆に、処理が直列1本で、失敗したら全部やり直しても困らない、バックフィルも不要、という仕事にAirflowを入れるのはオーバーキルだ。
scheduler、api-server、dag-processor、PostgreSQLと常時4プロセス以上走る構成は、cronの1行に比べて維持コストが高い。

参照