aiostream
Generator-based operators for asynchronous iteration
Stars: 743
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
README:
|docs-badge| |cov-badge| |ci-badge| |version-badge| |pyversion-badge|
Generator-based operators for asynchronous iteration
aiostream_ provides a collection of stream operators that can be combined to create asynchronous pipelines of operations.
It can be seen as an asynchronous version of itertools_, although some aspects are slightly different. Essentially, all the provided operators return a unified interface called a stream. A stream is an enhanced asynchronous iterable providing the following features:
-
Operator pipe-lining - using pipe symbol
| - Repeatability - every iteration creates a different iterator
-
Safe iteration context - using
async withand thestreammethod -
Simplified execution - get the last element from a stream using
await -
Slicing and indexing - using square brackets
[] -
Concatenation - using addition symbol
+
The stream operators_ are separated in 7 categories:
+--------------------+---------------------------------------------------------------------------------------+ | creation | iterate_, preserve_, just_, call_, empty_, throw_, never_, repeat_, count_, range_ | +--------------------+---------------------------------------------------------------------------------------+ | transformation | map_, enumerate_, starmap_, cycle_, chunks_ | +--------------------+---------------------------------------------------------------------------------------+ | selection | take_, takelast_, skip_, skiplast_, getitem_, filter_, until_, takewhile_, dropwhile_ | +--------------------+---------------------------------------------------------------------------------------+ | combination | map_, zip_, merge_, chain_, ziplatest_ | +--------------------+---------------------------------------------------------------------------------------+ | aggregation | accumulate_, reduce_, list_ | +--------------------+---------------------------------------------------------------------------------------+ | advanced | concat_, flatten_, switch_, concatmap_, flatmap_, switchmap_ | +--------------------+---------------------------------------------------------------------------------------+ | timing | spaceout_, timeout_, delay_ | +--------------------+---------------------------------------------------------------------------------------+ | miscellaneous | action_, print_ | +--------------------+---------------------------------------------------------------------------------------+
The following example demonstrates most of the streams capabilities:
.. code:: python
import asyncio
from aiostream import stream, pipe
async def main():
# Create a counting stream with a 0.2 seconds interval
xs = stream.count(interval=0.2)
# Operators can be piped using '|'
ys = xs | pipe.map(lambda x: x**2)
# Streams can be sliced
zs = ys[1:10:2]
# Use a stream context for proper resource management
async with zs.stream() as streamer:
# Asynchronous iteration
async for z in streamer:
# Print 1, 9, 25, 49 and 81
print('->', z)
# Streams can be awaited and return the last value
print('9² = ', await zs)
# Streams can run several times
print('9² = ', await zs)
# Streams can be concatenated
one_two_three = stream.just(1) + stream.range(2, 4)
# Print [1, 2, 3]
print(await stream.list(one_two_three))
# Run main coroutine
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
More examples are available in the example section_ of the documentation.
You can install aiostream from PyPI as the aiostream package_.
Vincent Michel: [email protected]
.. _aiostream: https://github.com/vxgmichel/aiostream .. _PEP 525: http://www.python.org/dev/peps/pep-0525/ .. _Rx: http://reactivex.io/ .. _aioreactive: http://github.com/dbrattli/aioreactive .. _itertools: http://docs.python.org/3/library/itertools.html
.. _stream operators: http://aiostream.readthedocs.io/en/latest/operators.html .. _example section: http://aiostream.readthedocs.io/en/latest/examples.html
.. _iterate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.iterate .. _preserve: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.preserve .. _just: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.just .. _call: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.call .. _throw: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.throw .. _empty: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.empty .. _never: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.never .. _repeat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.repeat .. _range: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.range .. _count: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.count
.. _map: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.map .. _enumerate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.enumerate .. _starmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.starmap .. _cycle: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.cycle .. _chunks: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chunks
.. _take: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.take .. _takelast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takelast .. _skip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skip .. _skiplast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skiplast .. _getitem: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.getitem .. _filter: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.filter .. _until: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.until .. _takewhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takewhile .. _dropwhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.dropwhile
.. _chain: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chain .. _zip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.zip .. _merge: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.merge .. _ziplatest: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.ziplatest
.. _accumulate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.accumulate .. _reduce: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.reduce .. _list: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.list
.. _concat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concat .. _flatten: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatten .. _switch: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switch .. _concatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concatmap .. _flatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatmap .. _switchmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switchmap
.. _spaceout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.spaceout .. _delay: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.delay .. _timeout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.timeout
.. _action: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.action .. _print: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.print
.. _aiostream package: https://pypi.org/project/aiostream/
.. |docs-badge| image:: https://readthedocs.org/projects/aiostream/badge/?version=latest :target: http://aiostream.readthedocs.io/en/latest/?badge=latest :alt: .. |cov-badge| image:: https://codecov.io/gh/vxgmichel/aiostream/branch/main/graph/badge.svg :target: https://codecov.io/gh/vxgmichel/aiostream :alt: .. |ci-badge| image:: https://github.com/vxgmichel/aiostream/workflows/CI/badge.svg :target: https://github.com/vxgmichel/aiostream/actions/workflows/ci.yml?query=branch%3Amain :alt: .. |version-badge| image:: https://img.shields.io/pypi/v/aiostream.svg :target: https://pypi.python.org/pypi/aiostream :alt: .. |pyversion-badge| image:: https://img.shields.io/pypi/pyversions/aiostream.svg :target: https://pypi.python.org/pypi/aiostream/ :alt:
For Tasks:
Click tags to check more tools for each tasksFor Jobs:
Alternative AI tools for aiostream
Similar Open Source Tools
aiostream
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
bark.cpp
Bark.cpp is a C/C++ implementation of the Bark model, a real-time, multilingual text-to-speech generation model. It supports AVX, AVX2, and AVX512 for x86 architectures, and is compatible with both CPU and GPU backends. Bark.cpp also supports mixed F16/F32 precision and 4-bit, 5-bit, and 8-bit integer quantization. It can be used to generate realistic-sounding audio from text prompts.
aiohttp
aiohttp is an async http client/server framework that supports both client and server side of HTTP protocol. It also supports both client and server Web-Sockets out-of-the-box and avoids Callback Hell. aiohttp provides a Web-server with middleware and pluggable routing.
flyte-sdk
Flyte 2 SDK is a pure Python tool for type-safe, distributed orchestration of agents, ML pipelines, and more. It allows users to write data pipelines, ML training jobs, and distributed compute in Python without any DSL constraints. With features like async-first parallelism and fine-grained observability, Flyte 2 offers a seamless workflow experience. Users can leverage core concepts like TaskEnvironments for container configuration, pure Python workflows for flexibility, and async parallelism for distributed execution. Advanced features include sub-task observability with tracing and remote task execution. The tool also provides native Jupyter integration for running and monitoring workflows directly from notebooks. Configuration and deployment are made easy with configuration files and commands for deploying and running workflows. Flyte 2 is licensed under the Apache 2.0 License.
node-sdk
The ChatBotKit Node SDK is a JavaScript-based platform for building conversational AI bots and agents. It offers easy setup, serverless compatibility, modern framework support, customizability, and multi-platform deployment. With capabilities like multi-modal and multi-language support, conversation management, chat history review, custom datasets, and various integrations, this SDK enables users to create advanced chatbots for websites, mobile apps, and messaging platforms.
island-ai
island-ai is a TypeScript toolkit tailored for developers engaging with structured outputs from Large Language Models. It offers streamlined processes for handling, parsing, streaming, and leveraging AI-generated data across various applications. The toolkit includes packages like zod-stream for interfacing with LLM streams, stream-hooks for integrating streaming JSON data into React applications, and schema-stream for JSON streaming parsing based on Zod schemas. Additionally, related packages like @instructor-ai/instructor-js focus on data validation and retry mechanisms, enhancing the reliability of data processing workflows.
flutter_gen_ai_chat_ui
A modern, high-performance Flutter chat UI kit for building beautiful messaging interfaces. Features streaming text animations, markdown support, file attachments, and extensive customization options. Perfect for AI assistants, customer support, team chat, social messaging, and any conversational application. Production Ready, Cross-Platform, High Performance, Fully Customizable. Core features include dark/light mode, word-by-word streaming with animations, enhanced markdown support, speech-to-text integration, responsive layout, RTL language support, high performance message handling, improved pagination support. AI-specific features include customizable welcome message, example questions component, persistent example questions, AI typing indicators, streaming markdown rendering. New AI Actions System with function calling support, generative UI, human-in-the-loop confirmation dialogs, real-time status updates, type-safe parameters, event streaming, error handling. UI components include customizable message bubbles, custom bubble builder, multiple input field styles, loading indicators, smart scroll management, enhanced theme customization, better code block styling.
acte
Acte is a framework designed to build GUI-like tools for AI Agents. It aims to address the issues of cognitive load and freedom degrees when interacting with multiple APIs in complex scenarios. By providing a graphical user interface (GUI) for Agents, Acte helps reduce cognitive load and constraints interaction, similar to how humans interact with computers through GUIs. The tool offers APIs for starting new sessions, executing actions, and displaying screens, accessible via HTTP requests or the SessionManager class.
react-native-rag
React Native RAG is a library that enables private, local RAGs to supercharge LLMs with a custom knowledge base. It offers modular and extensible components like `LLM`, `Embeddings`, `VectorStore`, and `TextSplitter`, with multiple integration options. The library supports on-device inference, vector store persistence, and semantic search implementation. Users can easily generate text responses, manage documents, and utilize custom components for advanced use cases.
agent-tool-protocol
Agent Tool Protocol (ATP) is a production-ready, code-first protocol for AI agents to interact with external systems through secure sandboxed code execution. It enables AI agents to write and execute TypeScript/JavaScript code in a secure, sandboxed environment, allowing for parallel operations, data filtering and transformation, and familiar programming patterns. ATP provides a complete ecosystem for building production-ready AI agents with secure code execution, runtime SDK, stateless architecture, client tools for integration, provenance tracking, and compatibility with OpenAPI and MCP. It solves limitations of traditional protocols like MCP by offering open API integration, parallel execution, data processing, code flexibility, universal compatibility, reduced token usage, type safety, and being production-ready. ATP allows LLMs to write code that executes in a secure sandbox, providing the full power of a programming language while maintaining strict security boundaries.
exstruct
ExStruct is an Excel structured extraction engine that reads Excel workbooks and outputs structured data as JSON, including cells, table candidates, shapes, charts, smartart, merged cell ranges, print areas/views, auto page-break areas, and hyperlinks. It offers different output modes, formula map extraction, table detection tuning, CLI rendering options, and graceful fallback in case Excel COM is unavailable. The tool is designed to fit LLM/RAG pipelines and provides benchmark reports for accuracy and utility. It supports various formats like JSON, YAML, and TOON, with optional extras for rendering and full extraction targeting Windows + Excel environments.
polyfire-js
Polyfire is an all-in-one managed backend for AI apps that allows users to build AI apps directly from the frontend, eliminating the need for a separate backend. It simplifies the process by providing most backend services in just a few lines of code. With Polyfire, users can easily create chatbots, transcribe audio files to text, generate simple text, create a long-term memory, and generate images with Dall-E. The tool also offers starter guides and tutorials to help users get started quickly and efficiently.
simulflow
Simulflow is a Clojure framework for building real-time voice-enabled AI applications using a data-driven, functional approach. It provides a composable pipeline architecture for processing audio, text, and AI interactions with built-in support for major AI providers. The framework uses processors that communicate through specialized frames to create voice-enabled AI agents, allowing for mental multitasking and rational thought. Simulflow offers a flow-based architecture, data-first design, streaming architecture, extensibility, flexible frame system, and built-in services for seamless integration with major AI providers. Users can easily swap components, add new functionality, or debug individual stages without affecting the entire system.
evalplus
EvalPlus is a rigorous evaluation framework for LLM4Code, providing HumanEval+ and MBPP+ tests to evaluate large language models on code generation tasks. It offers precise evaluation and ranking, coding rigorousness analysis, and pre-generated code samples. Users can use EvalPlus to generate code solutions, post-process code, and evaluate code quality. The tool includes tools for code generation and test input generation using various backends.
aiogram
aiogram is a modern and fully asynchronous framework for Telegram Bot API written in Python 3.8+ using asyncio and aiohttp. It helps users create faster and more powerful bots. The framework supports features such as asynchronous operations, type hints, PyPy support, Telegram Bot API integration, router updates, Finite State Machine, magic filters, middlewares, webhook replies, and I18n/L10n support with GNU Gettext or Fluent. Prior experience with asyncio is recommended before using aiogram.
celeste-python
Celeste AI is a type-safe, modality/provider-agnostic tool that offers unified interface for various providers like OpenAI, Anthropic, Gemini, Mistral, and more. It supports multiple modalities including text, image, audio, video, and embeddings, with full Pydantic validation and IDE autocomplete. Users can switch providers instantly, ensuring zero lock-in and a lightweight architecture. The tool provides primitives, not frameworks, for clean I/O operations.
For similar tasks
aiostream
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
AIOStreams
AIOStreams is a versatile tool that combines streams from various addons into one platform, offering extensive customization options. Users can change result formats, filter results by various criteria, remove duplicates, prioritize services, sort results, specify size limits, and more. The tool scrapes results from selected addons, applies user configurations, and presents the results in a unified manner. It simplifies the process of finding and accessing desired content from multiple sources, enhancing user experience and efficiency.
perplexity-mcp
Perplexity-mcp is a Model Context Protocol (MCP) server that provides web search functionality using Perplexity AI's API. It works with the Anthropic Claude desktop client. The server allows users to search the web with specific queries and filter results by recency. It implements the perplexity_search_web tool, which takes a query as a required argument and can filter results by day, week, month, or year. Users need to set up environment variables, including the PERPLEXITY_API_KEY, to use the server. The tool can be installed via Smithery and requires UV for installation. It offers various models for different contexts and can be added as an MCP server in Cursor or Claude Desktop configurations.
mage-ai
Mage is an open-source data pipeline tool for transforming and integrating data. It offers an easy developer experience, engineering best practices built-in, and data as a first-class citizen. Mage makes it easy to build, preview, and launch data pipelines, and provides observability and scaling capabilities. It supports data integrations, streaming pipelines, and dbt integration.
airbyte
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's no-code Connector Builder or low-code CDK. Airbyte is used by data engineers and analysts at companies of all sizes to build and manage their data pipelines.
airbyte-platform
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's low-code Connector Development Kit (CDK). Airbyte is used by data engineers and analysts at companies of all sizes to move data for a variety of purposes, including data warehousing, data analysis, and machine learning.
airbyte-connectors
This repository contains Airbyte connectors used in Faros and Faros Community Edition platforms as well as Airbyte Connector Development Kit (CDK) for JavaScript/TypeScript.
instill-core
Instill Core is an open-source orchestrator comprising a collection of source-available projects designed to streamline every aspect of building versatile AI features with unstructured data. It includes Instill VDP (Versatile Data Pipeline) for unstructured data, AI, and pipeline orchestration, Instill Model for scalable MLOps and LLMOps for open-source or custom AI models, and Instill Artifact for unified unstructured data management. Instill Core can be used for tasks such as building, testing, and sharing pipelines, importing, serving, fine-tuning, and monitoring ML models, and transforming documents, images, audio, and video into a unified AI-ready format.
For similar jobs
sweep
Sweep is an AI junior developer that turns bugs and feature requests into code changes. It automatically handles developer experience improvements like adding type hints and improving test coverage.
teams-ai
The Teams AI Library is a software development kit (SDK) that helps developers create bots that can interact with Teams and Microsoft 365 applications. It is built on top of the Bot Framework SDK and simplifies the process of developing bots that interact with Teams' artificial intelligence capabilities. The SDK is available for JavaScript/TypeScript, .NET, and Python.
ai-guide
This guide is dedicated to Large Language Models (LLMs) that you can run on your home computer. It assumes your PC is a lower-end, non-gaming setup.
classifai
Supercharge WordPress Content Workflows and Engagement with Artificial Intelligence. Tap into leading cloud-based services like OpenAI, Microsoft Azure AI, Google Gemini and IBM Watson to augment your WordPress-powered websites. Publish content faster while improving SEO performance and increasing audience engagement. ClassifAI integrates Artificial Intelligence and Machine Learning technologies to lighten your workload and eliminate tedious tasks, giving you more time to create original content that matters.
chatbot-ui
Chatbot UI is an open-source AI chat app that allows users to create and deploy their own AI chatbots. It is easy to use and can be customized to fit any need. Chatbot UI is perfect for businesses, developers, and anyone who wants to create a chatbot.
BricksLLM
BricksLLM is a cloud native AI gateway written in Go. Currently, it provides native support for OpenAI, Anthropic, Azure OpenAI and vLLM. BricksLLM aims to provide enterprise level infrastructure that can power any LLM production use cases. Here are some use cases for BricksLLM: * Set LLM usage limits for users on different pricing tiers * Track LLM usage on a per user and per organization basis * Block or redact requests containing PIIs * Improve LLM reliability with failovers, retries and caching * Distribute API keys with rate limits and cost limits for internal development/production use cases * Distribute API keys with rate limits and cost limits for students
uAgents
uAgents is a Python library developed by Fetch.ai that allows for the creation of autonomous AI agents. These agents can perform various tasks on a schedule or take action on various events. uAgents are easy to create and manage, and they are connected to a fast-growing network of other uAgents. They are also secure, with cryptographically secured messages and wallets.
griptape
Griptape is a modular Python framework for building AI-powered applications that securely connect to your enterprise data and APIs. It offers developers the ability to maintain control and flexibility at every step. Griptape's core components include Structures (Agents, Pipelines, and Workflows), Tasks, Tools, Memory (Conversation Memory, Task Memory, and Meta Memory), Drivers (Prompt and Embedding Drivers, Vector Store Drivers, Image Generation Drivers, Image Query Drivers, SQL Drivers, Web Scraper Drivers, and Conversation Memory Drivers), Engines (Query Engines, Extraction Engines, Summary Engines, Image Generation Engines, and Image Query Engines), and additional components (Rulesets, Loaders, Artifacts, Chunkers, and Tokenizers). Griptape enables developers to create AI-powered applications with ease and efficiency.