flyte-sdk
Type-safe, distributed orchestration of agents, ML pipelines, and real-time inference, in pure Python with async/await.
Stars: 67
Flyte 2 SDK is a pure Python tool for type-safe, distributed orchestration of agents, ML pipelines, and more. It allows users to write data pipelines, ML training jobs, and distributed compute in Python without any DSL constraints. With features like async-first parallelism and fine-grained observability, Flyte 2 offers a seamless workflow experience. Users can leverage core concepts like TaskEnvironments for container configuration, pure Python workflows for flexibility, and async parallelism for distributed execution. Advanced features include sub-task observability with tracing and remote task execution. The tool also provides native Jupyter integration for running and monitoring workflows directly from notebooks. Configuration and deployment are made easy with configuration files and commands for deploying and running workflows. Flyte 2 is licensed under the Apache 2.0 License.
README:
Type-safe, distributed orchestration of agents, ML pipelines, and more, in pure Python with async/await or sync!
⚡ Pure Python workflows • 🔄 Async-first parallelism • 🛠️ Zero DSL constraints • 🔍 Sub-task observability
- 📚 Documentation: Docs Link
- ▶️ Getting Started: Docs Link
- 💬 Community: Slack | GitHub Discussions
- 📂 Examples: GitHub Examples
- 🐛 Issues: Bug Reports
Flyte 2 represents a fundamental shift from constrained domain-specific languages to pure Python workflows. Write data pipelines, ML training jobs, and distributed compute exactly like you write Python, because it is Python.
import flyte

env = flyte.TaskEnvironment("hello_world")

@env.task
async def process_data(data: list[str]) -> list[str]:
    # Use any Python construct: loops, conditionals, try/except
    results = []
    for item in data:
        if len(item) > 5:
            results.append(await transform_item(item))
    return results

@env.task
async def transform_item(item: str) -> str:
    return f"processed: {item.upper()}"

if __name__ == "__main__":
    flyte.init()
    result = flyte.run(process_data, data=["hello", "world", "flyte"])

| Feature Highlight | Flyte 1 | Flyte 2 |
|---|---|---|
| No More Workflow DSL | ❌ @workflow decorators with Python subset limitations | ✅ Pure Python: loops, conditionals, error handling, dynamic structures |
| Async-First Parallelism | ❌ Custom map() functions and workflow-specific parallel constructs | ✅ Native asyncio: await asyncio.gather() for distributed parallel execution |
| Fine-Grained Observability | ❌ Task-level logging only | ✅ Function-level tracing with @flyte.trace for sub-task checkpoints |
# Install uv package manager
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create virtual environment
uv venv && source .venv/bin/activate
# Install Flyte 2 (beta)
uv pip install --prerelease=allow flyte
# hello.py
# /// script
# requires-python = ">=3.10"
# dependencies = ["flyte>=2.0.0b0"]
# ///
import asyncio

import flyte

env = flyte.TaskEnvironment(
    name="hello_world",
    resources=flyte.Resources(memory="250Mi")
)

@env.task
def calculate(x: int) -> int:
    return x * 2 + 5

@env.task
async def main(numbers: list[int]) -> float:
    # Parallel execution across distributed containers
    results = await asyncio.gather(*[
        calculate.aio(num) for num in numbers
    ])
    return sum(results) / len(results)

if __name__ == "__main__":
    flyte.init_from_config("config.yaml")
    run = flyte.run(main, numbers=list(range(10)))
    print(f"Result: {run.result}")
    print(f"View at: {run.url}")
# Run locally, execute remotely
uv run --prerelease=allow hello.py
# Group tasks with shared configuration
env = flyte.TaskEnvironment(
    name="ml_pipeline",
    image=flyte.Image.from_debian_base().with_pip_packages(
        "torch", "pandas", "scikit-learn"
    ),
    resources=flyte.Resources(cpu=4, memory="8Gi", gpu=1),
    reusable=flyte.ReusePolicy(replicas=3, idle_ttl=300)
)

@env.task
def train_model(data: flyte.io.File) -> flyte.io.File:
    # Runs in configured container with GPU access
    pass

@env.task
def evaluate_model(model: flyte.io.File, test_data: flyte.io.File) -> dict:
    # Same container configuration, different instance
    pass
@env.task
async def dynamic_pipeline(config: dict) -> list[str]:
    results = []
    # ✅ Use any Python construct
    for dataset in config["datasets"]:
        try:
            # ✅ Native error handling
            if dataset["type"] == "batch":
                result = await process_batch(dataset)
            else:
                result = await process_stream(dataset)
            results.append(result)
        except ValidationError as e:
            # ✅ Custom error recovery
            result = await handle_error(dataset, e)
            results.append(result)
    return results
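The example above calls process_batch, process_stream, and handle_error without defining them. A minimal sketch of hypothetical stand-ins so the snippet is self-contained (illustrative only, not part of the Flyte API):

class ValidationError(Exception):
    # Hypothetical validation error raised by upstream checks
    pass

@env.task
async def process_batch(dataset: dict) -> str:
    # Stand-in for a real batch processor
    return f"batch:{dataset.get('name', 'unnamed')}"

@env.task
async def process_stream(dataset: dict) -> str:
    # Stand-in for a real stream processor
    return f"stream:{dataset.get('name', 'unnamed')}"

@env.task
async def handle_error(dataset: dict, error: ValidationError) -> str:
    # Recover with a fallback result instead of failing the whole run
    return f"recovered:{dataset.get('name', 'unnamed')}"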
@env.task
async def parallel_training(hyperparams: list[dict]) -> dict:
    # Each model trains on separate infrastructure
    models = await asyncio.gather(*[
        train_model.aio(params) for params in hyperparams
    ])
    # Evaluate all models in parallel
    evaluations = await asyncio.gather(*[
        evaluate_model.aio(model) for model in models
    ])
    # Find best model
    best_idx = max(range(len(evaluations)),
                   key=lambda i: evaluations[i]["accuracy"])
    return {"best_model": models[best_idx], "accuracy": evaluations[best_idx]["accuracy"]}
@flyte.trace
async def expensive_computation(data: str) -> str:
    # Function-level checkpointing - recoverable on failure
    result = await call_external_api(data)
    return process_result(result)
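# The trace example assumes call_external_api and process_result exist;
# hypothetical stand-ins so the snippet is self-contained (not part of the Flyte API):
async def call_external_api(data: str) -> str:
    # Placeholder for a real network call
    return f"response:{data}"

def process_result(result: str) -> str:
    # Placeholder post-processing step
    return result.upper()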
@env.task(cache=flyte.Cache(behavior="auto"))
async def main_task(inputs: list[str]) -> list[str]:
    results = []
    for inp in inputs:
        # If task fails here, it resumes from the last successful trace
        result = await expensive_computation(inp)
        results.append(result)
    return results
import flyte.remote

# Remote tasks deployed elsewhere
torch_task = flyte.remote.Task.get("torch_env.train_model", auto_version="latest")
spark_task = flyte.remote.Task.get("spark_env.process_data", auto_version="latest")

@env.task
async def orchestrator(raw_data: flyte.io.File) -> flyte.io.File:
    # Execute Spark job on big data cluster
    processed = await spark_task(raw_data)
    # Execute PyTorch training on GPU cluster
    model = await torch_task(processed)
    return model

Run and monitor workflows directly from notebooks:
# In Jupyter cell
import flyte

flyte.init_from_config()
run = flyte.run(my_workflow, data=large_dataset)

# Stream logs in real-time
run.logs.stream()

# Get outputs when complete
results = run.wait()
# config.yaml
endpoint: https://my-flyte-instance.com
project: ml-team
domain: production
image:
  builder: local
  registry: ghcr.io/my-org
auth:
  type: oauth2
# Deploy tasks to remote cluster
flyte deploy my_workflow.py

# Run deployed workflow
flyte run my_workflow --input-file params.json

# Monitor execution
flyte logs <execution-id>
| Flyte 1 | Flyte 2 |
|---|---|
| @workflow + @task | @env.task only |
| flytekit.map() | await asyncio.gather() |
| @dynamic workflows | Regular @env.task with loops |
| flytekit.conditional() | Python if/else |
| LaunchPlan schedules | @env.task(on_schedule=...) |
| Workflow failure handlers | Python try/except |
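As a rough sketch of the mapping above, a Flyte 1 map-style fan-out becomes plain asyncio code in Flyte 2. Here square and fan_out are hypothetical tasks written to match the patterns shown earlier in this README, not code taken from the Flyte docs:

import asyncio

import flyte

env = flyte.TaskEnvironment(name="migration_demo")

@env.task
async def square(x: int) -> int:
    # Worker task; in Flyte 1 this would be a @task mapped with flytekit.map()
    return x * x

@env.task
async def fan_out(xs: list[int]) -> list[int]:
    # Flyte 2 replaces map()/@dynamic with ordinary asyncio fan-out
    return list(await asyncio.gather(*[square(x) for x in xs]))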
We welcome contributions! Whether it's:
- 🐛 Bug fixes
- ✨ New features
- 📚 Documentation improvements
- 🧪 Testing enhancements
To get started, make sure you start from a new virtual environment and install this package in editable mode with any of the supported Python versions, from 3.10 to 3.13.
uv venv --python 3.13
uv pip install -e .

Besides picking up local code changes, installing the package in editable mode also changes the definition of the default Image() object to use a locally built wheel. You will need to build that wheel yourself, with the make dist target.
make dist
python maint_tools/build_default_image.py

You'll need a local docker daemon running for this. The build script does nothing more than invoke the local image builder, which will create a buildx builder named flytex if one is not present. Note that only members of the Flyte Maintainers group have access to push to the default registry. If you don't have access, please make sure to specify the registry and name to the build script.
python maint_tools/build_default_image.py --registry ghcr.io/my-org --name my-flyte-image

Flyte 2 is licensed under the Apache 2.0 License.
Alternative AI tools for flyte-sdk
Similar Open Source Tools
FDAbench
FDABench is a benchmark tool designed for evaluating data agents' reasoning ability over heterogeneous data in analytical scenarios. It offers 2,007 tasks across various data sources, domains, difficulty levels, and task types. The tool provides ready-to-use data agent implementations, a DAG-based evaluation system, and a framework for agent-expert collaboration in dataset generation. Key features include data agent implementations, comprehensive evaluation metrics, multi-database support, different task types, extensible framework for custom agent integration, and cost tracking. Users can set up the environment using Python 3.10+ on Linux, macOS, or Windows. FDABench can be installed with a one-command setup or manually. The tool supports API configuration for LLM access and offers quick start guides for database download, dataset loading, and running examples. It also includes features like dataset generation using the PUDDING framework, custom agent integration, evaluation metrics like accuracy and rubric score, and a directory structure for easy navigation.
agentops
AgentOps is a toolkit for evaluating and developing robust and reliable AI agents. It provides benchmarks, observability, and replay analytics to help developers build better agents. AgentOps is in open beta and can be signed up for here. Key features of AgentOps include: - Session replays in 3 lines of code: Initialize the AgentOps client and automatically get analytics on every LLM call. - Time travel debugging: (coming soon!) - Agent Arena: (coming soon!) - Callback handlers: AgentOps works seamlessly with applications built using Langchain and LlamaIndex.
z-ai-sdk-python
Z.ai Open Platform Python SDK is the official Python SDK for Z.ai's large model open interface, providing developers with easy access to Z.ai's open APIs. The SDK offers core features like chat completions, embeddings, video generation, audio processing, assistant API, and advanced tools. It supports various functionalities such as speech transcription, text-to-video generation, image understanding, and structured conversation handling. Developers can customize client behavior, configure API keys, and handle errors efficiently. The SDK is designed to simplify AI interactions and enhance AI capabilities for developers.
docutranslate
Docutranslate is a versatile tool for translating documents efficiently. It supports multiple file formats and languages, making it ideal for businesses and individuals needing quick and accurate translations. The tool uses advanced algorithms to ensure high-quality translations while maintaining the original document's formatting. With its user-friendly interface, Docutranslate simplifies the translation process and saves time for users. Whether you need to translate legal documents, technical manuals, or personal letters, Docutranslate is the go-to solution for all your document translation needs.
LocalAGI
LocalAGI is a powerful, self-hostable AI Agent platform that allows you to design AI automations without writing code. It provides a complete drop-in replacement for OpenAI's Responses APIs with advanced agentic capabilities. With LocalAGI, you can create customizable AI assistants, automations, chat bots, and agents that run 100% locally, without the need for cloud services or API keys. The platform offers features like no-code agents, web-based interface, advanced agent teaming, connectors for various platforms, comprehensive REST API, short & long-term memory capabilities, planning & reasoning, periodic tasks scheduling, memory management, multimodal support, extensible custom actions, fully customizable models, observability, and more.
acte
Acte is a framework designed to build GUI-like tools for AI Agents. It aims to address the issues of cognitive load and freedom degrees when interacting with multiple APIs in complex scenarios. By providing a graphical user interface (GUI) for Agents, Acte helps reduce cognitive load and constraints interaction, similar to how humans interact with computers through GUIs. The tool offers APIs for starting new sessions, executing actions, and displaying screens, accessible via HTTP requests or the SessionManager class.
mcp-context-forge
MCP Context Forge is a powerful tool for generating context-aware data for machine learning models. It provides functionalities to create diverse datasets with contextual information, enhancing the performance of AI algorithms. The tool supports various data formats and allows users to customize the context generation process easily. With MCP Context Forge, users can efficiently prepare training data for tasks requiring contextual understanding, such as sentiment analysis, recommendation systems, and natural language processing.
pocketgroq
PocketGroq is a tool that provides advanced functionalities for text generation, web scraping, web search, and AI response evaluation. It includes features like an Autonomous Agent for answering questions, web crawling and scraping capabilities, enhanced web search functionality, and flexible integration with Ollama server. Users can customize the agent's behavior, evaluate responses using AI, and utilize various methods for text generation, conversation management, and Chain of Thought reasoning. The tool offers comprehensive methods for different tasks, such as initializing RAG, error handling, and tool management. PocketGroq is designed to enhance development processes and enable the creation of AI-powered applications with ease.
augustus
Augustus is a Go-based LLM vulnerability scanner designed for security professionals to test large language models against a wide range of adversarial attacks. It integrates with 28 LLM providers, covers 210+ adversarial attacks including prompt injection, jailbreaks, encoding exploits, and data extraction, and produces actionable vulnerability reports. The tool is built for production security testing with features like concurrent scanning, rate limiting, retry logic, and timeout handling out of the box.
VimLM
VimLM is an AI-powered coding assistant for Vim that integrates AI for code generation, refactoring, and documentation directly into your Vim workflow. It offers native Vim integration with split-window responses and intuitive keybindings, offline first execution with MLX-compatible models, contextual awareness with seamless integration with codebase and external resources, conversational workflow for iterating on responses, project scaffolding for generating and deploying code blocks, and extensibility for creating custom LLM workflows with command chains.
RepairAgent
RepairAgent is an autonomous LLM-based agent for automated program repair targeting the Defects4J benchmark. It uses an LLM-driven loop to localize, analyze, and fix Java bugs. The tool requires Docker, VS Code with Dev Containers extension, OpenAI API key, disk space of ~40 GB, and internet access. Users can get started with RepairAgent using either VS Code Dev Container or Docker Image. Running RepairAgent involves checking out the buggy project version, autonomous bug analysis, fix candidate generation, and testing against the project's test suite. Users can configure hyperparameters for budget control, repetition handling, commands limit, and external fix strategy. The tool provides output structure, experiment overview, individual analysis scripts, and data on fixed bugs from the Defects4J dataset.
onnxruntime-server
ONNX Runtime Server is a server that provides TCP and HTTP/HTTPS REST APIs for ONNX inference. It aims to offer simple, high-performance ML inference and a good developer experience. Users can provide inference APIs for ONNX models without writing additional code by placing the models in the directory structure. Each session can choose between CPU or CUDA, analyze input/output, and provide Swagger API documentation for easy testing. Ready-to-run Docker images are available, making it convenient to deploy the server.
adk-rust
ADK-Rust is a comprehensive and production-ready Rust framework for building AI agents. It features type-safe agent abstractions with async execution and event streaming, multiple agent types including LLM agents, workflow agents, and custom agents, realtime voice agents with bidirectional audio streaming, a tool ecosystem with function tools, Google Search, and MCP integration, production features like session management, artifact storage, memory systems, and REST/A2A APIs, and a developer-friendly experience with interactive CLI, working examples, and comprehensive documentation. The framework follows a clean layered architecture and is production-ready and actively maintained.
evalplus
EvalPlus is a rigorous evaluation framework for LLM4Code, providing HumanEval+ and MBPP+ tests to evaluate large language models on code generation tasks. It offers precise evaluation and ranking, coding rigorousness analysis, and pre-generated code samples. Users can use EvalPlus to generate code solutions, post-process code, and evaluate code quality. The tool includes tools for code generation and test input generation using various backends.
polyfire-js
Polyfire is an all-in-one managed backend for AI apps that allows users to build AI apps directly from the frontend, eliminating the need for a separate backend. It simplifies the process by providing most backend services in just a few lines of code. With Polyfire, users can easily create chatbots, transcribe audio files to text, generate simple text, create a long-term memory, and generate images with Dall-E. The tool also offers starter guides and tutorials to help users get started quickly and efficiently.
For similar tasks
dbt-airflow
A Python package that helps Data and Analytics engineers render dbt projects in Apache Airflow DAGs. It enables teams to automatically render their dbt projects in a granular level, creating individual Airflow tasks for every model, seed, snapshot, and test within the dbt project. This allows for full control at the task-level, improving visibility and management of data models within the team.
blades
Blades is a multimodal AI Agent framework in Go, supporting custom models, tools, memory, middleware, and more. It is well-suited for multi-turn conversations, chain reasoning, and structured output. The framework provides core components like Agent, Prompt, Chain, ModelProvider, Tool, Memory, and Middleware, enabling developers to build intelligent applications with flexible configuration and high extensibility. Blades leverages the characteristics of Go to achieve high decoupling and efficiency, making it easy to integrate different language model services and external tools. The project is in its early stages, inviting Go developers and AI enthusiasts to contribute and explore the possibilities of building AI applications in Go.
ray
Ray is a unified framework for scaling AI and Python applications. It consists of a core distributed runtime and a set of AI libraries for simplifying ML compute, including Data, Train, Tune, RLlib, and Serve. Ray runs on any machine, cluster, cloud provider, and Kubernetes, and features a growing ecosystem of community integrations. With Ray, you can seamlessly scale the same code from a laptop to a cluster, making it easy to meet the compute-intensive demands of modern ML workloads.
aiscript
AiScript is a lightweight scripting language that runs on JavaScript. It supports arrays, objects, and functions as first-class citizens, and is easy to write without the need for semicolons or commas. AiScript runs in a secure sandbox environment, preventing infinite loops from freezing the host. It also allows for easy provision of variables and functions from the host.
dstack
Dstack is an open-source orchestration engine for running AI workloads in any cloud. It supports a wide range of cloud providers (such as AWS, GCP, Azure, Lambda, TensorDock, Vast.ai, CUDO, RunPod, etc.) as well as on-premises infrastructure. With Dstack, you can easily set up and manage dev environments, tasks, services, and pools for your AI workloads.
mobius
Mobius is an AI infra platform including realtime computing and training. It is built on Ray, a distributed computing framework, and provides a number of features that make it well-suited for online machine learning tasks. These features include: * **Cross Language**: Mobius can run in multiple languages (only Python and Java are supported currently) with high efficiency. You can implement your operator in different languages and run them in one job. * **Single Node Failover**: Mobius has a special failover mechanism that only needs to rollback the failed node itself, in most cases, to recover the job. This is a huge benefit if your job is sensitive about failure recovery time. * **AutoScaling**: Mobius can generate a new graph with different configurations in runtime without stopping the job. * **Fusion Training**: Mobius can combine TensorFlow/Pytorch and streaming, then building an e2e online machine learning pipeline. Mobius is still under development, but it has already been used to power a number of real-world applications, including: * A real-time recommendation system for a major e-commerce company * A fraud detection system for a large financial institution * A personalized news feed for a major news organization If you are interested in using Mobius for your own online machine learning projects, you can find more information in the documentation.
vasttools
This repository contains a collection of tools that can be used with vastai. The tools are free to use, modify and distribute. If you find this useful and wish to donate, you're welcome to send your donations to the following wallets. BTC 15qkQSYXP2BvpqJkbj2qsNFb6nd7FyVcou XMR 897VkA8sG6gh7yvrKrtvWningikPteojfSgGff3JAUs3cu7jxPDjhiAZRdcQSYPE2VGFVHAdirHqRZEpZsWyPiNK6XPQKAg RVN RSgWs9Co8nQeyPqQAAqHkHhc5ykXyoMDUp USDT(ETH ERC20) 0xa5955cf9fe7af53bcaa1d2404e2b17a1f28aac4f Paypal PayPal.Me/cryptolabsZA
For similar jobs
db2rest
DB2Rest is a modern low-code REST DATA API platform that simplifies the development of intelligent applications. It seamlessly integrates existing and new databases with language models (LMs/LLMs) and vector stores, enabling the rapid delivery of context-aware, reasoning applications without vendor lock-in.
mage-ai
Mage is an open-source data pipeline tool for transforming and integrating data. It offers an easy developer experience, engineering best practices built-in, and data as a first-class citizen. Mage makes it easy to build, preview, and launch data pipelines, and provides observability and scaling capabilities. It supports data integrations, streaming pipelines, and dbt integration.
airbyte
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's no-code Connector Builder or low-code CDK. Airbyte is used by data engineers and analysts at companies of all sizes to build and manage their data pipelines.
labelbox-python
Labelbox is a data-centric AI platform for enterprises to develop, optimize, and use AI to solve problems and power new products and services. Enterprises use Labelbox to curate data, generate high-quality human feedback data for computer vision and LLMs, evaluate model performance, and automate tasks by combining AI and human-centric workflows. The academic & research community uses Labelbox for cutting-edge AI research.
telemetry-airflow
This repository codifies the Airflow cluster that is deployed at workflow.telemetry.mozilla.org (behind SSO) and commonly referred to as "WTMO" or simply "Airflow". Some links relevant to users and developers of WTMO: * The `dags` directory in this repository contains some custom DAG definitions * Many of the DAGs registered with WTMO don't live in this repository, but are instead generated from ETL task definitions in bigquery-etl * The Data SRE team maintains a WTMO Developer Guide (behind SSO)
airflow
Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative. Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
airbyte-platform
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's low-code Connector Development Kit (CDK). Airbyte is used by data engineers and analysts at companies of all sizes to move data for a variety of purposes, including data warehousing, data analysis, and machine learning.
chronon
Chronon is a platform that simplifies and improves ML workflows by providing a central place to define features, ensuring point-in-time correctness for backfills, simplifying orchestration for batch and streaming pipelines, offering easy endpoints for feature fetching, and guaranteeing and measuring consistency. It offers benefits over other approaches by enabling the use of a broad set of data for training, handling large aggregations and other computationally intensive transformations, and abstracting away the infrastructure complexity of data plumbing.