
aiostream
Generator-based operators for asynchronous iteration
Stars: 743

aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
README:
|docs-badge| |cov-badge| |ci-badge| |version-badge| |pyversion-badge|
Generator-based operators for asynchronous iteration
aiostream_ provides a collection of stream operators that can be combined to create asynchronous pipelines of operations.
It can be seen as an asynchronous version of itertools_, although some aspects are slightly different. Essentially, all the provided operators return a unified interface called a stream. A stream is an enhanced asynchronous iterable providing the following features:
-
Operator pipe-lining - using pipe symbol
|
- Repeatability - every iteration creates a different iterator
-
Safe iteration context - using
async with
and thestream
method -
Simplified execution - get the last element from a stream using
await
-
Slicing and indexing - using square brackets
[]
-
Concatenation - using addition symbol
+
The stream operators
_ are separated in 7 categories:
+--------------------+---------------------------------------------------------------------------------------+ | creation | iterate_, preserve_, just_, call_, empty_, throw_, never_, repeat_, count_, range_ | +--------------------+---------------------------------------------------------------------------------------+ | transformation | map_, enumerate_, starmap_, cycle_, chunks_ | +--------------------+---------------------------------------------------------------------------------------+ | selection | take_, takelast_, skip_, skiplast_, getitem_, filter_, until_, takewhile_, dropwhile_ | +--------------------+---------------------------------------------------------------------------------------+ | combination | map_, zip_, merge_, chain_, ziplatest_ | +--------------------+---------------------------------------------------------------------------------------+ | aggregation | accumulate_, reduce_, list_ | +--------------------+---------------------------------------------------------------------------------------+ | advanced | concat_, flatten_, switch_, concatmap_, flatmap_, switchmap_ | +--------------------+---------------------------------------------------------------------------------------+ | timing | spaceout_, timeout_, delay_ | +--------------------+---------------------------------------------------------------------------------------+ | miscellaneous | action_, print_ | +--------------------+---------------------------------------------------------------------------------------+
The following example demonstrates most of the streams capabilities:
.. code:: python
import asyncio
from aiostream import stream, pipe
async def main():
# Create a counting stream with a 0.2 seconds interval
xs = stream.count(interval=0.2)
# Operators can be piped using '|'
ys = xs | pipe.map(lambda x: x**2)
# Streams can be sliced
zs = ys[1:10:2]
# Use a stream context for proper resource management
async with zs.stream() as streamer:
# Asynchronous iteration
async for z in streamer:
# Print 1, 9, 25, 49 and 81
print('->', z)
# Streams can be awaited and return the last value
print('9² = ', await zs)
# Streams can run several times
print('9² = ', await zs)
# Streams can be concatenated
one_two_three = stream.just(1) + stream.range(2, 4)
# Print [1, 2, 3]
print(await stream.list(one_two_three))
# Run main coroutine
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
More examples are available in the example section
_ of the documentation.
You can install aiostream from PyPI as the aiostream package
_.
Vincent Michel: [email protected]
.. _aiostream: https://github.com/vxgmichel/aiostream .. _PEP 525: http://www.python.org/dev/peps/pep-0525/ .. _Rx: http://reactivex.io/ .. _aioreactive: http://github.com/dbrattli/aioreactive .. _itertools: http://docs.python.org/3/library/itertools.html
.. _stream operators: http://aiostream.readthedocs.io/en/latest/operators.html .. _example section: http://aiostream.readthedocs.io/en/latest/examples.html
.. _iterate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.iterate .. _preserve: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.preserve .. _just: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.just .. _call: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.call .. _throw: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.throw .. _empty: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.empty .. _never: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.never .. _repeat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.repeat .. _range: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.range .. _count: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.count
.. _map: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.map .. _enumerate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.enumerate .. _starmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.starmap .. _cycle: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.cycle .. _chunks: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chunks
.. _take: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.take .. _takelast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takelast .. _skip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skip .. _skiplast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skiplast .. _getitem: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.getitem .. _filter: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.filter .. _until: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.until .. _takewhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takewhile .. _dropwhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.dropwhile
.. _chain: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chain .. _zip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.zip .. _merge: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.merge .. _ziplatest: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.ziplatest
.. _accumulate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.accumulate .. _reduce: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.reduce .. _list: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.list
.. _concat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concat .. _flatten: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatten .. _switch: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switch .. _concatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concatmap .. _flatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatmap .. _switchmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switchmap
.. _spaceout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.spaceout .. _delay: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.delay .. _timeout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.timeout
.. _action: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.action .. _print: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.print
.. _aiostream package: https://pypi.org/project/aiostream/
.. |docs-badge| image:: https://readthedocs.org/projects/aiostream/badge/?version=latest :target: http://aiostream.readthedocs.io/en/latest/?badge=latest :alt: .. |cov-badge| image:: https://codecov.io/gh/vxgmichel/aiostream/branch/main/graph/badge.svg :target: https://codecov.io/gh/vxgmichel/aiostream :alt: .. |ci-badge| image:: https://github.com/vxgmichel/aiostream/workflows/CI/badge.svg :target: https://github.com/vxgmichel/aiostream/actions/workflows/ci.yml?query=branch%3Amain :alt: .. |version-badge| image:: https://img.shields.io/pypi/v/aiostream.svg :target: https://pypi.python.org/pypi/aiostream :alt: .. |pyversion-badge| image:: https://img.shields.io/pypi/pyversions/aiostream.svg :target: https://pypi.python.org/pypi/aiostream/ :alt:
For Tasks:
Click tags to check more tools for each tasksFor Jobs:
Alternative AI tools for aiostream
Similar Open Source Tools

