I Built an MCP Server to Kill Your Airflow DAGs
I want to talk about why I built airflow-unfactor, but first I need to talk about why Airflow frustrates me.
Airflow Is a Legacy Tool
I know that's a strong statement. Hear me out.
Airflow was built in an era when Python had real constraints. Async wasn't mature. Typing was barely a thing. The language didn't have the ergonomics that we take for granted in 2026. So Airflow introduced a heavy DSL: DAGs, operators, XCom, connections, sensors, trigger rules. An entire parallel universe of constructs that you had to learn and internalize before you could orchestrate a single function call.
Those constraints are gone. Python is a different language now. We can write concurrent, typed, well-structured code natively. We don't need rigid abstractions to orchestrate work anymore. We definitely don't need to run Docker Compose to emulate a production scheduling system on our local machine just to test a pipeline.
And yet Airflow persists. Not because it's the best tool, but because it's the entrenched tool.
The Anti-Pattern I Keep Seeing
Here's what I've watched happen over and over again. An engineer has a perfectly good Python script. It extracts data, transforms it, loads it somewhere. It works. It's testable. It follows software engineering best practices.
Then someone says "we need to orchestrate this," and suddenly that clean script gets reconstituted into a DAG. The logic gets split across operators. Data passing goes through XCom instead of return values. The schedule gets baked into a DAG definition instead of living in deployment configuration. The engineer changes their entire programming methodology to fit Airflow's worldview.
That's backwards. The tool should meet the engineer where they are, not the other way around. In Prefect, your Python script is the flow. Decorate the entry point with @flow, decorate the functions with @task, and you're orchestrating. Your existing tests still pass. Your IDE still understands the code. You're still writing Python.
You can see that Airflow knows this is the direction things are heading. Their TaskFlow API, an attempt to emulate what people love about Prefect, is an admission that the old operator model doesn't hold up. But bolting a Pythonic interface onto a non-Pythonic architecture only gets you so far.
The Sunk Cost Problem
So if Prefect is the better model, why doesn't everyone just switch?
Because they already have Airflow. That's it. That's the whole reason.
The sunk cost fallacy runs deep in engineering, especially around technical debt. Everyone has a roadmap. Everyone has a backlog. Spending weeks migrating DAGs (rewriting, testing, deploying, validating) reads as a pure resource sink. Few people get excited about refactors, and it's hard to justify the investment when the existing system technically works.
I get it. I've been on the other side of that conversation. But "it works" is not the same as "it's good," and the ongoing cost of maintaining Airflow infrastructure, onboarding new engineers into its mental model, and fighting its constraints on every new pipeline? That cost compounds quietly.
So I Built a Tool
airflow-unfactor is an MCP server. You point it at an Airflow DAG, and an LLM generates clean Prefect code. Not a scaffold with TODOs. Not a template. Working code with tests.
Migrations shouldn't be a problem anymore. The biggest barrier to moving off Airflow was the labor involved. Remove the labor, remove the barrier.
Here's how it works. The server exposes a handful of tools over the Model Context Protocol:
analyze: Extracts DAG structure, operators, and complexity metrics. Gives the LLM a structural understanding of what it's working with.
convert: The main engine. Produces Prefect flow code plus a pytest test suite. The generated code includes commentary explaining the architectural changes, so the engineer reviewing it understands why things changed, not just what changed.
validate: Compares the original DAG against the generated flow. Task graph extraction, semantic operator mapping, XCom-to-return-value verification, confidence scoring. It tells you exactly what made it across and flags anything that needs attention.
explain: An educational tool that maps Airflow concepts to their Prefect equivalents. Useful for engineers who want to understand the translation, not just accept it.
batch: Multi-DAG processing for when you've got a directory full of DAGs and a free afternoon.
scaffold: Generates a project structure from a DAG directory. Gives you the flows/, tasks/, tests/ layout that makes the converted code feel like a real project, not a pile of migrated files.
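To make the validate step concrete, here's a rough sketch of one kind of check it performs: comparing the task graph of the original DAG against the call graph of the generated flow. This is illustrative logic only, not airflow-unfactor's actual implementation; the `graph_coverage` function, the edge sets, and the task-id mapping below are invented for the example.

```python
# Illustrative sketch of a task-graph comparison, NOT the real
# airflow-unfactor implementation. Each graph is a set of
# (upstream, downstream) edges extracted from the source code.

def graph_coverage(dag_edges: set[tuple[str, str]],
                   flow_edges: set[tuple[str, str]],
                   mapping: dict[str, str]) -> tuple[float, set[tuple[str, str]]]:
    """Return (fraction of DAG edges preserved, edges that went missing).

    `mapping` translates Airflow task_ids into the Prefect function
    names the converter generated for them.
    """
    translated = {
        (mapping.get(up, up), mapping.get(down, down))
        for up, down in dag_edges
    }
    missing = translated - flow_edges
    covered = 1.0 - len(missing) / len(translated) if translated else 1.0
    return covered, missing


dag_edges = {("wait_for_file", "check_size"), ("check_size", "pick_path")}
flow_edges = {("check_file_size", "process_file")}
mapping = {"wait_for_file": "flow_parameter",   # sensor became a flow arg
           "check_size": "check_file_size",
           "pick_path": "process_file"}         # branch became an if statement

coverage, missing = graph_coverage(dag_edges, flow_edges, mapping)
```

Any edge that doesn't survive translation (here, the sensor edge that became a flow parameter) is exactly the kind of thing the real validator would flag for human attention rather than silently accept.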
The whole thing is built on FastMCP, the Pythonic framework for building MCP servers. FastMCP came out of Prefect, and it shows. Writing an MCP server with FastMCP feels like writing a Prefect flow: you decorate functions, return values, and the framework handles the protocol. I had airflow-unfactor serving tools within an afternoon because FastMCP got out of my way. Any MCP-compatible client can use it: Claude Code, Claude Desktop, Cursor, whatever you prefer.
What a Conversion Looks Like
You open your MCP client, point it at a DAG, and say "convert this to Prefect." The LLM reads the source, looks up the relevant operator mappings, checks Prefect docs for anything nuanced, generates the flow, and validates the output.
Thirty seconds. For a conversion that would take a developer hours by hand.
Here's a before-and-after with a realistic DAG. This is the kind of thing you actually find in production Airflow codebases:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta


default_args = {
    "owner": "data-eng",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": lambda ctx: SlackWebhookOperator(
        task_id="slack_fail",
        slack_webhook_conn_id="slack_alerts",
        message=f"Task {ctx['task_instance'].task_id} failed in {ctx['dag'].dag_id}",
    ).execute(ctx),
}


def _check_file_size(ti):
    hook = S3Hook(aws_conn_id="aws_prod")
    metadata = hook.head_object(
        key=ti.xcom_pull(task_ids="wait_for_file", key="s3_key"),
        bucket_name="data-lake-raw",
    )
    ti.xcom_push(key="file_size_mb", value=metadata["ContentLength"] / 1e6)


def _pick_processing_path(ti):
    size = ti.xcom_pull(task_ids="check_size", key="file_size_mb")
    return "process_large" if size > 500 else "process_standard"


def _process_standard(ti):
    hook = S3Hook(aws_conn_id="aws_prod")
    key = ti.xcom_pull(task_ids="wait_for_file", key="s3_key")
    data = hook.read_key(key=key, bucket_name="data-lake-raw")
    # ... transform logic ...
    ti.xcom_push(key="row_count", value=len(data.splitlines()))


def _process_large(ti):
    hook = S3Hook(aws_conn_id="aws_prod")
    key = ti.xcom_pull(task_ids="wait_for_file", key="s3_key")
    # ... chunked processing logic ...
    ti.xcom_push(key="row_count", value=0)  # placeholder


def _load_to_warehouse(ti):
    row_count = (ti.xcom_pull(task_ids="process_standard", key="row_count")
                 or ti.xcom_pull(task_ids="process_large", key="row_count"))
    # ... load logic ...


def _send_success_alert(ti):
    row_count = (ti.xcom_pull(task_ids="process_standard", key="row_count")
                 or ti.xcom_pull(task_ids="process_large", key="row_count"))
    SlackWebhookOperator(
        task_id="slack_ok",
        slack_webhook_conn_id="slack_alerts",
        message=f"Pipeline complete: {row_count} rows loaded",
    ).execute(ti.get_template_context())


with DAG(
    "s3_ingest_pipeline",
    default_args=default_args,
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
):
    wait = S3KeySensor(
        task_id="wait_for_file",
        bucket_name="data-lake-raw",
        bucket_key="incoming/{{ ds }}/*.parquet",
        aws_conn_id="aws_prod",
        poke_interval=60,
        timeout=3600,
    )
    check = PythonOperator(task_id="check_size", python_callable=_check_file_size)
    branch = BranchPythonOperator(task_id="pick_path", python_callable=_pick_processing_path)
    standard = PythonOperator(task_id="process_standard", python_callable=_process_standard)
    large = PythonOperator(task_id="process_large", python_callable=_process_large)
    join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    load = PythonOperator(task_id="load", python_callable=_load_to_warehouse)
    alert = PythonOperator(task_id="success_alert", python_callable=_send_success_alert)

    wait >> check >> branch >> [standard, large] >> join >> load >> alert
```

That's 75 lines before we even get to the actual business logic. Now look at the Prefect version:
```python
from prefect import flow, task
from prefect.blocks.system import Secret
from prefect_aws import S3Bucket
import boto3

s3_bucket = S3Bucket.load("data-lake-raw")


@task(retries=3, retry_delay_seconds=300)
def check_file_size(key: str) -> float:
    client = boto3.client("s3")
    metadata = client.head_object(Bucket=s3_bucket.bucket_name, Key=key)
    return metadata["ContentLength"] / 1e6


@task(retries=3, retry_delay_seconds=300)
def process_file(key: str, size_mb: float) -> int:
    # process_in_chunks / process_standard are the plain business-logic
    # helpers, elided here just like the "... transform logic ..." above
    data = s3_bucket.read_path(key)
    if size_mb > 500:
        return process_in_chunks(data)
    return process_standard(data)


@task(retries=3, retry_delay_seconds=300)
def load_to_warehouse(row_count: int):
    # ... load logic ...
    pass


@task
def notify_slack(row_count: int):
    webhook_url = Secret.load("slack-alerts")
    # ... send notification ...


@flow(name="s3-ingest-pipeline", log_prints=True)
def s3_ingest_pipeline(key: str):
    size_mb = check_file_size(key)
    row_count = process_file(key, size_mb)
    load_to_warehouse(row_count)
    notify_slack(row_count)
```

Count the lines. The branching logic, the XCom juggling, the trigger rules, the sensor, the connection IDs, the EmptyOperator just to rejoin branches, the callback-within-a-callback for Slack notifications, the ti.xcom_pull calls fishing for data across task boundaries with string-based task IDs. All gone.
In the Prefect version, the branching is an if statement inside process_file. Data flows through function arguments and return values. The Slack notification is a function call, not an operator instantiated inside a callback. The flow takes a key parameter directly instead of waiting on a sensor. You trigger it from an event (S3 notification, webhook, whatever you want) rather than polling from inside the DAG.
Every function is independently testable with pytest. No Airflow test harness. No DAG context manager. No ti object. Just Python.
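As a sketch of what that testability buys you: if the size-based routing is factored into a pure helper (I'm calling it `pick_processing_path` here; the name is mine, not generated output), the tests are plain pytest functions with no orchestrator in sight.

```python
# test_pipeline.py — a sketch, assuming the routing decision lives in a
# pure helper. `pick_processing_path` is a hypothetical name, not code
# the tool generated. No Airflow context, no ti object, no harness.

def pick_processing_path(size_mb: float) -> str:
    """Route files over 500 MB to the chunked path."""
    return "large" if size_mb > 500 else "standard"


def test_large_files_take_chunked_path():
    assert pick_processing_path(512.0) == "large"


def test_small_files_take_standard_path():
    assert pick_processing_path(42.0) == "standard"


def test_boundary_is_standard():
    assert pick_processing_path(500.0) == "standard"
```

Compare that to testing the same decision in Airflow, where the branch callable needs a `ti` object stuffed with XCom state before you can even call it.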
Why MCP
I built this as an MCP server because the architecture maps perfectly to the problem. MCP tools are composable. The LLM decides which tools to call and in what order based on what it's looking at. A simple DAG might only need convert and validate. A complex one with custom operators and unusual patterns might need explain and a docs search first.
The server doesn't try to be smart. It provides raw materials (source code, translation knowledge, documentation, validation) and trusts the LLM to reason about what to do. Each tool has a clear, bounded responsibility. The LLM orchestrates them.
Compare that to a monolithic "convert my DAG" endpoint that tries to handle everything internally. That approach is brittle and opaque. With MCP, you can see exactly what knowledge the LLM received, exactly what it generated, and exactly what the validator found.
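The shape of that composability is simple: a registry of small, bounded tools and a client that chooses the sequence. Here's a toy illustration of the pattern; this is not FastMCP's API or airflow-unfactor's code, and the `analyze`/`convert` bodies are placeholders.

```python
# Toy illustration of the MCP tool pattern: a registry of bounded tools
# that a client (here, a stand-in for the LLM) composes freely.
# NOT FastMCP's actual API — just the shape of the idea.

TOOLS = {}


def tool(fn):
    """Register a function as a callable tool by name."""
    TOOLS[fn.__name__] = fn
    return fn


@tool
def analyze(dag_source: str) -> dict:
    # placeholder "analysis": count task definitions in the source
    return {"tasks": dag_source.count("task_id=")}


@tool
def convert(dag_source: str) -> str:
    # placeholder "conversion" output
    return "@flow\ndef pipeline(): ..."


# The client decides which tools to call and in what order;
# the server just exposes them with bounded responsibilities.
plan = ["analyze", "convert"]
results = [TOOLS[name]("task_id='a'\ntask_id='b'") for name in plan]
```

The server stays dumb on purpose; all the sequencing intelligence lives in the client, which is exactly the transparency argument above.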
airflow-unfactor is a single-purpose open source tool, but it's worth seeing it in the context of where Prefect is going with AI. Horizon, Prefect's enterprise MCP Gateway, takes the same idea and scales it across your entire infrastructure. Horizon exposes your Prefect workspace (flows, deployments, work pools, variables, blocks) as MCP tools that any AI agent can call. Instead of one developer running a migration from their laptop, you can wire Horizon into your CI pipeline, your Slack bots, your internal agent framework, whatever. The same MCP protocol, the same composability, but now your AI agents can orchestrate production workflows, not just convert DAGs. FastMCP is the foundation underneath both.
The Bigger Picture
I built airflow-unfactor because I think Airflow's time has passed and the only thing keeping people on it is inertia. The core philosophies and components of Airflow were answers to problems that no longer exist. Python has grown up. Orchestration should grow up with it.
Prefect represents the future of this space. It's intuitive, Pythonic, testable, and built around software engineering best practices instead of against them. Tools like airflow-unfactor and the Prefect docs MCP server make it easy to move off legacy tooling and adopt a modern workflow. The investment in AI-assisted migration means that the sunk cost argument doesn't hold like it used to.
To be clear, these are my opinions, not Prefect's official position. But I believe in what we're building, and I built this tool because I wanted to make it easier for engineers to experience the difference for themselves.
airflow-unfactor is open source. Install it with uvx airflow-unfactor or pip install airflow-unfactor, add it to your MCP client, and try it on one of your DAGs. The GitHub repo has full documentation and examples.
Happy engineering.