
aioreactive
Async/await reactive tools for Python 3.10+
Stars: 379

Aioreactive is a Python library that brings ReactiveX functionality to asyncio using async and await. It is built on the Expression functional library and aims to provide a simple, clean, and async-based approach to reactive programming in Python. The library supports Python 3.10+ and focuses on using plain old functions for operators, running on the asyncio event loop, and providing implicit synchronous back-pressure for event processing.
README:
NEWS: Project rebooted Nov. 2020. Rebuilt using Expression.
Aioreactive is RxPY for asyncio. It's an asynchronous and reactive Python library for asyncio using async and await. Aioreactive is built on the Expression functional library and, integrates naturally with the Python language.
aioreactive is the unification of RxPY and reactive programming with asyncio using async and await.
- Python 3.10+ only. We have a hard dependency Expression v5.
- All operators and tools are implemented as plain old functions.
- Everything is
async
. Sending values is async, subscribing to observables is async. Disposing subscriptions is async. - One scheduler to rule them all. Everything runs on the asyncio base event-loop.
- No multi-threading. Only async and await with concurrency using
asyncio. Threads are hard, and in many cases it doesn’t make sense to
use multi-threading in Python applications. If you need to use threads
you may wrap them with
concurrent.futures
and compose them into the chain withflat_map()
or similar. Seeparallel.py
for an example. - Simple, clean and use few abstractions. Try to align with the itertools package, and reuse as much from the Python standard library as possible.
- Support type hints and static type checking using Pylance.
- Implicit synchronous back-pressure ™. Producers of events will simply be awaited until the event can be processed by the down-stream consumers.
With aioreactive you subscribe observers to observables, and the key abstractions of aioreactive can be seen in this single line of code:
subscription = await observable.subscribe_async(observer)
The difference from RxPY can be seen with the await
expression.
Aioreactive is built around the asynchronous duals, or opposites of the
AsyncIterable and AsyncIterator abstract base classes. These async
classes are called AsyncObservable and AsyncObserver.
AsyncObservable is a producer of events. It may be seen as the dual or
opposite of AsyncIterable and provides a single setter method called
subscribe_async()
that is the dual of the __aiter__()
getter method:
from abc import ABC, abstractmethod
class AsyncObservable(ABC):
@abstractmethod
async def subscribe_async(self, observer):
return NotImplemented
AsyncObserver is a consumer of events and is modeled after the
so-called consumer interface, the
enhanced generator interface in
PEP-342 and async
generators in PEP-525. It
is the dual of the AsyncIterator __anext__()
method, and expands to
three async methods asend()
, that is the opposite of __anext__()
,
athrow()
that is the opposite of an raise Exception()
and aclose()
that is the opposite of raise StopAsyncIteration
:
from abc import ABC, abstractmethod
class AsyncObserver(ABC):
@abstractmethod
async def asend(self, value):
return NotImplemented
@abstractmethod
async def athrow(self, error):
return NotImplemented
@abstractmethod
async def aclose(self):
return NotImplemented
An observable becomes hot and starts streaming items by using the
subscribe_async()
method. The subscribe_async()
method takes an
observable and returns a disposable subscription. So the
subscribe_async()
method is used to attach a observer to the
observable.
async def asend(value):
print(value)
disposable = await subscribe_async(source, AsyncAnonymousObserver(asend))
AsyncAnonymousObserver
is an anonymous observer that constructs an
AsyncObserver
out of plain async functions, so you don't have to
implement a new named observer every time you need one.
The subscription returned by subscribe_async()
is disposable, so to
unsubscribe you need to await the dispose_async()
method on the
subscription.
await subscription.dispose_async()
Even more interesting, with to_async_iterable
you can flip around from
AsyncObservable
to an AsyncIterable
and use async-for
to consume
the stream of events.
import aioreactive as rx
xs = rx.from_iterable([1, 2, 3])
async for x in xs:
print(x)
They effectively transform us from an async push model to an async pull
model, and lets us use the awesome new language features such as async for
and async-with
. We do this without any queueing, as a push by the
AsyncObservable
will await the pull by the `AsyncIterator. This
effectively applies so-called "back-pressure" up the subscription as the
producer will await the iterator to pick up the item send.
The for-loop may be wrapped with async-with to control the lifetime of the subscription:
import aioreactive as rx
xs = rx.from_iterable([1, 2, 3])
result = []
obv = rx.AsyncIteratorObserver(xs)
async with await xs.subscribe_async(obv) as subscription:
async for x in obv:
result.append(x)
assert result == [1, 2, 3]
An async stream is both an async observer and an async observable. Aioreactive lets you create streams explicitly.
import aioreactive as rx
stream = AsyncSubject() # Alias for AsyncMultiStream
sink = rx.AsyncAnonymousObserver()
await stream.subscribe_async(sink)
await stream.asend(42)
You can create streams directly from AsyncMultiStream
or
AsyncSingleStream
. AsyncMultiStream
supports multiple observers, and
is hot in the sense that it will drop any event that is sent if there
are currently no observers attached. AsyncSingleStream
on the other
hand supports a single observer, and is cold in the sense that it will
await any producer until there is an observer attached.
The Rx operators in aioreactive are all plain old functions. You can apply them to an observable and compose it into a transformed, filtered, aggregated or combined observable. This transformed observable can be streamed into an observer.
Observable -> Operator -> Operator -> Operator -> Observer
Aioreactive contains many of the same operators as you know from RxPY. Our goal is not to implement them all, but to provide the most essential ones.
- concat -- Concatenates two or more observables.
- choose -- Filters and/or transforms the observable.
- choose_asnc -- Asynchronously filters and/or transforms the observable.
- debounce -- Throttles an observable.
- delay -- delays the items within an observable.
- distinct_until_changed -- an observable with continuously distinct values.
- filter -- filters an observable.
- filteri -- filters an observable with index.
- flat_map -- transforms an observable into a stream of observables and flattens the resulting observable.
- flat_map_latest -- transforms an observable into a stream of observables and flattens the resulting observable by producing values from the latest observable.
- from_iterable -- Create an observable from an (async) iterable.
- subscribe -- Subscribes an observer to an observable. Returns a subscription.
- map -- transforms an observable.
- mapi -- transforms an observable with index.
- map_async -- transforms an observable asynchronously.
- mapi_async -- transforms an observable asynchronously with index.
- merge_inner -- Merges an observable of observables.
- merge -- Merge one observable with another observable.
- merge_seq -- Merge a sequence of observables.
- run -- Awaits the future returned by subscribe. Returns when the subscription closes.
- slice -- Slices an observable.
- skip -- Skip items from the start of the observable stream.
- skip_last -- Skip items from the end of the observable stream.
- starfilter -- Filters an observable with a predicate and spreads the arguments.
- starmap -- Transforms and async observable and spreads the arguments to the mapper.
- switch_latest -- Merges the latest stream in an observable of streams.
- take -- Take a number of items from the start of the observable stream.
- take_last -- Take a number of items from the end of the observable stream.
- unit -- Converts a value or future to an observable.
- with_latest_from -- Combines two observables into one.
With aioreactive you can choose to program functionally with plain old functions, or object-oriented with classes and methods. Aioreactive supports both method chaining or forward pipe programming styles.
AsyncObservable
may compose operators using forward pipelining with
the pipe
operator provided by the amazing
Expression library. This works
by having the operators partially applied with their arguments before
being given the source stream as the last curried argument.
ys = pipe(xs, filter(predicate), map(mapper), flat_map(request))
Longer pipelines may break lines as for binary operators:
import aioreactve as rx
async def main():
stream = rx.AsyncSubject()
obv = rx.AsyncIteratorObserver()
xs = pipe(
stream,
rx.map(lambda x: x["term"]),
rx.filter(lambda text: len(text) > 2),
rx.debounce(0.75),
rx.distinct_until_changed(),
rx.map(search_wikipedia),
rx.switch_latest(),
)
async with xs.subscribe_async(obv) as ys
async for value in obv:
print(value)
AsyncObservable also supports slicing using the Python slice notation.
@pytest.mark.asyncio
async def test_slice_special():
xs = rx.from_iterable([1, 2, 3, 4, 5])
values = []
async def asend(value):
values.append(value)
ys = xs[1:-1]
result = await run(ys, AsyncAnonymousObserver(asend))
assert result == 4
assert values == [2, 3, 4]
An alternative to pipelining is to use the classic and fluent method chaining as we know from ReactiveX.
An AsyncObservable
created from class methods such as
AsyncRx.from_iterable()
returns a AsyncChainedObservable
.
where we may use methods such as .filter()
and .map()
.
from aioreactive import AsyncRx
@pytest.mark.asyncio
async def test_observable_simple_pipe():
xs = AsyncRx.from_iterable([1, 2, 3])
result = []
async def mapper(value):
await asyncio.sleep(0.1)
return value * 10
async def predicate(value):
await asyncio.sleep(0.1)
return value > 1
ys = xs.filter(predicate).map(mapper)
async def on_next(value):
result.append(value)
subscription = await ys.subscribe_async(AsyncAnonymousObserver(on_next))
await subsubscription
assert result == [20, 30]
Aioreactive also provides a virtual time event loop
(VirtualTimeEventLoop
) that enables you to write asyncio unit-tests
that run in virtual time. Virtual time means that time is emulated, so
tests run as quickly as possible even if they sleep or awaits long-lived
operations. A test using virtual time still gives the same result as it
would have done if it had been run in real-time.
For example the following test still gives the correct result even if it takes 0 seconds to run:
@pytest.fixture()
def event_loop():
loop = VirtualTimeEventLoop()
yield loop
loop.close()
@pytest.mark.asyncio
async def test_call_later():
result = []
def action(value):
result.append(value)
loop = asyncio.get_event_loop()
loop.call_later(10, partial(action, 1))
loop.call_later(1, partial(action, 2))
loop.call_later(5, partial(action, 3))
await asyncio.sleep(10)
assert result == [2, 3, 1]
The aioreactive testing module provides a test AsyncSubject
that may
delay sending values, and a test AsyncTestObserver
that records all
events. These two classes helps you with testing in virtual time.
@pytest.fixture()
def event_loop():
loop = VirtualTimeEventLoop()
yield loop
loop.close()
@pytest.mark.asyncio
async def test_delay_done():
xs = AsyncTestSubject() # Test stream
ys = pipe(xs, rx.delay(1.0))
obv = AsyncTestObserver() # Test AsyncAnonymousObserver
async with await ys.subscribe_async(obv):
await xs.asend_later(0, 10)
await xs.asend_later(1.0, 20)
await xs.aclose_later(1.0)
await obv
assert obv.values == [
(ca(1), OnNext(10)),
(ca(2), OnNext(20)),
(ca(3), OnCompleted()),
]
AsyncIterable
and AsyncObservable
are closely related (in fact they
are duals). AsyncIterable
is an async iterable (pull) world, while
AsyncObservable
is an async reactive (push) based world. There are
many operations such as map()
and filter()
that may be simpler to
implement using AsyncIterable
, but once we start to include time, then
AsyncObservable
really starts to shine. Operators such as delay()
makes much more sense for AsyncObservable
than for AsyncIterable
.
However, aioreactive makes it easy for you to flip-around to async iterable just before you need to consume the stream, thus giving you the best of both worlds.
Aioreactive will not replace RxPY.
RxPY is an implementation of Observable
. Aioreactive is an
implementation of AsyncObservable
.
Rx and RxPY has hundreds of different query operators, and we currently have no plans to implementing all of them for aioreactive.
Many ideas from aioreactive have already been ported back into "classic" RxPY.
Aioreactive was inspired by:
- AsyncRx - Aioreactive is a direct port of AsyncRx from F#.
- Expression - Functional programming for Python.
- Is it really Pythonic to continue using LINQ operators instead of plain old functions?
- Reactive Extensions (Rx) and RxPY.
- Dart Streams
- Underscore.js.
- itertools and functools.
- dbrattli/OSlash
- kriskowal/q.
The MIT License (MIT) Copyright (c) 2016 Børge Lanes, Dag Brattli.
For Tasks:
Click tags to check more tools for each tasksFor Jobs:
Alternative AI tools for aioreactive
Similar Open Source Tools

aioreactive
Aioreactive is a Python library that brings ReactiveX functionality to asyncio using async and await. It is built on the Expression functional library and aims to provide a simple, clean, and async-based approach to reactive programming in Python. The library supports Python 3.10+ and focuses on using plain old functions for operators, running on the asyncio event loop, and providing implicit synchronous back-pressure for event processing.

LLMUnity
LLM for Unity enables seamless integration of Large Language Models (LLMs) within the Unity engine, allowing users to create intelligent characters for immersive player interactions. The tool supports major LLM models, runs locally without internet access, offers fast inference on CPU and GPU, and is easy to set up with a single line of code. It is free for both personal and commercial use, tested on Unity 2021 LTS, 2022 LTS, and 2023. Users can build multiple AI characters efficiently, use remote servers for processing, and customize model settings for text generation.

generative-models
Generative Models by Stability AI is a repository that provides various generative models for research purposes. It includes models like Stable Video 4D (SV4D) for video synthesis, Stable Video 3D (SV3D) for multi-view synthesis, SDXL-Turbo for text-to-image generation, and more. The repository focuses on modularity and implements a config-driven approach for building and combining submodules. It supports training with PyTorch Lightning and offers inference demos for different models. Users can access pre-trained models like SDXL-base-1.0 and SDXL-refiner-1.0 under a CreativeML Open RAIL++-M license. The codebase also includes tools for invisible watermark detection in generated images.

lantern
Lantern is an open-source PostgreSQL database extension designed to store vector data, generate embeddings, and handle vector search operations efficiently. It introduces a new index type called 'lantern_hnsw' for vector columns, which speeds up 'ORDER BY ... LIMIT' queries. Lantern utilizes the state-of-the-art HNSW implementation called usearch. Users can easily install Lantern using Docker, Homebrew, or precompiled binaries. The tool supports various distance functions, index construction parameters, and operator classes for efficient querying. Lantern offers features like embedding generation, interoperability with pgvector, parallel index creation, and external index graph generation. It aims to provide superior performance metrics compared to other similar tools and has a roadmap for future enhancements such as cloud-hosted version, hardware-accelerated distance metrics, industry-specific application templates, and support for version control and A/B testing of embeddings.

OllamaSharp
OllamaSharp is a .NET binding for the Ollama API, providing an intuitive API client to interact with Ollama. It offers support for all Ollama API endpoints, real-time streaming, progress reporting, and an API console for remote management. Users can easily set up the client, list models, pull models with progress feedback, stream completions, and build interactive chats. The project includes a demo console for exploring and managing the Ollama host.

DeepPavlov
DeepPavlov is an open-source conversational AI library built on PyTorch. It is designed for the development of production-ready chatbots and complex conversational systems, as well as for research in the area of NLP and dialog systems. The library offers a wide range of models for tasks such as Named Entity Recognition, Intent/Sentence Classification, Question Answering, Sentence Similarity/Ranking, Syntactic Parsing, and more. DeepPavlov also provides embeddings like BERT, ELMo, and FastText for various languages, along with AutoML capabilities and integrations with REST API, Socket API, and Amazon AWS.

ellmer
ellmer is a tool that facilitates the use of large language models (LLM) from R. It supports various LLM providers and offers features such as streaming outputs, tool/function calling, and structured data extraction. Users can interact with ellmer in different ways, including interactive chat console, interactive method call, and programmatic chat. The tool provides support for multiple model providers and offers recommendations for different use cases, such as exploration or organizational use.

lhotse
Lhotse is a Python library designed to make speech and audio data preparation flexible and accessible. It aims to attract a wider community to speech processing tasks by providing a Python-centric design and an expressive command-line interface. Lhotse offers standard data preparation recipes, PyTorch Dataset classes for speech tasks, and efficient data preparation for model training with audio cuts. It supports data augmentation, feature extraction, and feature-space cut mixing. The tool extends Kaldi's data preparation recipes with seamless PyTorch integration, human-readable text manifests, and convenient Python classes.

crb
CRB (Composable Runtime Blocks) is a unique framework that implements hybrid workloads by seamlessly combining synchronous and asynchronous activities, state machines, routines, the actor model, and supervisors. It is ideal for building massive applications and serves as a low-level framework for creating custom frameworks, such as AI-agents. The core idea is to ensure high compatibility among all blocks, enabling significant code reuse. The framework allows for the implementation of algorithms with complex branching, making it suitable for building large-scale applications or implementing complex workflows, such as AI pipelines. It provides flexibility in defining structures, implementing traits, and managing execution flow, allowing users to create robust and nonlinear algorithms easily.

AgentKit
AgentKit is a framework for constructing complex human thought processes from simple natural language prompts. It offers a unified way to represent and execute these processes as graphs, making it easy to design and tune agents without any programming experience. AgentKit can be used for a variety of tasks, including generating text, answering questions, and making decisions.

Aiwnios
Aiwnios is a HolyC Compiler/Runtime designed for 64-bit ARM, RISCV, and x86 machines, including Apple M1 Macs, with plans for supporting other architectures in the future. The project is currently a work in progress, with regular updates and improvements planned. Aiwnios includes a sockets API (currently tested on FreeBSD) and a HolyC assembler accessible through AARCH64. The heart of Aiwnios lies in `arm_backend.c`, where the compiler is located, and a powerful AARCH64 assembler in `arm64_asm.c`. The compiler uses reverse Polish notation and statements are reversed. The developer manual is intended for developers working on the C side, providing detailed explanations of the source code.

shellChatGPT
ShellChatGPT is a shell wrapper for OpenAI's ChatGPT, DALL-E, Whisper, and TTS, featuring integration with LocalAI, Ollama, Gemini, Mistral, Groq, and GitHub Models. It provides text and chat completions, vision, reasoning, and audio models, voice-in and voice-out chatting mode, text editor interface, markdown rendering support, session management, instruction prompt manager, integration with various service providers, command line completion, file picker dialogs, color scheme personalization, stdin and text file input support, and compatibility with Linux, FreeBSD, MacOS, and Termux for a responsive experience.

raft
RAFT (Reusable Accelerated Functions and Tools) is a C++ header-only template library with an optional shared library that contains fundamental widely-used algorithms and primitives for machine learning and information retrieval. The algorithms are CUDA-accelerated and form building blocks for more easily writing high performance applications.

jina
Jina is a tool that allows users to build multimodal AI services and pipelines using cloud-native technologies. It provides a Pythonic experience for serving ML models and transitioning from local deployment to advanced orchestration frameworks like Docker-Compose, Kubernetes, or Jina AI Cloud. Users can build and serve models for any data type and deep learning framework, design high-performance services with easy scaling, serve LLM models while streaming their output, integrate with Docker containers via Executor Hub, and host on CPU/GPU using Jina AI Cloud. Jina also offers advanced orchestration and scaling capabilities, a smooth transition to the cloud, and easy scalability and concurrency features for applications. Users can deploy to their own cloud or system with Kubernetes and Docker Compose integration, and even deploy to JCloud for autoscaling and monitoring.

rtdl-num-embeddings
This repository provides the official implementation of the paper 'On Embeddings for Numerical Features in Tabular Deep Learning'. It focuses on transforming scalar continuous features into vectors before integrating them into the main backbone of tabular neural networks, showcasing improved performance. The embeddings for continuous features are shown to enhance the performance of tabular DL models and are applicable to various conventional backbones, offering efficiency comparable to Transformer-based models. The repository includes Python packages for practical usage, exploration of metrics and hyperparameters, and reproducing reported results for different algorithms and datasets.

storm
STORM is a LLM system that writes Wikipedia-like articles from scratch based on Internet search. While the system cannot produce publication-ready articles that often require a significant number of edits, experienced Wikipedia editors have found it helpful in their pre-writing stage. **Try out our [live research preview](https://storm.genie.stanford.edu/) to see how STORM can help your knowledge exploration journey and please provide feedback to help us improve the system 🙏!**
For similar tasks

aioreactive
Aioreactive is a Python library that brings ReactiveX functionality to asyncio using async and await. It is built on the Expression functional library and aims to provide a simple, clean, and async-based approach to reactive programming in Python. The library supports Python 3.10+ and focuses on using plain old functions for operators, running on the asyncio event loop, and providing implicit synchronous back-pressure for event processing.

LakeSoul
LakeSoul is a cloud-native Lakehouse framework that supports scalable metadata management, ACID transactions, efficient and flexible upsert operation, schema evolution, and unified streaming & batch processing. It supports multiple computing engines like Spark, Flink, Presto, and PyTorch, and computing modes such as batch, stream, MPP, and AI. LakeSoul scales metadata management and achieves ACID control by using PostgreSQL. It provides features like automatic compaction, table lifecycle maintenance, redundant data cleaning, and permission isolation for metadata.
For similar jobs

resonance
Resonance is a framework designed to facilitate interoperability and messaging between services in your infrastructure and beyond. It provides AI capabilities and takes full advantage of asynchronous PHP, built on top of Swoole. With Resonance, you can: * Chat with Open-Source LLMs: Create prompt controllers to directly answer user's prompts. LLM takes care of determining user's intention, so you can focus on taking appropriate action. * Asynchronous Where it Matters: Respond asynchronously to incoming RPC or WebSocket messages (or both combined) with little overhead. You can set up all the asynchronous features using attributes. No elaborate configuration is needed. * Simple Things Remain Simple: Writing HTTP controllers is similar to how it's done in the synchronous code. Controllers have new exciting features that take advantage of the asynchronous environment. * Consistency is Key: You can keep the same approach to writing software no matter the size of your project. There are no growing central configuration files or service dependencies registries. Every relation between code modules is local to those modules. * Promises in PHP: Resonance provides a partial implementation of Promise/A+ spec to handle various asynchronous tasks. * GraphQL Out of the Box: You can build elaborate GraphQL schemas by using just the PHP attributes. Resonance takes care of reusing SQL queries and optimizing the resources' usage. All fields can be resolved asynchronously.

aiogram_bot_template
Aiogram bot template is a boilerplate for creating Telegram bots using Aiogram framework. It provides a solid foundation for building robust and scalable bots with a focus on code organization, database integration, and localization.

pluto
Pluto is a development tool dedicated to helping developers **build cloud and AI applications more conveniently** , resolving issues such as the challenging deployment of AI applications and open-source models. Developers are able to write applications in familiar programming languages like **Python and TypeScript** , **directly defining and utilizing the cloud resources necessary for the application within their code base** , such as AWS SageMaker, DynamoDB, and more. Pluto automatically deduces the infrastructure resource needs of the app through **static program analysis** and proceeds to create these resources on the specified cloud platform, **simplifying the resources creation and application deployment process**.

pinecone-ts-client
The official Node.js client for Pinecone, written in TypeScript. This client library provides a high-level interface for interacting with the Pinecone vector database service. With this client, you can create and manage indexes, upsert and query vector data, and perform other operations related to vector search and retrieval. The client is designed to be easy to use and provides a consistent and idiomatic experience for Node.js developers. It supports all the features and functionality of the Pinecone API, making it a comprehensive solution for building vector-powered applications in Node.js.

aiohttp-pydantic
Aiohttp pydantic is an aiohttp view to easily parse and validate requests. You define using function annotations what your methods for handling HTTP verbs expect, and Aiohttp pydantic parses the HTTP request for you, validates the data, and injects the parameters you want. It provides features like query string, request body, URL path, and HTTP headers validation, as well as Open API Specification generation.

gcloud-aio
This repository contains shared codebase for two projects: gcloud-aio and gcloud-rest. gcloud-aio is built for Python 3's asyncio, while gcloud-rest is a threadsafe requests-based implementation. It provides clients for Google Cloud services like Auth, BigQuery, Datastore, KMS, PubSub, Storage, and Task Queue. Users can install the library using pip and refer to the documentation for usage details. Developers can contribute to the project by following the contribution guide.

aioconsole
aioconsole is a Python package that provides asynchronous console and interfaces for asyncio. It offers asynchronous equivalents to input, print, exec, and code.interact, an interactive loop running the asynchronous Python console, customization and running of command line interfaces using argparse, stream support to serve interfaces instead of using standard streams, and the apython script to access asyncio code at runtime without modifying the sources. The package requires Python version 3.8 or higher and can be installed from PyPI or GitHub. It allows users to run Python files or modules with a modified asyncio policy, replacing the default event loop with an interactive loop. aioconsole is useful for scenarios where users need to interact with asyncio code in a console environment.

aiosqlite
aiosqlite is a Python library that provides a friendly, async interface to SQLite databases. It replicates the standard sqlite3 module but with async versions of all the standard connection and cursor methods, along with context managers for automatically closing connections and cursors. It allows interaction with SQLite databases on the main AsyncIO event loop without blocking execution of other coroutines while waiting for queries or data fetches. The library also replicates most of the advanced features of sqlite3, such as row factories and total changes tracking.