aiostream
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.

rpaframework
RPA Framework is an open-source collection of libraries and tools for Robotic Process Automation (RPA), designed to be used with Robot Framework and Python. It offers well-documented core libraries for Software Robot Developers, optimized for Robocorp Control Room and Developer Tools, and accepts external contributions. The project includes various libraries for tasks like archiving, browser automation, date/time manipulations, cloud services integration, encryption operations, database interactions, desktop automation, document processing, email operations, Excel manipulation, file system operations, FTP interactions, web API interactions, image manipulation, AI services, and more. The development of the repository is Python-based and requires Python version 3.8+, with tooling based on poetry and invoke for compiling, building, and running the package. The project is licensed under the Apache License 2.0.

dive
Dive is an AI toolkit for Go that enables the creation of specialized teams of AI agents and seamless integration with leading LLMs. It offers a CLI and APIs for easy integration, with features like creating specialized agents, hierarchical agent systems, declarative configuration, multiple LLM support, extended reasoning, model context protocol, advanced model settings, tools for agent capabilities, tool annotations, streaming, CLI functionalities, thread management, confirmation system, deep research, and semantic diff. Dive also provides semantic diff analysis, unified interface for LLM providers, tool system with annotations, custom tool creation, and support for various verified models. The toolkit is designed for developers to build AI-powered applications with rich agent capabilities and tool integrations.

arkflow
ArkFlow is a high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, enabling easy loading and execution of machine learning models for streaming data and inference, anomaly detection, and complex event processing. The tool is built on Rust and Tokio async runtime, offering excellent performance and low latency. It features built-in SQL queries, Python script, JSON processing, Protobuf encoding/decoding, and batch processing capabilities. ArkFlow is extensible with a modular design, making it easy to extend with new components.

evalchemy
Evalchemy is a unified and easy-to-use toolkit for evaluating language models, focusing on post-trained models. It integrates multiple existing benchmarks such as RepoBench, AlpacaEval, and ZeroEval. Key features include unified installation, parallel evaluation, simplified usage, and results management. Users can run various benchmarks with a consistent command-line interface and track results locally or integrate with a database for systematic tracking and leaderboard submission.

paperless-gpt
paperless-gpt is a tool designed to generate accurate and meaningful document titles and tags for paperless-ngx using Large Language Models (LLMs). It supports multiple LLM providers, including OpenAI and Ollama. With paperless-gpt, you can streamline your document management by automatically suggesting appropriate titles and tags based on the content of your scanned documents. The tool offers features like multiple LLM support, customizable prompts, easy integration with paperless-ngx, user-friendly interface for reviewing and applying suggestions, dockerized deployment, automatic document processing, and an experimental OCR feature.

tunacode
TunaCode CLI is an AI-powered coding assistant that provides a command-line interface for developers to enhance their coding experience. It offers features like model selection, parallel execution for faster file operations, and various commands for code management. The tool aims to improve coding efficiency and provide a seamless coding environment for developers.

llm4s
LLM4S provides a simple, robust, and scalable framework for building Large Language Models (LLM) applications in Scala. It aims to leverage Scala's type safety, functional programming, JVM ecosystem, concurrency, and performance advantages to create reliable and maintainable AI-powered applications. The framework supports multi-provider integration, execution environments, error handling, Model Context Protocol (MCP) support, agent frameworks, multimodal generation, and Retrieval-Augmented Generation (RAG) workflows. It also offers observability features like detailed trace logging, monitoring, and analytics for debugging and performance insights.

