PostgreSQLでApache Airflow 3.2.0を動かして最初のDAGを実行する
目次
DEV Communityに Apache Airflow 3.2.0をPostgreSQL込みで入れて最初のDAGを動かす記事 が出ていた。
内容はLinuxサーバー上でAirflowをpipインストールし、standalone と個別プロセス起動の両方を試し、airflow.cfg をPostgreSQL向けに変えてDAGを読み込ませる流れだ。
原典の手順はスクリーンショット多めで追いやすいが、公式ドキュメントと突き合わせると「まず動かす」手順と「PostgreSQLをメタデータDBとして使い続ける」手順が少し混ざっていたので、流れを整理しつつ3.2.0固有の差分を足した。
standaloneはSQLiteで試す入口
Airflow公式のインストールページでは、まず試すだけなら pipx run apache-airflow standalone や uvx apache-airflow standalone が案内されている。
これはSQLite付きの最小構成を自動で起動するモードで、ローカルの動作確認には速い。
pipx run apache-airflow standalone
# または
uvx apache-airflow standalone
ただし公式ドキュメントも、standalone は本番用途ではないと明記している。
PostgreSQLを使う理由は、DAGそのもののデータを入れるためではなく、DAG Run、Task Instance、接続情報、変数、スケジューラ状態などを保持するAirflowのメタデータDBをSQLiteから外すためだ。
flowchart TD
A[DAGファイル<br/>Pythonコード] --> B[dag-processor<br/>DAGを解析]
B --> C[metadata DB<br/>PostgreSQL]
D[scheduler<br/>実行計画を作る] --> C
D --> E[executor<br/>タスクを実行]
F[api-server<br/>Web UIとAPI] --> C
この図でPostgreSQLに入るのは、業務データではなくAirflow自身の状態だ。
ETL先のPostgreSQLや分析DBをAirflowから操作する場合は、別途ConnectionやProviderを使う。
pipインストールはconstraintsを必ず合わせる
pipで入れる場合、Airflowはconstraintsファイル込みで入れる。
原典ではPython 3.12向けに以下の形で入れていた。
python -m venv airflow_venv
source airflow_venv/bin/activate
pip install --upgrade pip
pip install "apache-airflow[postgres]==3.2.0" \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.2.0/constraints-3.12.txt"
原典では apache-airflow[celery] を使っているが、最初のDAGをローカルで動かすだけならCeleryExecutorまでは要らない。
PostgreSQLメタデータDBを使う話なら、まずは postgres extra を入れるほうが目的と依存関係が合う。
CeleryExecutorを使うなら、PostgreSQLとは別にRedisやRabbitMQなどのブローカー設計も入ってくる。
constraintsのPythonバージョンは手元のPythonに合わせる。
Python 3.11なら constraints-3.11.txt、Python 3.12なら constraints-3.12.txt だ。ここがずれると、インストールできても後でProviderの依存関係が崩れやすい。
PostgreSQL側はユーザーとDBを先に作る
Airflow公式のDB設定ページでは、PostgreSQL 13〜17がサポート対象として挙げられている。
例として、Airflow専用のDBとユーザーを作る。
CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
\c airflow_db
GRANT ALL ON SCHEMA public TO airflow_user;
PostgreSQL 15以降では、public スキーマへの権限も忘れやすい。
airflow db migrate の途中で権限エラーになる場合、DB本体への権限だけでなくスキーマ権限も見る。
Airflow側は airflow.cfg を直接編集してもいいが、環境変数で渡すほうが再現しやすい。
export AIRFLOW_HOME=~/airflow
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow_db"
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CORE__DAGS_FOLDER=~/workflows
現在値はコマンドで確認できる。
airflow config get-value database sql_alchemy_conn
airflow config get-value core dags_folder
DB接続先を変えたら、メタデータDBをマイグレーションする。
airflow db migrate
ここで「既にSQLiteで standalone を起動済み」の環境だと、~/airflow/airflow.cfg や既存DBファイルが残っていて話がややこしくなる。
PostgreSQL構成を試すなら、最初から AIRFLOW_HOME を分けるか、どの設定が読まれているかを airflow config get-value で確認するのが早い。
Airflow 3系はapi-serverとdag-processorを分けて見る
Airflow 2系の記事を読むと webserver 前提の説明がまだ多い。
Airflow 3系では、UI/API側は airflow api-server、DAG解析は airflow dag-processor として見る。
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
airflow api-server --port 8080
airflow scheduler
airflow dag-processor
airflow triggerer
原典でも触れられている通り、ユーザー作成コマンドを使うにはFlask AppBuilder系のAuth Managerを入れておく。
ModuleNotFoundError: No module named 'airflow.providers.fab' が出る場合は、FAB providerを入れる。
pip install apache-airflow-providers-fab
airflow db migrate
ただし、これは「Airflow 3系で従来型のユーザー管理を使う」ための話だ。
単にローカルでDAGを試すだけなら、standalone の自動ユーザーで済ませる選択もある。
そもそもDAGとは何か
DAGはDirected Acyclic Graphの略で、日本語にすると有向非巡回グラフだ。
グラフ理論の用語で、要素間に「向き」があり、かつ循環しない構造を指す。
Airflowでは、この構造をワークフローの定義に使っている。
ノードがタスク、エッジがタスク間の依存関係にあたる。
「タスクAが終わったらタスクBを実行する」という順序をグラフで表現していて、循環がないから実行順が一意に決まる。
flowchart LR
A[CSVダウンロード] --> B[バリデーション]
B --> C[DBロード]
B --> D[レポート生成]
C --> E[Slack通知]
D --> E
この図だと、CSVダウンロードが終わってからバリデーションが走り、バリデーション後にDBロードとレポート生成が並列で動く。
両方終わったらSlack通知。
もしSlack通知からCSVダウンロードへの矢印を足したら循環ができてしまうので、Airflowはそれを受け付けない。
AirflowのDAGはPythonファイル1つがDAG1つに対応する。
dags_folder に .py ファイルを置くと、dag-processor がそれを解析してメタデータDBに登録する。
DAGの中身はあくまでタスクの定義と順序だけで、実際のデータは持たない。
DAGに紐づく概念として、DAG RunとTask Instanceがある。
DAG Runは「あるDAGの1回分の実行」を表すオブジェクトで、スケジュール起動でも手動トリガーでも1つのDAG Runが作られる。
Task Instanceは「あるDAG Runの中の、あるタスクの1回分」だ。
上の図でいえば、1回のDAG Runで5つのTask Instanceが生まれる。
Web UIで見えるグリッドビューは、縦がタスク、横がDAG Runの時系列で、各セルがTask Instanceのステータスを示している。
DAGを定義するときに最低限触るのは dag_id、start_date、schedule、catchup あたりだ。
dag_id はAirflow内でDAGを一意に識別する文字列で、ファイル名とは無関係に自分で決める。
start_date はスケジューラがDAG Runを生成し始める起点の日時で、「この日から5分おきに実行する」のように schedule と組み合わせて使う。
schedule はDAG Runの間隔を指定する。timedelta(minutes=5) のような固定間隔のほかに、cron式やTimetableオブジェクトも受け付ける。
catchup は start_date から現在までの未実行分を遡って埋めるかどうかのフラグだ。デフォルトは True なので、start_date を半年前に設定して catchup=False にし忘れると、起動直後に大量のDAG Runが生成されて面食らう。
タスク間の依存関係はPythonの >> 演算子で書く。
task_a >> task_b >> task_c
これは「A → B → C」の直列実行になる。
並列にしたいときはリストを使う。
task_a >> [task_b, task_c] >> task_d
task_aの後にtask_bとtask_cが並列で走り、両方終わったらtask_dが動く。
set_upstream / set_downstream メソッドでも同じことはできるが、>> のほうが短い。
最初のDAGはairflow.sdkで書く
Airflow 3.2.0のリリースノートでは、DAG authoring向けの安定インターフェースとして airflow.sdk が前面に出ている。
古いサンプルでは from airflow import DAG がよく出てくるが、新しく書くならSDK側に寄せる。
~/workflows/simple_dag.py に置く例。
from datetime import datetime, timedelta
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG
def say_hello():
print("Hello from Airflow")
def say_goodbye():
print("Goodbye from Airflow")
with DAG(
dag_id="simple_dag",
start_date=datetime(2026, 1, 1),
schedule=timedelta(minutes=5),
catchup=False,
) as dag:
hello_task = PythonOperator(
task_id="hi",
python_callable=say_hello,
)
goodbye_task = PythonOperator(
task_id="bye",
python_callable=say_goodbye,
)
hello_task >> goodbye_task
DAGが読み込まれているかはUIだけでなくCLIでも確認できる。
airflow dags list
airflow dags test simple_dag 2026-01-01
airflow dags list に出ない場合は、まず dags_folder と dag-processor のログを見る。
Pythonのimport error、Provider不足、DAGファイルの置き場所違いのどれかで止まっていることが多い。
3.2.0で気にするのは手順より運用境界
Airflow 3.2.0自体の大きな追加は、Asset Partitioning、Multi-Team Deployments、Deadline Alerts、DAG処理可視性の改善、Grid viewの性能改善などだ。
最初のDAGを動かすだけなら、ここを全部理解する必要はない。
一方で、PostgreSQL構成にした時点で見る場所は増える。
公式DB設定ページでは、AirflowはメタデータDBへの接続を多く開くため、本番のPostgreSQL構成ではPGBouncerの利用が推奨されている。
RDS、Cloud SQL、Azure Database for PostgreSQLのようなマネージドPostgreSQLでは、アイドル接続が切られて SSL SYSCALL error: EOF detected になることもあるので、keepalive設定も確認対象になる。
| 見る場所 | 理由 |
|---|---|
sql_alchemy_conn | Airflowが本当にPostgreSQLを見ているか確認する |
dags_folder | DAGファイルの置き場所と dag-processor の監視対象を合わせる |
executor | LocalExecutor、CeleryExecutor、KubernetesExecutorで周辺コンポーネントが変わる |
load_examples | サンプルDAGを消して、自分のDAGだけ見える状態にする |
| DB接続数 | scheduler、api-server、dag-processor、triggererが同じメタデータDBを見る |
| 認証と公開範囲 | 8080番ポートをそのままインターネットに出さない |
このブログでは以前、Daguのデフォルト認証なしAPIがRCEにつながる件を CISA KEV追加の重大脆弱性4件 で書いた。
AirflowとDaguは別製品だが、DAGやワークフロー定義を受け取る管理UI/APIを外に出す怖さは同じだ。
AirflowをVPSで動かすなら、まずリバースプロキシ、認証、ファイアウォール、非特権ユーザー実行を先に固める。
PostgreSQL側の性能やカーネル影響まで踏み込むなら、以前書いた Linuxカーネル7.0のPREEMPT_NONE廃止でPostgreSQLスループット半減 のような話も関係してくる。
AirflowのメタデータDBは業務DBほど巨大でなくても、DAG数、Task Instance数、履歴保持期間が増えると普通に効いてくる。
cronで足りなくなるとき
「毎日2時にCSVをダウンロードしてDBにロードする」程度の仕事なら、cronに1行書いてシェルスクリプトを叩けば済む。
Airflowを入れるまでもない。
DAGが効くのは、処理に分岐や合流が出てきたときだ。
CSVダウンロード → バリデーション → DBロード → レポート生成 → Slack通知、のように複数ステップを組んでいて、バリデーションで落ちたらDBロードは走らせたくない、レポート生成だけ失敗したらそこだけ再実行したい、というケースだ。
cronスクリプトでこれをやろうとすると、スクリプト内に if と exit が増えて、どこまで成功したかの判断がログ頼みになる。
途中で失敗した場合に「3ステップ目から再開」したければ、その仕組みを自前で書くことになる。
Airflowはこの部分をDAGの構造として持つ。
タスクごとに成功・失敗・リトライの状態がメタデータDBに記録されるので、失敗したタスクだけWeb UIからクリックして再実行できる。
cronスクリプトで起きがちな「全部最初からやり直す」が要らない。
もうひとつは並列実行だ。
前のセクションのMermaid図で、バリデーション後にDBロードとレポート生成が並列で動く例を出した。
cronスクリプトで並列化しようとすると、& でバックグラウンドに投げて wait する形になるが、片方だけ失敗したときのハンドリングがすぐ複雑になる。
DAGなら依存関係を >> で書くだけで、schedulerが自動で並列実行と合流を管理する。
バックフィルも地味に効く。
「先月分のデータを全部入れ直したい」とき、Airflowなら airflow dags backfill simple_dag --start-date 2026-04-01 --end-date 2026-04-30 で指定期間分のDAG Runをまとめて生成できる。
cronスクリプトだと日付パラメータを受けるように書いておいて、forループで回す自前の実装になる。
逆に、処理が直列1本で、失敗したら全部やり直しても困らない、バックフィルも不要、という仕事にAirflowを入れるのはオーバーキルだ。
scheduler、api-server、dag-processor、PostgreSQLと常時4プロセス以上走る構成は、cronの1行に比べて維持コストが高い。