
airflow-ai-sdk
An SDK for working with LLMs and AI Agents from Apache Airflow, based on Pydantic AI
Stars: 282

README:
This repository contains an SDK for working with LLMs from Apache Airflow, based on Pydantic AI. It allows users to call LLMs and orchestrate agent calls directly within their Airflow pipelines using decorator-based tasks. The SDK leverages the familiar Airflow @task syntax with extensions like @task.llm, @task.llm_branch, and @task.agent.
To get started, check out the examples repository here, which offers a full local Airflow instance with the AI SDK installed and 5 example pipelines. To run this locally, run:
git clone https://github.com/astronomer/ai-sdk-examples.git
cd ai-sdk-examples
astro dev start
If you don't have the Astro CLI installed, run brew install astro (or see other options here).
If you already have Airflow running, you can also install the package with any optional dependencies you need:
pip install airflow-ai-sdk[openai,duckduckgo]
Note that installing the package with no optional dependencies installs the slim version, which does not include any LLM models or tools. The available optional packages are listed here. This SDK offers the optional dependencies for convenience, but you can also install them from Pydantic AI directly.
Table of Contents:
- LLM tasks with @task.llm: Define tasks that call language models (e.g. GPT-3.5-turbo) to process text.
- Agent tasks with @task.agent: Orchestrate multi-step AI reasoning by leveraging custom tools.
- Automatic output parsing: Use function type hints (including Pydantic models) to automatically parse and validate LLM outputs.
- Branching with @task.llm_branch: Change the control flow of a DAG based on the output of an LLM.
- Model support: Support for all models in the Pydantic AI library (OpenAI, Anthropic, Gemini, Ollama, Groq, Mistral, Cohere, Bedrock)
We follow the taskflow pattern of Airflow with three decorators:
- @task.llm: Define a task that calls an LLM. Under the hood, this creates a Pydantic AI Agent with no tools.
- @task.agent: Define a task that calls an agent. You can pass in a Pydantic AI Agent directly.
- @task.llm_branch: Define a task that branches the control flow of a DAG based on the output of an LLM. Enforces that the LLM output is one of the downstream task_ids.
The function supplied to each decorator is a translation function that converts the Airflow task's input into the LLM's input. If you don't want to do any translation, you can just return the input unchanged.
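As a rough illustration of the translation-function idea, the sketch below shows a plain Python function that turns a structured task input into the prompt text an LLM would receive. The `ticket_to_prompt` name and the ticket fields are hypothetical; in a DAG, a function like this would sit under `@task.llm` (the decorator is omitted here so the snippet runs without Airflow).

```python
def ticket_to_prompt(ticket: dict[str, str]) -> str:
    """Hypothetical translation function: upstream task output -> LLM user prompt."""
    # Keep only the fields the model needs and render them as labelled lines.
    fields = ("customer", "subject", "body")
    lines = [f"{name}: {ticket[name].strip()}" for name in fields if ticket.get(name)]
    return "\n".join(lines)


def passthrough(text: str) -> str:
    """No translation needed: return the input unchanged."""
    return text


prompt = ticket_to_prompt(
    {"customer": "Acme", "subject": "Login fails", "body": "  SSO redirect loops.  "}
)
print(prompt)
```

Keeping the translation step as an ordinary function means it can be unit tested without calling a model.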
AI workflows are becoming increasingly common as organizations look for pragmatic ways to get value out of LLMs. As with any workflow, it's important to have a flexible and scalable way to orchestrate them.
Airflow is a popular choice for orchestrating data pipelines. It's a powerful tool for managing the dependencies between tasks and for scheduling and monitoring them, and has been trusted by data teams everywhere for 10+ years. It comes "batteries included" with a rich set of capabilities:
- Flexible scheduling: run tasks on a fixed schedule, on-demand, or based on external events
- Dynamic task mapping: easily process multiple inputs in parallel with full error handling and observability
- Branching and conditional logic: change the control flow of a DAG based on the output of certain tasks
- Error handling: built-in support for retries, exponential backoff, and timeouts
- Resource management: limit the concurrency of tasks with Airflow Pools
- Monitoring: detailed logs and monitoring capabilities
- Scalability: designed for production workflows
This SDK is designed to make it easy to integrate LLM workflows into your Airflow pipelines. It allows you to do anything from simple LLM calls to complex agentic workflows.
See the full set of examples in the examples/dags directory.
This example shows how to use the @task.llm decorator as part of an Airflow DAG. In the @task.llm decorator, we can specify a model and system prompt. The decorator allows you to transform the Airflow task's input into the LLM's input.
See full example: github_changelog.py
import os

import pendulum
from airflow.decorators import dag, task
from github import Github


@task
def get_recent_commits(data_interval_start: pendulum.DateTime, data_interval_end: pendulum.DateTime) -> list[str]:
    """
    This task fetches the commits made to the apache/airflow repository during
    the data interval, using the GitHub API. It reads the GITHUB_TOKEN
    environment variable for authentication.
    """
    print(f"Getting commits for {data_interval_start} to {data_interval_end}")
    gh = Github(os.getenv("GITHUB_TOKEN"))
    repo = gh.get_repo("apache/airflow")
    commits = repo.get_commits(since=data_interval_start, until=data_interval_end)
    return [f"{commit.commit.sha}: {commit.commit.message}" for commit in commits]


@task.llm(
    model="gpt-4o-mini",
    result_type=str,
    system_prompt="""
    Your job is to summarize the commits to the Airflow project given a week's worth
    of commits. Pay particular attention to large changes and new features as opposed
    to bug fixes and minor changes.

    You don't need to include every commit, just the most important ones. Add a one line
    overall summary of the changes at the top, followed by bullet points of the most
    important changes.

    Example output:

    This week, we made architectural changes to the core scheduler to make it more
    maintainable and easier to understand.

    - Made the scheduler 20% faster (commit 1234567)
    - Added a new task type: `example_task` (commit 1234568)
    - Added a new operator: `example_operator` (commit 1234569)
    - Added a new sensor: `example_sensor` (commit 1234570)
    """
)
def summarize_commits(commits: list[str] | None = None) -> str:
    """
    This task summarizes the commits. You can add logic here to transform the input
    before it gets passed to the LLM.
    """
    # don't need to do any translation
    return "\n".join(commits)


@task
def send_summaries(summaries: str):
    ...


@dag(
    schedule="@weekly",
    start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
    catchup=False,
)
def github_changelog():
    commits = get_recent_commits()
    summaries = summarize_commits(commits=commits)
    send_summaries(summaries)


github_changelog()

This example demonstrates how to use the @task.llm decorator to call an LLM and return a structured output. In this case, we're using a Pydantic model to validate the output of the LLM. We recommend using the airflow_ai_sdk.BaseModel class to define your Pydantic models in case we add more functionality in the future.
See full example: product_feedback_summarization.py
from typing import Any, Literal

import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException

import airflow_ai_sdk as ai_sdk
from include.pii import mask_pii


@task
def get_product_feedback() -> list[str]:
    """
    This task returns a mocked list of product feedback. In a real workflow, this
    task would get the product feedback from a database or API.
    """
    ...


class ProductFeedbackSummary(ai_sdk.BaseModel):
    summary: str
    sentiment: Literal["positive", "negative", "neutral"]
    feature_requests: list[str]


@task.llm(
    model="gpt-4o-mini",
    result_type=ProductFeedbackSummary,
    system_prompt="""
    You are a helpful assistant that summarizes product feedback.
    """
)
def summarize_product_feedback(feedback: str | None = None) -> ProductFeedbackSummary:
    """
    This task summarizes the product feedback. You can add logic here to transform the input
    before summarizing it.
    """
    # if the feedback doesn't mention Airflow, skip it
    if "Airflow" not in feedback:
        raise AirflowSkipException("Feedback does not mention Airflow")

    # mask PII in the feedback
    feedback = mask_pii(feedback)

    return feedback


@task
def upload_summaries(summaries: list[dict[str, Any]]):
    ...


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)
def product_feedback_summarization():
    feedback = get_product_feedback()
    # one mapped LLM task instance is created per feedback item
    summaries = summarize_product_feedback.expand(feedback=feedback)
    upload_summaries(summaries)


product_feedback_summarization()

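To make the validation step more concrete, here is a stdlib-only sketch of the kind of check a typed result implies: the model's JSON reply is parsed and rejected if a field is missing or `sentiment` falls outside the allowed literals. This is not how the SDK or Pydantic AI implements parsing (Pydantic does the real work there); `parse_summary` and the `FeedbackSummary` dataclass are hypothetical stand-ins that mirror `ProductFeedbackSummary`.

```python
import json
from dataclasses import dataclass
from typing import Literal, get_args

Sentiment = Literal["positive", "negative", "neutral"]


@dataclass
class FeedbackSummary:
    # Hypothetical stand-in mirroring the fields of ProductFeedbackSummary.
    summary: str
    sentiment: Sentiment
    feature_requests: list[str]


def parse_summary(raw: str) -> FeedbackSummary:
    """Parse a JSON reply and enforce the field types declared above."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("reply must be a JSON object")
    if not isinstance(data.get("summary"), str):
        raise ValueError("summary must be a string")
    if data.get("sentiment") not in get_args(Sentiment):
        raise ValueError(f"sentiment must be one of {get_args(Sentiment)}")
    requests = data.get("feature_requests")
    if not isinstance(requests, list) or not all(isinstance(r, str) for r in requests):
        raise ValueError("feature_requests must be a list of strings")
    return FeedbackSummary(data["summary"], data["sentiment"], requests)


reply = '{"summary": "Users like the UI.", "sentiment": "positive", "feature_requests": ["dark mode"]}'
parsed = parse_summary(reply)
print(parsed.sentiment, parsed.feature_requests)
```

A reply such as `{"summary": "x", "sentiment": "angry", "feature_requests": []}` would raise `ValueError` in this sketch, so an invalid reply fails at the parsing step and never reaches the downstream upload task.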
This example shows how to build an AI agent that can autonomously invoke external tools (e.g., a knowledge base search) when answering a user question.
See full example: deep_research.py
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun
from airflow.models.param import Param
from bs4 import BeautifulSoup
from pydantic_ai import Agent
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool


# custom tool to get the content of a page
def get_page_content(url: str) -> str:
    """
    Get the content of a page.
    """
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")

    distillation_agent = Agent(
        "gpt-4o-mini",
        system_prompt="""
        You are responsible for distilling information from a text. The summary will be used by a research agent to generate a research report.

        Keep the summary concise and to the point, focusing on only key information.
        """,
    )

    return distillation_agent.run_sync(soup.get_text())


deep_research_agent = Agent(
    "o3-mini",
    system_prompt="""
    You are a deep research agent who is very skilled at distilling information from the web. You are given a query and your job is to generate a research report.

    You can search the web by using the `duckduckgo_search_tool`. You can also use the `get_page_content` tool to get the contents of a page.

    Keep going until you have enough information to generate a research report. Assume you know nothing about the query or contents, so you need to search the web for relevant information.

    Do not generate new information, only distill information from the web.
    """,
    tools=[duckduckgo_search_tool(), get_page_content],
)


@task.agent(agent=deep_research_agent)
def deep_research_task(dag_run: DagRun) -> str:
    """
    This task performs a deep research on the given query.
    """
    query = dag_run.conf.get("query")

    if not query:
        raise ValueError("Query is required")

    print(f"Performing deep research on {query}")

    return query


@task
def upload_results(results: str):
    ...


@dag(
    schedule=None,
    start_date=pendulum.datetime(2025, 3, 1, tz="UTC"),
    catchup=False,
    params={
        "query": Param(
            type="string",
            default="How has the field of data engineering evolved in the last 5 years?",
        ),
    },
)
def deep_research():
    results = deep_research_task()
    upload_results(results)


deep_research()

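In the example above, the custom tool is an ordinary Python function with type hints and a docstring. The sketch below shows a hypothetical variant of the text-extraction step inside `get_page_content` that uses only the standard library, with a length cap so that a long page does not flood the model's context. The `extract_text` name and the `max_chars` parameter are assumptions for illustration and are not part of the SDK or the examples repository.

```python
from html.parser import HTMLParser


class _TextCollector(HTMLParser):
    """Collect visible text, skipping <script> and <style> contents."""

    def __init__(self) -> None:
        super().__init__()
        self.parts: list[str] = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Ignore whitespace-only chunks and anything inside script/style.
        if not self._skip_depth and data.strip():
            self.parts.append(data.strip())


def extract_text(html: str, max_chars: int = 2000) -> str:
    """Return the visible text of an HTML document, truncated to max_chars."""
    collector = _TextCollector()
    collector.feed(html)
    collector.close()
    return " ".join(collector.parts)[:max_chars]


page = (
    "<html><head><style>p {color: red}</style></head>"
    "<body><h1>Title</h1><p>Hello <b>world</b></p></body></html>"
)
print(extract_text(page))
```

Capping the text before it reaches the distillation agent is one way to bound cost per tool call; the right limit depends on the model in use.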
This example demonstrates how to use the @task.llm_branch decorator to change the control flow of a DAG based on the output of an LLM. In this case, we're routing support tickets based on the severity of the ticket.
See full example: support_ticket_routing.py
import pendulum
from airflow.decorators import dag, task
from airflow.models.dagrun import DagRun


@task.llm_branch(
    model="gpt-4o-mini",
    system_prompt="""
    You are a support agent that routes support tickets based on the priority of the ticket.

    Here are the priority definitions:
    - P0: Critical issues that impact the user's ability to use the product, specifically for a production deployment.
    - P1: Issues that impact the user's ability to use the product, but not as severely (or not for their production deployment).
    - P2: Issues that are low priority and can wait until the next business day
    - P3: Issues that are not important or time sensitive

    Here are some examples of tickets and their priorities:
    - "Our production deployment just went down because it ran out of memory. Please help.": P0
    - "Our staging / dev / QA deployment just went down because it ran out of memory. Please help.": P1
    - "I'm having trouble logging in to my account.": P1
    - "The UI is not loading.": P1
    - "I need help setting up my account.": P2
    - "I have a question about the product.": P3
    """,
    allow_multiple_branches=True,
)
def route_ticket(dag_run: DagRun) -> str:
    return dag_run.conf.get("ticket")


@task
def handle_p0_ticket(ticket: str):
    print(f"Handling P0 ticket: {ticket}")


@task
def handle_p1_ticket(ticket: str):
    print(f"Handling P1 ticket: {ticket}")


@task
def handle_p2_ticket(ticket: str):
    print(f"Handling P2 ticket: {ticket}")


@task
def handle_p3_ticket(ticket: str):
    print(f"Handling P3 ticket: {ticket}")


@dag(
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    params={"ticket": "Hi, our production deployment just went down because it ran out of memory. Please help."}
)
def support_ticket_routing():
    ticket = route_ticket()

    handle_p0_ticket(ticket)
    handle_p1_ticket(ticket)
    handle_p2_ticket(ticket)
    handle_p3_ticket(ticket)


support_ticket_routing()

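The requirement that the LLM's answer must name a downstream task can be pictured with the following sketch. It illustrates the idea only and is not the SDK's implementation: `choose_branches` and its arguments are hypothetical, and the model's reply is passed in as a plain list of strings.

```python
def choose_branches(
    llm_output: list[str],
    downstream_task_ids: set[str],
    allow_multiple_branches: bool = False,
) -> list[str]:
    """Validate task ids chosen by a model against the allowed downstream ids."""
    if not llm_output:
        raise ValueError("At least one branch must be chosen")
    unknown = [task_id for task_id in llm_output if task_id not in downstream_task_ids]
    if unknown:
        raise ValueError(f"Not downstream task ids: {unknown}")
    if len(llm_output) > 1 and not allow_multiple_branches:
        raise ValueError("Multiple branches chosen but only one is allowed")
    return llm_output


downstream = {"handle_p0_ticket", "handle_p1_ticket", "handle_p2_ticket", "handle_p3_ticket"}
print(choose_branches(["handle_p0_ticket"], downstream))
```

Restricting the answer to a closed set of task ids means a malformed or unexpected reply fails the task, rather than silently skipping every branch.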
Alternative AI tools for airflow-ai-sdk
Similar Open Source Tools

Tools4AI
Tools4AI is a Java-based Agentic Framework for building AI agents to integrate with enterprise Java applications. It enables the conversion of natural language prompts into actionable behaviors, streamlining user interactions with complex systems. By leveraging AI capabilities, it enhances productivity and innovation across diverse applications. The framework allows for seamless integration of AI with various systems, such as customer service applications, to interpret user requests, trigger actions, and streamline workflows. Prompt prediction anticipates user actions based on input prompts, enhancing user experience by proactively suggesting relevant actions or services based on context.

AI
AI is an open-source Swift framework for interfacing with generative AI. It provides functionalities for text completions, image-to-text vision, function calling, DALLE-3 image generation, audio transcription and generation, and text embeddings. The framework supports multiple AI models from providers like OpenAI, Anthropic, Mistral, Groq, and ElevenLabs. Users can easily integrate AI capabilities into their Swift projects using the AI framework.

backtrack_sampler
Backtrack Sampler is a framework for experimenting with custom sampling algorithms that can backtrack the latest generated tokens. It provides a simple and easy-to-understand codebase for creating new sampling strategies. Users can implement their own strategies by creating new files in the `/strategy` directory. The repo includes examples for usage with llama.cpp and transformers, showcasing different strategies like Creative Writing, Anti-slop, Debug, Human Guidance, Adaptive Temperature, and Replace. The goal is to encourage experimentation and customization of backtracking algorithms for language models.

invariant
Invariant Analyzer is an open-source scanner designed for LLM-based AI agents to find bugs, vulnerabilities, and security threats. It scans agent execution traces to identify issues like looping behavior, data leaks, prompt injections, and unsafe code execution. The tool offers a library of built-in checkers, an expressive policy language, data flow analysis, real-time monitoring, and extensible architecture for custom checkers. It helps developers debug AI agents, scan for security violations, and prevent security issues and data breaches during runtime. The analyzer leverages deep contextual understanding and a purpose-built rule matching engine for security policy enforcement.

palimpzest
Palimpzest (PZ) is a tool for managing and optimizing workloads, particularly for data processing tasks. It provides a CLI tool and Python demos for users to register datasets, run workloads, and access results. Users can easily initialize their system, register datasets, and manage configurations using the CLI commands provided. Palimpzest also supports caching intermediate results and configuring for parallel execution with remote services like OpenAI and together.ai. The tool aims to streamline the workflow of working with datasets and optimizing performance for data extraction tasks.

neo4j-graphrag-python
The Neo4j GraphRAG package for Python is an official repository that provides features for creating and managing vector indexes in Neo4j databases. It aims to offer developers a reliable package with long-term commitment, maintenance, and fast feature updates. The package supports various Python versions and includes functionalities for creating vector indexes, populating them, and performing similarity searches. It also provides guidelines for installation, examples, and development processes such as installing dependencies, making changes, and running tests.

chores
The chores package provides a library of ergonomic LLM assistants designed to help users complete repetitive, hard-to-automate tasks quickly. Users can select code, trigger the chores addin, choose a helper, and watch their code be rewritten. The package offers chore helpers for tasks like converting to cli, testthat, and documenting functions with roxygen. Users can also create their own chore helpers by providing instructions in a markdown file. The cost of using helpers depends on the length of the prompt and the model chosen.

ActionWeaver
ActionWeaver is an AI application framework designed for simplicity, relying on OpenAI and Pydantic. It supports both OpenAI API and Azure OpenAI service. The framework allows for function calling as a core feature, extensibility to integrate any Python code, function orchestration for building complex call hierarchies, and telemetry and observability integration. Users can easily install ActionWeaver using pip and leverage its capabilities to create, invoke, and orchestrate actions with the language model. The framework also provides structured extraction using Pydantic models and allows for exception handling customization. Contributions to the project are welcome, and users are encouraged to cite ActionWeaver if found useful.

langchain
LangChain is a framework for developing Elixir applications powered by language models. It enables applications to connect language models to other data sources and interact with the environment. The library provides components for working with language models and off-the-shelf chains for specific tasks. It aims to assist in building applications that combine large language models with other sources of computation or knowledge. LangChain is written in Elixir and is not aimed for parity with the JavaScript and Python versions due to differences in programming paradigms and design choices. The library is designed to make it easy to integrate language models into applications and expose features, data, and functionality to the models.

chatmemory
ChatMemory is a simple yet powerful long-term memory manager that facilitates communication between AI and users. It organizes conversation data into history, summary, and knowledge entities, enabling quick retrieval of context and generation of clear, concise answers. The tool leverages vector search on summaries/knowledge and detailed history to provide accurate responses. It balances speed and accuracy by using lightweight retrieval and fallback detailed search mechanisms, ensuring efficient memory management and response generation beyond mere data retrieval.

llms
The 'llms' repository is a comprehensive guide on Large Language Models (LLMs), covering topics such as language modeling, applications of LLMs, statistical language modeling, neural language models, conditional language models, evaluation methods, transformer-based language models, practical LLMs like GPT and BERT, prompt engineering, fine-tuning LLMs, retrieval augmented generation, AI agents, and LLMs for computer vision. The repository provides detailed explanations, examples, and tools for working with LLMs.

NeMo-Guardrails
NeMo Guardrails is an open-source toolkit for easily adding _programmable guardrails_ to LLM-based conversational applications. Guardrails (or "rails" for short) are specific ways of controlling the output of a large language model, such as not talking about politics, responding in a particular way to specific user requests, following a predefined dialog path, using a particular language style, extracting structured data, and more.

semantic-cache
Semantic Cache is a tool for caching natural text based on semantic similarity. It allows for classifying text into categories, caching AI responses, and reducing API latency by responding to similar queries with cached values. The tool stores cache entries by meaning, handles synonyms, supports multiple languages, understands complex queries, and offers easy integration with Node.js applications. Users can set a custom proximity threshold for filtering results. The tool is ideal for tasks involving querying or retrieving information based on meaning, such as natural language classification or caching AI responses.

experts
Experts.js is a tool that simplifies the creation and deployment of OpenAI's Assistants, allowing users to link them together as Tools to create a Panel of Experts system with expanded memory and attention to detail. It leverages the new Assistants API from OpenAI, which offers advanced features such as referencing attached files & images as knowledge sources, supporting instructions up to 256,000 characters, integrating with 128 tools, and utilizing the Vector Store API for efficient file search. Experts.js introduces Assistants as Tools, enabling the creation of Multi AI Agent Systems where each Tool is an LLM-backed Assistant that can take on specialized roles or fulfill complex tasks.