gcloud-aio
This repository contains shared codebase for two projects: gcloud-aio and gcloud-rest. gcloud-aio is built for Python 3's asyncio, while gcloud-rest is a threadsafe requests-based implementation. It provides clients for Google Cloud services like Auth, BigQuery, Datastore, KMS, PubSub, Storage, and Task Queue. Users can install the library using pip and refer to the documentation for usage details. Developers can contribute to the project by following the contribution guide.

MassGen
MassGen is a cutting-edge multi-agent system that leverages the power of collaborative AI to solve complex tasks. It assigns a task to multiple AI agents who work in parallel, observe each other's progress, and refine their approaches to converge on the best solution to deliver a comprehensive and high-quality result. The system operates through an architecture designed for seamless multi-agent collaboration, with key features including cross-model/agent synergy, parallel processing, intelligence sharing, consensus building, and live visualization. Users can install the system, configure API settings, and run MassGen for various tasks such as question answering, creative writing, research, development & coding tasks, and web automation & browser tasks. The roadmap includes plans for advanced agent collaboration, expanded model, tool & agent integration, improved performance & scalability, enhanced developer experience, and a web interface.

aiogram
aiogram is a modern and fully asynchronous framework for Telegram Bot API written in Python 3.8+ using asyncio and aiohttp. It helps users create faster and more powerful bots. The framework supports features such as asynchronous operations, type hints, PyPy support, Telegram Bot API integration, router updates, Finite State Machine, magic filters, middlewares, webhook replies, and I18n/L10n support with GNU Gettext or Fluent. Prior experience with asyncio is recommended before using aiogram.

mcp-apache-spark-history-server
The MCP Server for Apache Spark History Server is a tool that connects AI agents to Apache Spark History Server for intelligent job analysis and performance monitoring. It enables AI agents to analyze job performance, identify bottlenecks, and provide insights from Spark History Server data. The server bridges AI agents with existing Apache Spark infrastructure, allowing users to query job details, analyze performance metrics, compare multiple jobs, investigate failures, and generate insights from historical execution data.

VimLM
VimLM is an AI-powered coding assistant for Vim that integrates AI for code generation, refactoring, and documentation directly into your Vim workflow. It offers native Vim integration with split-window responses and intuitive keybindings, offline first execution with MLX-compatible models, contextual awareness with seamless integration with codebase and external resources, conversational workflow for iterating on responses, project scaffolding for generating and deploying code blocks, and extensibility for creating custom LLM workflows with command chains.

terminal-bench
Terminal Bench is a simple command-line benchmark tool for Unix-like systems. It allows users to easily compare the performance of different commands or scripts by measuring their execution time. The tool provides detailed statistics and visualizations to help users analyze the results. With Terminal Bench, users can optimize their scripts and commands for better performance and efficiency.

quantalogic
QuantaLogic is a ReAct framework for building advanced AI agents that seamlessly integrates large language models with a robust tool system. It aims to bridge the gap between advanced AI models and practical implementation in business processes by enabling agents to understand, reason about, and execute complex tasks through natural language interaction. The framework includes features such as ReAct Framework, Universal LLM Support, Secure Tool System, Real-time Monitoring, Memory Management, and Enterprise Ready components.

zotero-mcp
Zotero MCP is an open-source project that integrates AI capabilities with Zotero using the Model Context Protocol. It consists of a Zotero plugin and an MCP server, enabling AI assistants to search, retrieve, and cite references from Zotero library. The project features a unified architecture with an integrated MCP server, eliminating the need for a separate server process. It provides features like intelligent search, detailed reference information, filtering by tags and identifiers, aiding in academic tasks such as literature reviews and citation management.
For similar tasks

aiostream
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.

AIOStreams
AIOStreams is a versatile tool that combines streams from various addons into one platform, offering extensive customization options. Users can change result formats, filter results by various criteria, remove duplicates, prioritize services, sort results, specify size limits, and more. The tool scrapes results from selected addons, applies user configurations, and presents the results in a unified manner. It simplifies the process of finding and accessing desired content from multiple sources, enhancing user experience and efficiency.

perplexity-mcp
Perplexity-mcp is a Model Context Protocol (MCP) server that provides web search functionality using Perplexity AI's API. It works with the Anthropic Claude desktop client. The server allows users to search the web with specific queries and filter results by recency. It implements the perplexity_search_web tool, which takes a query as a required argument and can filter results by day, week, month, or year. Users need to set up environment variables, including the PERPLEXITY_API_KEY, to use the server. The tool can be installed via Smithery and requires UV for installation. It offers various models for different contexts and can be added as an MCP server in Cursor or Claude Desktop configurations.

