aiostream
Generator-based operators for asynchronous iteration
Stars: 743
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
README:
|docs-badge| |cov-badge| |ci-badge| |version-badge| |pyversion-badge|
Generator-based operators for asynchronous iteration
aiostream_ provides a collection of stream operators that can be combined to create asynchronous pipelines of operations.
It can be seen as an asynchronous version of itertools_, although some aspects are slightly different. Essentially, all the provided operators return a unified interface called a stream. A stream is an enhanced asynchronous iterable providing the following features:
- Operator pipe-lining - using pipe symbol ``|``
- Repeatability - every iteration creates a different iterator
- Safe iteration context - using ``async with`` and the ``stream`` method
- Simplified execution - get the last element from a stream using ``await``
- Slicing and indexing - using square brackets ``[]``
- Concatenation - using addition symbol ``+``

The `stream operators`_ are separated into 8 categories:
+--------------------+---------------------------------------------------------------------------------------+
| creation           | iterate_, preserve_, just_, call_, empty_, throw_, never_, repeat_, count_, range_     |
+--------------------+---------------------------------------------------------------------------------------+
| transformation     | map_, enumerate_, starmap_, cycle_, chunks_                                             |
+--------------------+---------------------------------------------------------------------------------------+
| selection          | take_, takelast_, skip_, skiplast_, getitem_, filter_, until_, takewhile_, dropwhile_  |
+--------------------+---------------------------------------------------------------------------------------+
| combination        | map_, zip_, merge_, chain_, ziplatest_                                                  |
+--------------------+---------------------------------------------------------------------------------------+
| aggregation        | accumulate_, reduce_, list_                                                             |
+--------------------+---------------------------------------------------------------------------------------+
| advanced           | concat_, flatten_, switch_, concatmap_, flatmap_, switchmap_                            |
+--------------------+---------------------------------------------------------------------------------------+
| timing             | spaceout_, timeout_, delay_                                                             |
+--------------------+---------------------------------------------------------------------------------------+
| miscellaneous      | action_, print_                                                                         |
+--------------------+---------------------------------------------------------------------------------------+
The following example demonstrates most of the stream capabilities:
.. code:: python

    import asyncio
    from aiostream import stream, pipe


    async def main():
        # Create a counting stream with a 0.2 seconds interval
        xs = stream.count(interval=0.2)

        # Operators can be piped using '|'
        ys = xs | pipe.map(lambda x: x**2)

        # Streams can be sliced
        zs = ys[1:10:2]

        # Use a stream context for proper resource management
        async with zs.stream() as streamer:

            # Asynchronous iteration
            async for z in streamer:

                # Print 1, 9, 25, 49 and 81
                print('->', z)

        # Streams can be awaited and return the last value
        print('9² = ', await zs)

        # Streams can run several times
        print('9² = ', await zs)

        # Streams can be concatenated
        one_two_three = stream.just(1) + stream.range(2, 4)

        # Print [1, 2, 3]
        print(await stream.list(one_two_three))


    # Run main coroutine
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
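The combination operators are not shown above; as a minimal sketch (illustrative only, not taken from the documentation), merging two counters with different intervals could look like this:

.. code:: python

    import asyncio
    from aiostream import stream, pipe


    async def merged():
        # Two counters ticking at different rates (the intervals are illustrative)
        fast = stream.count(interval=0.1) | pipe.take(5)
        slow = stream.count(interval=0.3) | pipe.take(3) | pipe.map(lambda x: x * 100)

        # merge interleaves elements from both sources as they become available
        zs = stream.merge(fast, slow)
        print(await stream.list(zs))


    asyncio.run(merged())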
More examples are available in the `example section`_ of the documentation.

You can install aiostream from PyPI as the `aiostream package`_.
Vincent Michel: [email protected]
.. _aiostream: https://github.com/vxgmichel/aiostream
.. _PEP 525: http://www.python.org/dev/peps/pep-0525/
.. _Rx: http://reactivex.io/
.. _aioreactive: http://github.com/dbrattli/aioreactive
.. _itertools: http://docs.python.org/3/library/itertools.html

.. _stream operators: http://aiostream.readthedocs.io/en/latest/operators.html
.. _example section: http://aiostream.readthedocs.io/en/latest/examples.html

.. _iterate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.iterate
.. _preserve: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.preserve
.. _just: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.just
.. _call: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.call
.. _throw: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.throw
.. _empty: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.empty
.. _never: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.never
.. _repeat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.repeat
.. _range: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.range
.. _count: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.count

.. _map: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.map
.. _enumerate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.enumerate
.. _starmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.starmap
.. _cycle: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.cycle
.. _chunks: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chunks

.. _take: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.take
.. _takelast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takelast
.. _skip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skip
.. _skiplast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skiplast
.. _getitem: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.getitem
.. _filter: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.filter
.. _until: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.until
.. _takewhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takewhile
.. _dropwhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.dropwhile

.. _chain: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chain
.. _zip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.zip
.. _merge: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.merge
.. _ziplatest: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.ziplatest

.. _accumulate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.accumulate
.. _reduce: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.reduce
.. _list: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.list

.. _concat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concat
.. _flatten: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatten
.. _switch: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switch
.. _concatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concatmap
.. _flatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatmap
.. _switchmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switchmap

.. _spaceout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.spaceout
.. _delay: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.delay
.. _timeout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.timeout

.. _action: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.action
.. _print: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.print
.. _aiostream package: https://pypi.org/project/aiostream/
.. |docs-badge| image:: https://readthedocs.org/projects/aiostream/badge/?version=latest
   :target: http://aiostream.readthedocs.io/en/latest/?badge=latest
   :alt:
.. |cov-badge| image:: https://codecov.io/gh/vxgmichel/aiostream/branch/main/graph/badge.svg
   :target: https://codecov.io/gh/vxgmichel/aiostream
   :alt:
.. |ci-badge| image:: https://github.com/vxgmichel/aiostream/workflows/CI/badge.svg
   :target: https://github.com/vxgmichel/aiostream/actions/workflows/ci.yml?query=branch%3Amain
   :alt:
.. |version-badge| image:: https://img.shields.io/pypi/v/aiostream.svg
   :target: https://pypi.python.org/pypi/aiostream
   :alt:
.. |pyversion-badge| image:: https://img.shields.io/pypi/pyversions/aiostream.svg
   :target: https://pypi.python.org/pypi/aiostream/
   :alt:
Alternative AI tools for aiostream
Similar Open Source Tools
rpaframework
RPA Framework is an open-source collection of libraries and tools for Robotic Process Automation (RPA), designed to be used with Robot Framework and Python. It offers well-documented core libraries for Software Robot Developers, optimized for Robocorp Control Room and Developer Tools, and accepts external contributions. The project includes various libraries for tasks like archiving, browser automation, date/time manipulations, cloud services integration, encryption operations, database interactions, desktop automation, document processing, email operations, Excel manipulation, file system operations, FTP interactions, web API interactions, image manipulation, AI services, and more. The development of the repository is Python-based and requires Python version 3.8+, with tooling based on poetry and invoke for compiling, building, and running the package. The project is licensed under the Apache License 2.0.
gcloud-aio
This repository contains the shared codebase for two projects: gcloud-aio and gcloud-rest. gcloud-aio is built for Python 3's asyncio, while gcloud-rest is a threadsafe requests-based implementation. It provides clients for Google Cloud services like Auth, BigQuery, Datastore, KMS, PubSub, Storage, and Task Queue. Users can install the library using pip and refer to the documentation for usage details. Developers can contribute to the project by following the contribution guide.
aiogram
aiogram is a modern and fully asynchronous framework for Telegram Bot API written in Python 3.8+ using asyncio and aiohttp. It helps users create faster and more powerful bots. The framework supports features such as asynchronous operations, type hints, PyPy support, Telegram Bot API integration, router updates, Finite State Machine, magic filters, middlewares, webhook replies, and I18n/L10n support with GNU Gettext or Fluent. Prior experience with asyncio is recommended before using aiogram.
PraisonAI
Praison AI is a low-code, centralised framework that simplifies the creation and orchestration of multi-agent systems for various LLM applications. It emphasizes ease of use, customization, and human-agent interaction. The tool leverages AutoGen and CrewAI frameworks to facilitate the development of AI-generated scripts and movie concepts. Users can easily create, run, test, and deploy agents for scriptwriting and movie concept development. Praison AI also provides options for full automatic mode and integration with OpenAI models for enhanced AI capabilities.
skpro
skpro is a library for supervised probabilistic prediction in Python. It provides scikit-learn-like, scikit-base compatible interfaces to tabular supervised regressors for probabilistic prediction (interval, quantile and distribution predictions), tabular probabilistic time-to-event and survival prediction (instance-individual survival distributions), metrics to evaluate probabilistic predictions (e.g., pinball loss, empirical coverage, CRPS, survival losses), reductions to turn scikit-learn regressors into probabilistic skpro regressors (such as bootstrap or conformal), pipelines and composite models (including tuning via probabilistic performance metrics), and symbolic probability distributions with a pandas.DataFrame value domain and a pandas-like interface.
dvc
DVC, or Data Version Control, is a command-line tool and VS Code extension that helps you develop reproducible machine learning projects. With DVC, you can version your data and models, iterate fast with lightweight pipelines, track experiments in your local Git repo, compare any data, code, parameters, model, or performance plots, and share experiments and automatically reproduce anyone's experiment.
aio-pika
Aio-pika is a wrapper around aiormq for asyncio and humans. It provides a completely asynchronous API, object-oriented API, transparent auto-reconnects with complete state recovery, Python 3.7+ compatibility, transparent publisher confirms support, transactions support, and complete type-hints coverage.
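To give a sense of that asynchronous API, a minimal publish sketch (illustrative only; the broker URL and queue name are placeholders, not taken from the aio-pika docs) might look like:

    import asyncio

    import aio_pika


    async def main() -> None:
        # Placeholder broker URL; connect_robust reconnects automatically on failure
        connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
        async with connection:
            channel = await connection.channel()
            queue = await channel.declare_queue("demo", durable=True)
            # Publish a single message to the default exchange, routed to the queue
            await channel.default_exchange.publish(
                aio_pika.Message(body=b"hello"),
                routing_key=queue.name,
            )


    asyncio.run(main())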
MegaParse
MegaParse is a powerful and versatile parser designed to handle various types of documents such as text, PDFs, Powerpoint presentations, and Word documents with no information loss. It is fast, efficient, and open source, supporting a wide range of file formats. MegaParse ensures compatibility with tables, table of contents, headers, footers, and images, making it a comprehensive solution for document parsing.
aiohttp
aiohttp is an async HTTP client/server framework for asyncio. It supports both the client and server sides of the HTTP protocol, offers client and server WebSockets out of the box, and avoids callback hell. aiohttp also provides a web server with middleware and pluggable routing.
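As a rough illustration of that server side (a minimal sketch; the route and handler names are made up for illustration):

    from aiohttp import web


    async def handle(request: web.Request) -> web.Response:
        # "name" is a hypothetical path parameter used only for this example
        name = request.match_info.get("name", "world")
        return web.json_response({"hello": name})


    app = web.Application()
    app.add_routes([web.get("/", handle), web.get("/{name}", handle)])

    if __name__ == "__main__":
        web.run_app(app)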
cortex.cpp
Cortex is a C++ AI engine with a Docker-like command-line interface and client libraries. It supports running AI models using ONNX, TensorRT-LLM, and llama.cpp engines. Cortex can function as a standalone server or be integrated as a library. The tool provides support for various engines and models, allowing users to easily deploy and interact with AI models. It offers a range of CLI commands for managing models, embeddings, and engines, as well as a REST API for interacting with models. Cortex is designed to simplify the deployment and usage of AI models in C++ applications.
sktime
sktime is a Python library for time series analysis that provides a unified interface for various time series learning tasks such as classification, regression, clustering, annotation, and forecasting. It offers time series algorithms and tools compatible with scikit-learn for building, tuning, and validating time series models. sktime aims to enhance the interoperability and usability of the time series analysis ecosystem by empowering users to apply algorithms across different tasks and providing interfaces to related libraries like scikit-learn, statsmodels, tsfresh, PyOD, and fbprophet.
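As a sketch of that scikit-learn-like interface (the toy series and forecaster choice are illustrative, not taken from the sktime docs):

    import numpy as np
    import pandas as pd
    from sktime.forecasting.naive import NaiveForecaster

    # Toy monthly series with illustrative values
    y = pd.Series(
        np.arange(36, dtype=float),
        index=pd.period_range("2020-01", periods=36, freq="M"),
    )

    # Fit a naive forecaster and predict the next three periods
    forecaster = NaiveForecaster(strategy="last")
    forecaster.fit(y)
    print(forecaster.predict(fh=[1, 2, 3]))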
EasyEdit
EasyEdit is a Python package for editing Large Language Models (LLMs) such as GPT-J, Llama, GPT-NEO, GPT2 and T5 (supporting models from 1B to 65B parameters). Its objective is to alter the behavior of LLMs efficiently within a specific domain without negatively impacting performance across other inputs. It is designed to be easy to use and easy to extend.
Liger-Kernel
Liger Kernel is a collection of Triton kernels designed for LLM training, increasing training throughput by 20% and reducing memory usage by 60%. It includes Hugging Face Compatible modules like RMSNorm, RoPE, SwiGLU, CrossEntropy, and FusedLinearCrossEntropy. The tool works with Flash Attention, PyTorch FSDP, and Microsoft DeepSpeed, aiming to enhance model efficiency and performance for researchers, ML practitioners, and curious novices.
Q-Bench
Q-Bench is a benchmark for general-purpose foundation models on low-level vision, focusing on the performance of multi-modality LLMs. It includes three realms for low-level vision: perception, description, and assessment. The benchmark datasets LLVisionQA and LLDescribe are collected for the perception and description tasks, with open submission-based evaluation. An abstract evaluation code is provided for the assessment task using public datasets. The tool can be used with the datasets API for single images and image pairs, allowing for automatic download and usage. Various tasks and evaluations are available for testing MLLMs on low-level vision tasks.
StableToolBench
StableToolBench is a new benchmark developed to address the instability of Tool Learning benchmarks. It aims to balance stability and reality by introducing features such as a Virtual API System with caching and API simulators, a new set of solvable queries determined by LLMs, and a Stable Evaluation System using GPT-4. The Virtual API Server can be set up either by building from source or using a prebuilt Docker image. Users can test the server using provided scripts and evaluate models with Solvable Pass Rate and Solvable Win Rate metrics. The tool also includes model experiments results comparing different models' performance.
For similar tasks
mage-ai
Mage is an open-source data pipeline tool for transforming and integrating data. It offers an easy developer experience, engineering best practices built-in, and data as a first-class citizen. Mage makes it easy to build, preview, and launch data pipelines, and provides observability and scaling capabilities. It supports data integrations, streaming pipelines, and dbt integration.
airbyte
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's no-code Connector Builder or low-code CDK. Airbyte is used by data engineers and analysts at companies of all sizes to build and manage their data pipelines.
airbyte-platform
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's low-code Connector Development Kit (CDK). Airbyte is used by data engineers and analysts at companies of all sizes to move data for a variety of purposes, including data warehousing, data analysis, and machine learning.
airbyte-connectors
This repository contains Airbyte connectors used in Faros and Faros Community Edition platforms as well as Airbyte Connector Development Kit (CDK) for JavaScript/TypeScript.
instill-core
Instill Core is an open-source orchestrator comprising a collection of source-available projects designed to streamline every aspect of building versatile AI features with unstructured data. It includes Instill VDP (Versatile Data Pipeline) for unstructured data, AI, and pipeline orchestration, Instill Model for scalable MLOps and LLMOps for open-source or custom AI models, and Instill Artifact for unified unstructured data management. Instill Core can be used for tasks such as building, testing, and sharing pipelines, importing, serving, fine-tuning, and monitoring ML models, and transforming documents, images, audio, and video into a unified AI-ready format.
opendataeditor
The Open Data Editor (ODE) is a no-code application to explore, validate and publish data in a simple way. It is an open source project powered by the Frictionless Framework. The ODE is currently available for download and testing in beta.
commonplace-bot
Commonplace Bot is a modern representation of the commonplace book, leveraging modern technological advancements in computation, data storage, machine learning, and networking. It aims to capture, engage, and share knowledge by providing a platform for users to collect ideas, quotes, and information, organize them efficiently, engage with the data through various strategies and triggers, and transform the data into new mediums for sharing. The tool utilizes embeddings and cached transformations for efficient data storage and retrieval, flips traditional engagement rules by engaging with the user, and enables users to alchemize raw data into new forms like art prompts. Commonplace Bot offers a unique approach to knowledge management and creative expression.
For similar jobs
sweep
Sweep is an AI junior developer that turns bugs and feature requests into code changes. It automatically handles developer experience improvements like adding type hints and improving test coverage.
teams-ai
The Teams AI Library is a software development kit (SDK) that helps developers create bots that can interact with Teams and Microsoft 365 applications. It is built on top of the Bot Framework SDK and simplifies the process of developing bots that interact with Teams' artificial intelligence capabilities. The SDK is available for JavaScript/TypeScript, .NET, and Python.
ai-guide
This guide is dedicated to Large Language Models (LLMs) that you can run on your home computer. It assumes your PC is a lower-end, non-gaming setup.
classifai
Supercharge WordPress Content Workflows and Engagement with Artificial Intelligence. Tap into leading cloud-based services like OpenAI, Microsoft Azure AI, Google Gemini and IBM Watson to augment your WordPress-powered websites. Publish content faster while improving SEO performance and increasing audience engagement. ClassifAI integrates Artificial Intelligence and Machine Learning technologies to lighten your workload and eliminate tedious tasks, giving you more time to create original content that matters.
chatbot-ui
Chatbot UI is an open-source AI chat app that allows users to create and deploy their own AI chatbots. It is easy to use and can be customized to fit any need. Chatbot UI is perfect for businesses, developers, and anyone who wants to create a chatbot.
BricksLLM
BricksLLM is a cloud-native AI gateway written in Go. Currently, it provides native support for OpenAI, Anthropic, Azure OpenAI and vLLM. BricksLLM aims to provide enterprise-level infrastructure that can power any LLM production use case. Typical use cases include setting LLM usage limits for users on different pricing tiers, tracking LLM usage on a per-user and per-organization basis, blocking or redacting requests containing PII, improving LLM reliability with failovers, retries and caching, and distributing API keys with rate limits and cost limits, whether for internal development and production use cases or for students.
uAgents
uAgents is a Python library developed by Fetch.ai that allows for the creation of autonomous AI agents. These agents can perform various tasks on a schedule or take action on various events. uAgents are easy to create and manage, and they are connected to a fast-growing network of other uAgents. They are also secure, with cryptographically secured messages and wallets.
griptape
Griptape is a modular Python framework for building AI-powered applications that securely connect to your enterprise data and APIs. It offers developers the ability to maintain control and flexibility at every step. Griptape's core components include Structures (Agents, Pipelines, and Workflows), Tasks, Tools, Memory (Conversation Memory, Task Memory, and Meta Memory), Drivers (Prompt and Embedding Drivers, Vector Store Drivers, Image Generation Drivers, Image Query Drivers, SQL Drivers, Web Scraper Drivers, and Conversation Memory Drivers), Engines (Query Engines, Extraction Engines, Summary Engines, Image Generation Engines, and Image Query Engines), and additional components (Rulesets, Loaders, Artifacts, Chunkers, and Tokenizers). Griptape enables developers to create AI-powered applications with ease and efficiency.