aiostream
Generator-based operators for asynchronous iteration
Stars: 743
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
README:
|docs-badge| |cov-badge| |ci-badge| |version-badge| |pyversion-badge|
Generator-based operators for asynchronous iteration
aiostream_ provides a collection of stream operators that can be combined to create asynchronous pipelines of operations.
It can be seen as an asynchronous version of itertools_, although some aspects are slightly different. Essentially, all the provided operators return a unified interface called a stream. A stream is an enhanced asynchronous iterable providing the following features:
-
Operator pipe-lining - using pipe symbol
|
- Repeatability - every iteration creates a different iterator
-
Safe iteration context - using
async with
and thestream
method -
Simplified execution - get the last element from a stream using
await
-
Slicing and indexing - using square brackets
[]
-
Concatenation - using addition symbol
+
The stream operators
_ are separated in 7 categories:
+--------------------+---------------------------------------------------------------------------------------+ | creation | iterate_, preserve_, just_, call_, empty_, throw_, never_, repeat_, count_, range_ | +--------------------+---------------------------------------------------------------------------------------+ | transformation | map_, enumerate_, starmap_, cycle_, chunks_ | +--------------------+---------------------------------------------------------------------------------------+ | selection | take_, takelast_, skip_, skiplast_, getitem_, filter_, until_, takewhile_, dropwhile_ | +--------------------+---------------------------------------------------------------------------------------+ | combination | map_, zip_, merge_, chain_, ziplatest_ | +--------------------+---------------------------------------------------------------------------------------+ | aggregation | accumulate_, reduce_, list_ | +--------------------+---------------------------------------------------------------------------------------+ | advanced | concat_, flatten_, switch_, concatmap_, flatmap_, switchmap_ | +--------------------+---------------------------------------------------------------------------------------+ | timing | spaceout_, timeout_, delay_ | +--------------------+---------------------------------------------------------------------------------------+ | miscellaneous | action_, print_ | +--------------------+---------------------------------------------------------------------------------------+
The following example demonstrates most of the streams capabilities:
.. code:: python
import asyncio
from aiostream import stream, pipe
async def main():
# Create a counting stream with a 0.2 seconds interval
xs = stream.count(interval=0.2)
# Operators can be piped using '|'
ys = xs | pipe.map(lambda x: x**2)
# Streams can be sliced
zs = ys[1:10:2]
# Use a stream context for proper resource management
async with zs.stream() as streamer:
# Asynchronous iteration
async for z in streamer:
# Print 1, 9, 25, 49 and 81
print('->', z)
# Streams can be awaited and return the last value
print('9² = ', await zs)
# Streams can run several times
print('9² = ', await zs)
# Streams can be concatenated
one_two_three = stream.just(1) + stream.range(2, 4)
# Print [1, 2, 3]
print(await stream.list(one_two_three))
# Run main coroutine
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
More examples are available in the example section
_ of the documentation.
You can install aiostream from PyPI as the aiostream package
_.
Vincent Michel: [email protected]
.. _aiostream: https://github.com/vxgmichel/aiostream .. _PEP 525: http://www.python.org/dev/peps/pep-0525/ .. _Rx: http://reactivex.io/ .. _aioreactive: http://github.com/dbrattli/aioreactive .. _itertools: http://docs.python.org/3/library/itertools.html
.. _stream operators: http://aiostream.readthedocs.io/en/latest/operators.html .. _example section: http://aiostream.readthedocs.io/en/latest/examples.html
.. _iterate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.iterate .. _preserve: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.preserve .. _just: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.just .. _call: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.call .. _throw: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.throw .. _empty: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.empty .. _never: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.never .. _repeat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.repeat .. _range: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.range .. _count: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.count
.. _map: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.map .. _enumerate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.enumerate .. _starmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.starmap .. _cycle: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.cycle .. _chunks: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chunks
.. _take: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.take .. _takelast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takelast .. _skip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skip .. _skiplast: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.skiplast .. _getitem: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.getitem .. _filter: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.filter .. _until: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.until .. _takewhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.takewhile .. _dropwhile: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.dropwhile
.. _chain: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.chain .. _zip: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.zip .. _merge: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.merge .. _ziplatest: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.ziplatest
.. _accumulate: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.accumulate .. _reduce: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.reduce .. _list: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.list
.. _concat: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concat .. _flatten: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatten .. _switch: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switch .. _concatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.concatmap .. _flatmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.flatmap .. _switchmap: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.switchmap
.. _spaceout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.spaceout .. _delay: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.delay .. _timeout: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.timeout
.. _action: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.action .. _print: http://aiostream.readthedocs.io/en/latest/operators.html#aiostream.stream.print
.. _aiostream package: https://pypi.org/project/aiostream/
.. |docs-badge| image:: https://readthedocs.org/projects/aiostream/badge/?version=latest :target: http://aiostream.readthedocs.io/en/latest/?badge=latest :alt: .. |cov-badge| image:: https://codecov.io/gh/vxgmichel/aiostream/branch/main/graph/badge.svg :target: https://codecov.io/gh/vxgmichel/aiostream :alt: .. |ci-badge| image:: https://github.com/vxgmichel/aiostream/workflows/CI/badge.svg :target: https://github.com/vxgmichel/aiostream/actions/workflows/ci.yml?query=branch%3Amain :alt: .. |version-badge| image:: https://img.shields.io/pypi/v/aiostream.svg :target: https://pypi.python.org/pypi/aiostream :alt: .. |pyversion-badge| image:: https://img.shields.io/pypi/pyversions/aiostream.svg :target: https://pypi.python.org/pypi/aiostream/ :alt:
For Tasks:
Click tags to check more tools for each tasksFor Jobs:
Alternative AI tools for aiostream
Similar Open Source Tools
aiostream
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
rpaframework
RPA Framework is an open-source collection of libraries and tools for Robotic Process Automation (RPA), designed to be used with Robot Framework and Python. It offers well-documented core libraries for Software Robot Developers, optimized for Robocorp Control Room and Developer Tools, and accepts external contributions. The project includes various libraries for tasks like archiving, browser automation, date/time manipulations, cloud services integration, encryption operations, database interactions, desktop automation, document processing, email operations, Excel manipulation, file system operations, FTP interactions, web API interactions, image manipulation, AI services, and more. The development of the repository is Python-based and requires Python version 3.8+, with tooling based on poetry and invoke for compiling, building, and running the package. The project is licensed under the Apache License 2.0.
gcloud-aio
This repository contains shared codebase for two projects: gcloud-aio and gcloud-rest. gcloud-aio is built for Python 3's asyncio, while gcloud-rest is a threadsafe requests-based implementation. It provides clients for Google Cloud services like Auth, BigQuery, Datastore, KMS, PubSub, Storage, and Task Queue. Users can install the library using pip and refer to the documentation for usage details. Developers can contribute to the project by following the contribution guide.
aiogram
aiogram is a modern and fully asynchronous framework for Telegram Bot API written in Python 3.8+ using asyncio and aiohttp. It helps users create faster and more powerful bots. The framework supports features such as asynchronous operations, type hints, PyPy support, Telegram Bot API integration, router updates, Finite State Machine, magic filters, middlewares, webhook replies, and I18n/L10n support with GNU Gettext or Fluent. Prior experience with asyncio is recommended before using aiogram.
skpro
skpro is a library for supervised probabilistic prediction in python. It provides `scikit-learn`-like, `scikit-base` compatible interfaces to: * tabular **supervised regressors for probabilistic prediction** \- interval, quantile and distribution predictions * tabular **probabilistic time-to-event and survival prediction** \- instance-individual survival distributions * **metrics to evaluate probabilistic predictions** , e.g., pinball loss, empirical coverage, CRPS, survival losses * **reductions** to turn `scikit-learn` regressors into probabilistic `skpro` regressors, such as bootstrap or conformal * building **pipelines and composite models** , including tuning via probabilistic performance metrics * symbolic **probability distributions** with value domain of `pandas.DataFrame`-s and `pandas`-like interface
agentscope
AgentScope is a multi-agent platform designed to empower developers to build multi-agent applications with large-scale models. It features three high-level capabilities: Easy-to-Use, High Robustness, and Actor-Based Distribution. AgentScope provides a list of `ModelWrapper` to support both local model services and third-party model APIs, including OpenAI API, DashScope API, Gemini API, and ollama. It also enables developers to rapidly deploy local model services using libraries such as ollama (CPU inference), Flask + Transformers, Flask + ModelScope, FastChat, and vllm. AgentScope supports various services, including Web Search, Data Query, Retrieval, Code Execution, File Operation, and Text Processing. Example applications include Conversation, Game, and Distribution. AgentScope is released under Apache License 2.0 and welcomes contributions.
dvc
DVC, or Data Version Control, is a command-line tool and VS Code extension that helps you develop reproducible machine learning projects. With DVC, you can version your data and models, iterate fast with lightweight pipelines, track experiments in your local Git repo, compare any data, code, parameters, model, or performance plots, and share experiments and automatically reproduce anyone's experiment.
langtrace
Langtrace is an open source observability software that lets you capture, debug, and analyze traces and metrics from all your applications that leverage LLM APIs, Vector Databases, and LLM-based Frameworks. It supports Open Telemetry Standards (OTEL), and the traces generated adhere to these standards. Langtrace offers both a managed SaaS version (Langtrace Cloud) and a self-hosted option. The SDKs for both Typescript/Javascript and Python are available, making it easy to integrate Langtrace into your applications. Langtrace automatically captures traces from various vendors, including OpenAI, Anthropic, Azure OpenAI, Langchain, LlamaIndex, Pinecone, and ChromaDB.
LightRAG
LightRAG is a repository hosting the code for LightRAG, a system that supports seamless integration of custom knowledge graphs, Oracle Database 23ai, Neo4J for storage, and multiple file types. It includes features like entity deletion, batch insert, incremental insert, and graph visualization. LightRAG provides an API server implementation for RESTful API access to RAG operations, allowing users to interact with it through HTTP requests. The repository also includes evaluation scripts, code for reproducing results, and a comprehensive code structure.
paperless-gpt
paperless-gpt is a tool designed to generate accurate and meaningful document titles and tags for paperless-ngx using Large Language Models (LLMs). It supports multiple LLM providers, including OpenAI and Ollama. With paperless-gpt, you can streamline your document management by automatically suggesting appropriate titles and tags based on the content of your scanned documents. The tool offers features like multiple LLM support, customizable prompts, easy integration with paperless-ngx, user-friendly interface for reviewing and applying suggestions, dockerized deployment, automatic document processing, and an experimental OCR feature.
sktime
sktime is a Python library for time series analysis that provides a unified interface for various time series learning tasks such as classification, regression, clustering, annotation, and forecasting. It offers time series algorithms and tools compatible with scikit-learn for building, tuning, and validating time series models. sktime aims to enhance the interoperability and usability of the time series analysis ecosystem by empowering users to apply algorithms across different tasks and providing interfaces to related libraries like scikit-learn, statsmodels, tsfresh, PyOD, and fbprophet.
camel
CAMEL is an open-source library designed for the study of autonomous and communicative agents. We believe that studying these agents on a large scale offers valuable insights into their behaviors, capabilities, and potential risks. To facilitate research in this field, we implement and support various types of agents, tasks, prompts, models, and simulated environments.
spark-nlp
Spark NLP is a state-of-the-art Natural Language Processing library built on top of Apache Spark. It provides simple, performant, and accurate NLP annotations for machine learning pipelines that scale easily in a distributed environment. Spark NLP comes with 36000+ pretrained pipelines and models in more than 200+ languages. It offers tasks such as Tokenization, Word Segmentation, Part-of-Speech Tagging, Named Entity Recognition, Dependency Parsing, Spell Checking, Text Classification, Sentiment Analysis, Token Classification, Machine Translation, Summarization, Question Answering, Table Question Answering, Text Generation, Image Classification, Image to Text (captioning), Automatic Speech Recognition, Zero-Shot Learning, and many more NLP tasks. Spark NLP is the only open-source NLP library in production that offers state-of-the-art transformers such as BERT, CamemBERT, ALBERT, ELECTRA, XLNet, DistilBERT, RoBERTa, DeBERTa, XLM-RoBERTa, Longformer, ELMO, Universal Sentence Encoder, Llama-2, M2M100, BART, Instructor, E5, Google T5, MarianMT, OpenAI GPT2, Vision Transformers (ViT), OpenAI Whisper, and many more not only to Python and R, but also to JVM ecosystem (Java, Scala, and Kotlin) at scale by extending Apache Spark natively.
nuitrack-sdk
Nuitrack™ is an ultimate 3D body tracking solution developed by 3DiVi Inc. It enables body motion analytics applications for virtually any widespread depth sensors and hardware platforms, supporting a wide range of applications from real-time gesture recognition on embedded platforms to large-scale multisensor analytical systems. Nuitrack provides highly-sophisticated 3D skeletal tracking, basic facial analysis, hand tracking, and gesture recognition APIs for UI control. It offers two skeletal tracking engines: classical for embedded hardware and AI for complex poses, providing a human-centric spatial understanding tool for natural and intelligent user engagement.
dora
Dataflow-oriented robotic application (dora-rs) is a framework that makes creation of robotic applications fast and simple. Building a robotic application can be summed up as bringing together hardwares, algorithms, and AI models, and make them communicate with each others. At dora-rs, we try to: make integration of hardware and software easy by supporting Python, C, C++, and also ROS2. make communication low latency by using zero-copy Arrow messages. dora-rs is still experimental and you might experience bugs, but we're working very hard to make it stable as possible.
Q-Bench
Q-Bench is a benchmark for general-purpose foundation models on low-level vision, focusing on multi-modality LLMs performance. It includes three realms for low-level vision: perception, description, and assessment. The benchmark datasets LLVisionQA and LLDescribe are collected for perception and description tasks, with open submission-based evaluation. An abstract evaluation code is provided for assessment using public datasets. The tool can be used with the datasets API for single images and image pairs, allowing for automatic download and usage. Various tasks and evaluations are available for testing MLLMs on low-level vision tasks.
For similar tasks
aiostream
aiostream provides a collection of stream operators for creating asynchronous pipelines of operations. It offers features like operator pipe-lining, repeatability, safe iteration context, simplified execution, slicing and indexing, and concatenation. The stream operators are categorized into creation, transformation, selection, combination, aggregation, advanced, timing, and miscellaneous. Users can combine these operators to perform various asynchronous tasks efficiently.
AIOStreams
AIOStreams is a versatile tool that combines streams from various addons into one platform, offering extensive customization options. Users can change result formats, filter results by various criteria, remove duplicates, prioritize services, sort results, specify size limits, and more. The tool scrapes results from selected addons, applies user configurations, and presents the results in a unified manner. It simplifies the process of finding and accessing desired content from multiple sources, enhancing user experience and efficiency.
mage-ai
Mage is an open-source data pipeline tool for transforming and integrating data. It offers an easy developer experience, engineering best practices built-in, and data as a first-class citizen. Mage makes it easy to build, preview, and launch data pipelines, and provides observability and scaling capabilities. It supports data integrations, streaming pipelines, and dbt integration.
airbyte
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's no-code Connector Builder or low-code CDK. Airbyte is used by data engineers and analysts at companies of all sizes to build and manage their data pipelines.
airbyte-platform
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's low-code Connector Development Kit (CDK). Airbyte is used by data engineers and analysts at companies of all sizes to move data for a variety of purposes, including data warehousing, data analysis, and machine learning.
airbyte-connectors
This repository contains Airbyte connectors used in Faros and Faros Community Edition platforms as well as Airbyte Connector Development Kit (CDK) for JavaScript/TypeScript.
instill-core
Instill Core is an open-source orchestrator comprising a collection of source-available projects designed to streamline every aspect of building versatile AI features with unstructured data. It includes Instill VDP (Versatile Data Pipeline) for unstructured data, AI, and pipeline orchestration, Instill Model for scalable MLOps and LLMOps for open-source or custom AI models, and Instill Artifact for unified unstructured data management. Instill Core can be used for tasks such as building, testing, and sharing pipelines, importing, serving, fine-tuning, and monitoring ML models, and transforming documents, images, audio, and video into a unified AI-ready format.
opendataeditor
The Open Data Editor (ODE) is a no-code application to explore, validate and publish data in a simple way. It is an open source project powered by the Frictionless Framework. The ODE is currently available for download and testing in beta.
For similar jobs
sweep
Sweep is an AI junior developer that turns bugs and feature requests into code changes. It automatically handles developer experience improvements like adding type hints and improving test coverage.
teams-ai
The Teams AI Library is a software development kit (SDK) that helps developers create bots that can interact with Teams and Microsoft 365 applications. It is built on top of the Bot Framework SDK and simplifies the process of developing bots that interact with Teams' artificial intelligence capabilities. The SDK is available for JavaScript/TypeScript, .NET, and Python.
ai-guide
This guide is dedicated to Large Language Models (LLMs) that you can run on your home computer. It assumes your PC is a lower-end, non-gaming setup.
classifai
Supercharge WordPress Content Workflows and Engagement with Artificial Intelligence. Tap into leading cloud-based services like OpenAI, Microsoft Azure AI, Google Gemini and IBM Watson to augment your WordPress-powered websites. Publish content faster while improving SEO performance and increasing audience engagement. ClassifAI integrates Artificial Intelligence and Machine Learning technologies to lighten your workload and eliminate tedious tasks, giving you more time to create original content that matters.
chatbot-ui
Chatbot UI is an open-source AI chat app that allows users to create and deploy their own AI chatbots. It is easy to use and can be customized to fit any need. Chatbot UI is perfect for businesses, developers, and anyone who wants to create a chatbot.
BricksLLM
BricksLLM is a cloud native AI gateway written in Go. Currently, it provides native support for OpenAI, Anthropic, Azure OpenAI and vLLM. BricksLLM aims to provide enterprise level infrastructure that can power any LLM production use cases. Here are some use cases for BricksLLM: * Set LLM usage limits for users on different pricing tiers * Track LLM usage on a per user and per organization basis * Block or redact requests containing PIIs * Improve LLM reliability with failovers, retries and caching * Distribute API keys with rate limits and cost limits for internal development/production use cases * Distribute API keys with rate limits and cost limits for students
uAgents
uAgents is a Python library developed by Fetch.ai that allows for the creation of autonomous AI agents. These agents can perform various tasks on a schedule or take action on various events. uAgents are easy to create and manage, and they are connected to a fast-growing network of other uAgents. They are also secure, with cryptographically secured messages and wallets.
griptape
Griptape is a modular Python framework for building AI-powered applications that securely connect to your enterprise data and APIs. It offers developers the ability to maintain control and flexibility at every step. Griptape's core components include Structures (Agents, Pipelines, and Workflows), Tasks, Tools, Memory (Conversation Memory, Task Memory, and Meta Memory), Drivers (Prompt and Embedding Drivers, Vector Store Drivers, Image Generation Drivers, Image Query Drivers, SQL Drivers, Web Scraper Drivers, and Conversation Memory Drivers), Engines (Query Engines, Extraction Engines, Summary Engines, Image Generation Engines, and Image Query Engines), and additional components (Rulesets, Loaders, Artifacts, Chunkers, and Tokenizers). Griptape enables developers to create AI-powered applications with ease and efficiency.