mage-ai
Mage is an open-source data pipeline tool for transforming and integrating data. It offers an easy developer experience, engineering best practices built-in, and data as a first-class citizen. Mage makes it easy to build, preview, and launch data pipelines, and provides observability and scaling capabilities. It supports data integrations, streaming pipelines, and dbt integration.

airbyte
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's no-code Connector Builder or low-code CDK. Airbyte is used by data engineers and analysts at companies of all sizes to build and manage their data pipelines.

airbyte-platform
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's low-code Connector Development Kit (CDK). Airbyte is used by data engineers and analysts at companies of all sizes to move data for a variety of purposes, including data warehousing, data analysis, and machine learning.

airbyte-connectors
This repository contains Airbyte connectors used in Faros and Faros Community Edition platforms as well as Airbyte Connector Development Kit (CDK) for JavaScript/TypeScript.

instill-core
Instill Core is an open-source orchestrator comprising a collection of source-available projects designed to streamline every aspect of building versatile AI features with unstructured data. It includes Instill VDP (Versatile Data Pipeline) for unstructured data, AI, and pipeline orchestration, Instill Model for scalable MLOps and LLMOps for open-source or custom AI models, and Instill Artifact for unified unstructured data management. Instill Core can be used for tasks such as building, testing, and sharing pipelines, importing, serving, fine-tuning, and monitoring ML models, and transforming documents, images, audio, and video into a unified AI-ready format.
For similar jobs

sweep
Sweep is an AI junior developer that turns bugs and feature requests into code changes. It automatically handles developer experience improvements like adding type hints and improving test coverage.

teams-ai
The Teams AI Library is a software development kit (SDK) that helps developers create bots that can interact with Teams and Microsoft 365 applications. It is built on top of the Bot Framework SDK and simplifies the process of developing bots that interact with Teams' artificial intelligence capabilities. The SDK is available for JavaScript/TypeScript, .NET, and Python.

ai-guide
This guide is dedicated to Large Language Models (LLMs) that you can run on your home computer. It assumes your PC is a lower-end, non-gaming setup.

classifai
Supercharge WordPress Content Workflows and Engagement with Artificial Intelligence. Tap into leading cloud-based services like OpenAI, Microsoft Azure AI, Google Gemini and IBM Watson to augment your WordPress-powered websites. Publish content faster while improving SEO performance and increasing audience engagement. ClassifAI integrates Artificial Intelligence and Machine Learning technologies to lighten your workload and eliminate tedious tasks, giving you more time to create original content that matters.

chatbot-ui
Chatbot UI is an open-source AI chat app that allows users to create and deploy their own AI chatbots. It is easy to use and can be customized to fit any need. Chatbot UI is perfect for businesses, developers, and anyone who wants to create a chatbot.

BricksLLM
BricksLLM is a cloud native AI gateway written in Go. Currently, it provides native support for OpenAI, Anthropic, Azure OpenAI and vLLM. BricksLLM aims to provide enterprise level infrastructure that can power any LLM production use cases. Here are some use cases for BricksLLM: * Set LLM usage limits for users on different pricing tiers * Track LLM usage on a per user and per organization basis * Block or redact requests containing PIIs * Improve LLM reliability with failovers, retries and caching * Distribute API keys with rate limits and cost limits for internal development/production use cases * Distribute API keys with rate limits and cost limits for students

uAgents
uAgents is a Python library developed by Fetch.ai that allows for the creation of autonomous AI agents. These agents can perform various tasks on a schedule or take action on various events. uAgents are easy to create and manage, and they are connected to a fast-growing network of other uAgents. They are also secure, with cryptographically secured messages and wallets.

griptape
Griptape is a modular Python framework for building AI-powered applications that securely connect to your enterprise data and APIs. It offers developers the ability to maintain control and flexibility at every step. Griptape's core components include Structures (Agents, Pipelines, and Workflows), Tasks, Tools, Memory (Conversation Memory, Task Memory, and Meta Memory), Drivers (Prompt and Embedding Drivers, Vector Store Drivers, Image Generation Drivers, Image Query Drivers, SQL Drivers, Web Scraper Drivers, and Conversation Memory Drivers), Engines (Query Engines, Extraction Engines, Summary Engines, Image Generation Engines, and Image Query Engines), and additional components (Rulesets, Loaders, Artifacts, Chunkers, and Tokenizers). Griptape enables developers to create AI-powered applications with ease and efficiency.