aiokafka
asyncio client for kafka
Stars: 1108
aiokafka is an asyncio client for Kafka that provides high-level, asynchronous message producer and consumer functionalities. It allows users to interact with Kafka for sending and consuming messages in an efficient and scalable manner. The tool supports features like cluster layout retrieval, topic/partition leadership information, group coordination, and message consumption load balancing. Users can easily integrate aiokafka into their Python projects to work with Kafka seamlessly.
README:
.. image:: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml/badge.svg?branch=master :target: https://github.com/aio-libs/aiokafka/actions/workflows/tests.yml?query=branch%3Amaster :alt: |Build status| .. image:: https://codecov.io/github/aio-libs/aiokafka/coverage.svg?branch=master :target: https://codecov.io/gh/aio-libs/aiokafka/branch/master :alt: |Coverage| .. image:: https://badges.gitter.im/Join%20Chat.svg :target: https://gitter.im/aio-libs/Lobby :alt: |Chat on Gitter|
asyncio client for Kafka
AIOKafkaProducer
AIOKafkaProducer is a high-level, asynchronous message producer.
Example of AIOKafkaProducer usage:
.. code-block:: python
from aiokafka import AIOKafkaProducer
import asyncio
async def send_one():
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
# Get cluster layout and initial topic/partition leadership information
await producer.start()
try:
# Produce message
await producer.send_and_wait("my_topic", b"Super message")
finally:
# Wait for all pending messages to be delivered or expire.
await producer.stop()
asyncio.run(send_one())
AIOKafkaConsumer
AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
Example of AIOKafkaConsumer usage:
.. code-block:: python
from aiokafka import AIOKafkaConsumer
import asyncio
async def consume():
consumer = AIOKafkaConsumer(
'my_topic', 'my_other_topic',
bootstrap_servers='localhost:9092',
group_id="my-group")
# Get cluster layout and join group `my-group`
await consumer.start()
try:
# Consume messages
async for msg in consumer:
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
asyncio.run(consume())
https://aiokafka.readthedocs.io/
Docker is required to run tests. See https://docs.docker.com/engine/installation for installation notes. Also note, that lz4 compression libraries for python will require python-dev package,
or python source header files for compilation on Linux.
NOTE: You will also need a valid java installation. It's required for the keytool utility, used to
generate ssh keys for some tests.
Setting up tests requirements (assuming you're within virtualenv on ubuntu 14.04+)::
sudo apt-get install -y libkrb5-dev krb5-user
make setup
Running tests with coverage::
make cov
To run tests with a specific version of Kafka (default one is 2.8.1) use KAFKA_VERSION variable::
make cov SCALA_VERSION=2.11 KAFKA_VERSION=0.10.2.1
Test running cheatsheat:
-
make test FLAGS="-l -x --ff"- run until 1 failure, rerun failed tests first. Great for cleaning up a lot of errors, say after a big refactor. -
make test FLAGS="-k consumer"- run only the consumer tests. -
make test FLAGS="-m 'not ssl'"- run tests excluding ssl. -
make test FLAGS="--no-pull"- do not try to pull new docker image before test run.
For Tasks:
Click tags to check more tools for each tasksFor Jobs:
Alternative AI tools for aiokafka
Similar Open Source Tools
aiokafka
aiokafka is an asyncio client for Kafka that provides high-level, asynchronous message producer and consumer functionalities. It allows users to interact with Kafka for sending and consuming messages in an efficient and scalable manner. The tool supports features like cluster layout retrieval, topic/partition leadership information, group coordination, and message consumption load balancing. Users can easily integrate aiokafka into their Python projects to work with Kafka seamlessly.
BentoML
BentoML is an open-source model serving library for building performant and scalable AI applications with Python. It comes with everything you need for serving optimization, model packaging, and production deployment.
archgw
Arch is an intelligent Layer 7 gateway designed to protect, observe, and personalize AI agents with APIs. It handles tasks related to prompts, including detecting jailbreak attempts, calling backend APIs, routing between LLMs, and managing observability. Built on Envoy Proxy, it offers features like function calling, prompt guardrails, traffic management, and observability. Users can build fast, observable, and personalized AI agents using Arch to improve speed, security, and personalization of GenAI apps.
clarifai-python-grpc
This is the official Clarifai gRPC Python client for interacting with their recognition API. Clarifai offers a platform for data scientists, developers, researchers, and enterprises to utilize artificial intelligence for image, video, and text analysis through computer vision and natural language processing. The client allows users to authenticate, predict concepts in images, and access various functionalities provided by the Clarifai API. It follows a versioning scheme that aligns with the backend API updates and includes specific instructions for installation and troubleshooting. Users can explore the Clarifai demo, sign up for an account, and refer to the documentation for detailed information.
codebox-api
CodeBox is a cloud infrastructure tool designed for running Python code in an isolated environment. It also offers simple file input/output capabilities and will soon support vector database operations. Users can install CodeBox using pip and utilize it by setting up an API key. The tool allows users to execute Python code snippets and interact with the isolated environment. CodeBox is currently in early development stages and requires manual handling for certain operations like refunds and cancellations. The tool is open for contributions through issue reporting and pull requests. It is licensed under MIT and can be contacted via email at [email protected].
tgpt
tgpt is a cross-platform command-line interface (CLI) tool that allows users to interact with AI chatbots in the Terminal without needing API keys. It supports various AI providers such as KoboldAI, Phind, Llama2, Blackbox AI, and OpenAI. Users can generate text, code, and images using different flags and options. The tool can be installed on GNU/Linux, MacOS, FreeBSD, and Windows systems. It also supports proxy configurations and provides options for updating and uninstalling the tool.
labo
LABO is a time series forecasting and analysis framework that integrates pre-trained and fine-tuned LLMs with multi-domain agent-based systems. It allows users to create and tune agents easily for various scenarios, such as stock market trend prediction and web public opinion analysis. LABO requires a specific runtime environment setup, including system requirements, Python environment, dependency installations, and configurations. Users can fine-tune their own models using LABO's Low-Rank Adaptation (LoRA) for computational efficiency and continuous model updates. Additionally, LABO provides a Python library for building model training pipelines and customizing agents for specific tasks.
axar
AXAR AI is a lightweight framework designed for building production-ready agentic applications using TypeScript. It aims to simplify the process of creating robust, production-grade LLM-powered apps by focusing on familiar coding practices without unnecessary abstractions or steep learning curves. The framework provides structured, typed inputs and outputs, familiar and intuitive patterns like dependency injection and decorators, explicit control over agent behavior, real-time logging and monitoring tools, minimalistic design with little overhead, model agnostic compatibility with various AI models, and streamed outputs for fast and accurate results. AXAR AI is ideal for developers working on real-world AI applications who want a tool that gets out of the way and allows them to focus on shipping reliable software.
aioimaplib
aioimaplib is a Python library inspired by imaplib and imaplib2, aiming to port imaplib with asyncio for asynchronous benefits. It provides functionalities to interact with IMAP servers using asyncio, including checking mailbox, waiting for new messages, handling IDLE command, threading, IMAP command concurrency, logging configuration, and authentication with OAuth2. The library is tested with various IMAP servers like dovecot, Gmail, Outlook, Yahoo, etc. Developers are encouraged to contribute by improving, bug fixing, testing with other IMAP servers, and providing feedback. The library supports most IMAP4rev1 commands from RFC3501 and plans to implement additional commands like 'STARTTLS', 'AUTHENTICATE', 'COMPRESS', 'SETACL', 'DELETEACL', 'GETACL', 'MYRIGHTS', 'LISTRIGHTS', 'GETQUOTA', 'GETQUOTAROOT', 'SETQUOTA', 'SORT', 'THREAD', 'ID', 'NAMESPACE', 'CATENATE', and tests with other servers.
openai-kit
OpenAIKit is a Swift package designed to facilitate communication with the OpenAI API. It provides methods to interact with various OpenAI services such as chat, models, completions, edits, images, embeddings, files, moderations, and speech to text. The package encourages the use of environment variables to securely inject the OpenAI API key and organization details. It also offers error handling for API requests through the `OpenAIKit.APIErrorResponse`.
Biomni
Biomni is a general-purpose biomedical AI agent designed to autonomously execute a wide range of research tasks across diverse biomedical subfields. By integrating cutting-edge large language model (LLM) reasoning with retrieval-augmented planning and code-based execution, Biomni helps scientists dramatically enhance research productivity and generate testable hypotheses.
agents
The LiveKit Agent Framework is designed for building real-time, programmable participants that run on servers. Easily tap into LiveKit WebRTC sessions and process or generate audio, video, and data streams. The framework includes plugins for common workflows, such as voice activity detection and speech-to-text. Agents integrates seamlessly with LiveKit server, offloading job queuing and scheduling responsibilities to it. This eliminates the need for additional queuing infrastructure. Agent code developed on your local machine can scale to support thousands of concurrent sessions when deployed to a server in production.
codecompanion.nvim
CodeCompanion.nvim is a Neovim plugin that provides a Copilot Chat experience, adapter support for various LLMs, agentic workflows, inline code creation and modification, built-in actions for language prompts and error fixes, custom actions creation, async execution, and more. It supports Anthropic, Ollama, and OpenAI adapters. The plugin is primarily developed for personal workflows with no guarantees of regular updates or support. Users can customize the plugin to their needs by forking the project.
codellm-devkit
Codellm-devkit (CLDK) is a Python library that serves as a multilingual program analysis framework bridging traditional static analysis tools and Large Language Models (LLMs) specialized for code (CodeLLMs). It simplifies the process of analyzing codebases across multiple programming languages, enabling the extraction of meaningful insights and facilitating LLM-based code analysis. The library provides a unified interface for integrating outputs from various analysis tools and preparing them for effective use by CodeLLMs. Codellm-devkit aims to enable the development and experimentation of robust analysis pipelines that combine traditional program analysis tools and CodeLLMs, reducing friction in multi-language code analysis and ensuring compatibility across different tools and LLM platforms. It is designed to seamlessly integrate with popular analysis tools like WALA, Tree-sitter, LLVM, and CodeQL, acting as a crucial intermediary layer for efficient communication between these tools and CodeLLMs. The project is continuously evolving to include new tools and frameworks, maintaining its versatility for code analysis and LLM integration.
aioclock
An asyncio-based scheduling framework designed for execution of periodic tasks with integrated support for dependency injection, enabling efficient and flexible task management. Aioclock is 100% async, light, fast, and resource-friendly. It offers features like task scheduling, grouping, trigger definition, easy syntax, Pydantic v2 validation, and upcoming support for running the task dispatcher on a different process and backend support for horizontal scaling.
semantic-kernel
Semantic Kernel is an SDK that integrates Large Language Models (LLMs) like OpenAI, Azure OpenAI, and Hugging Face with conventional programming languages like C#, Python, and Java. Semantic Kernel achieves this by allowing you to define plugins that can be chained together in just a few lines of code. What makes Semantic Kernel _special_ , however, is its ability to _automatically_ orchestrate plugins with AI. With Semantic Kernel planners, you can ask an LLM to generate a plan that achieves a user's unique goal. Afterwards, Semantic Kernel will execute the plan for the user.
For similar tasks
aiokafka
aiokafka is an asyncio client for Kafka that provides high-level, asynchronous message producer and consumer functionalities. It allows users to interact with Kafka for sending and consuming messages in an efficient and scalable manner. The tool supports features like cluster layout retrieval, topic/partition leadership information, group coordination, and message consumption load balancing. Users can easily integrate aiokafka into their Python projects to work with Kafka seamlessly.
claude-api
claude-api is a web conversation library for ClaudeAI implemented in GoLang. It provides functionalities to interact with ClaudeAI for web-based conversations. Users can easily integrate this library into their Go projects to enable chatbot capabilities and handle conversations with ClaudeAI. The library includes features for sending messages, receiving responses, and managing chat sessions, making it a valuable tool for developers looking to incorporate AI-powered chatbots into their applications.
Chat-With-RTX-python-api
This repository contains a Python API for Chat With RTX, which allows users to interact with RTX models for natural language processing. The API provides functionality to send messages and receive responses from various LLM models. It also includes information on the speed of different models supported by Chat With RTX. The repository has a history of updates, including the removal of a feature and the addition of a new model for speech-to-text conversion. The repository is licensed under CC0.
Ollama
Ollama SDK for .NET is a fully generated C# SDK based on OpenAPI specification using OpenApiGenerator. It supports automatic releases of new preview versions, source generator for defining tools natively through C# interfaces, and all modern .NET features. The SDK provides support for all Ollama API endpoints including chats, embeddings, listing models, pulling and creating new models, and more. It also offers tools for interacting with weather data and providing weather-related information to users.
tinyclaw
TinyClaw is a lightweight wrapper around Claude Code that connects WhatsApp via QR code, processes messages sequentially, maintains conversation context, runs 24/7 in tmux, and is ready for multi-channel support. Its key innovation is the file-based queue system that prevents race conditions and enables multi-channel support. TinyClaw consists of components like whatsapp-client.js for WhatsApp I/O, queue-processor.js for message processing, heartbeat-cron.sh for health checks, and tinyclaw.sh as the main orchestrator with a CLI interface. It ensures no race conditions, is multi-channel ready, provides clean responses using claude -c -p, and supports persistent sessions. Security measures include local storage of WhatsApp session and queue files, channel-specific authentication, and running Claude with user permissions.
For similar jobs
db2rest
DB2Rest is a modern low-code REST DATA API platform that simplifies the development of intelligent applications. It seamlessly integrates existing and new databases with language models (LMs/LLMs) and vector stores, enabling the rapid delivery of context-aware, reasoning applications without vendor lock-in.
mage-ai
Mage is an open-source data pipeline tool for transforming and integrating data. It offers an easy developer experience, engineering best practices built-in, and data as a first-class citizen. Mage makes it easy to build, preview, and launch data pipelines, and provides observability and scaling capabilities. It supports data integrations, streaming pipelines, and dbt integration.
airbyte
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's no-code Connector Builder or low-code CDK. Airbyte is used by data engineers and analysts at companies of all sizes to build and manage their data pipelines.
labelbox-python
Labelbox is a data-centric AI platform for enterprises to develop, optimize, and use AI to solve problems and power new products and services. Enterprises use Labelbox to curate data, generate high-quality human feedback data for computer vision and LLMs, evaluate model performance, and automate tasks by combining AI and human-centric workflows. The academic & research community uses Labelbox for cutting-edge AI research.
telemetry-airflow
This repository codifies the Airflow cluster that is deployed at workflow.telemetry.mozilla.org (behind SSO) and commonly referred to as "WTMO" or simply "Airflow". Some links relevant to users and developers of WTMO: * The `dags` directory in this repository contains some custom DAG definitions * Many of the DAGs registered with WTMO don't live in this repository, but are instead generated from ETL task definitions in bigquery-etl * The Data SRE team maintains a WTMO Developer Guide (behind SSO)
airflow
Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows. When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative. Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
airbyte-platform
Airbyte is an open-source data integration platform that makes it easy to move data from any source to any destination. With Airbyte, you can build and manage data pipelines without writing any code. Airbyte provides a library of pre-built connectors that make it easy to connect to popular data sources and destinations. You can also create your own connectors using Airbyte's low-code Connector Development Kit (CDK). Airbyte is used by data engineers and analysts at companies of all sizes to move data for a variety of purposes, including data warehousing, data analysis, and machine learning.
chronon
Chronon is a platform that simplifies and improves ML workflows by providing a central place to define features, ensuring point-in-time correctness for backfills, simplifying orchestration for batch and streaming pipelines, offering easy endpoints for feature fetching, and guaranteeing and measuring consistency. It offers benefits over other approaches by enabling the use of a broad set of data for training, handling large aggregations and other computationally intensive transformations, and abstracting away the infrastructure complexity of data plumbing.