
arkflow
High performance Rust stream processing engine seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis.
Stars: 1153

ArkFlow is a high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, enabling easy loading and execution of machine learning models for streaming data and inference, anomaly detection, and complex event processing. The tool is built on Rust and Tokio async runtime, offering excellent performance and low latency. It features built-in SQL queries, Python script, JSON processing, Protobuf encoding/decoding, and batch processing capabilities. ArkFlow is extensible with a modular design, making it easy to extend with new components.
README:
English | 中文
High performance Rust stream processing engine seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It not only supports multiple input/output sources and processors, but also enables easy loading and execution of machine learning models, enabling streaming data and inference, anomaly detection, and complex event processing.
ArkFlow enlisted in the CNCF Cloud Native Landscape.
- High Performance: Built on Rust and Tokio async runtime, offering excellent performance and low latency
- Multiple Data Sources: Support for Kafka, MQTT, HTTP, files, and other input/output sources
- Powerful Processing Capabilities: Built-in SQL queries, Python script, JSON processing, Protobuf encoding/decoding, batch processing, and other processors
- Extensible: Modular design, easy to extend with new input, buffer, output, and processor components
# Clone the repository
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow
# Build the project
cargo build --release
# Run tests
cargo test
- Create a configuration file
config.yaml
:
logging:
level: info
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1s
batch_size: 10
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT * FROM flow WHERE value >= 10"
output:
type: "stdout"
error_output:
type: "stdout"
- Run ArkFlow:
./target/release/arkflow --config config.yaml
ArkFlow uses YAML format configuration files, supporting the following main configuration items:
logging:
level: info # Log level: debug, info, warn, error
streams: # Stream definition list
- input: # Input configuration
# ...
pipeline: # Processing pipeline configuration
# ...
output: # Output configuration
# ...
error_output: # Error output configuration
# ...
buffer: # Buffer configuration
# ...
ArkFlow supports multiple input sources:
- Kafka: Read data from Kafka topics
- MQTT: Subscribe to messages from MQTT topics
- HTTP: Receive data via HTTP
- File: Reading data from files(Csv,Json, Parquet, Avro, Arrow) using SQL
- Generator: Generate test data
- Database: Query data from databases(MySQL, PostgreSQL, SQLite, Duckdb)
- Nats: Subscribe to messages from Nats topics
- Redis: Subscribe to messages from Redis channels or lists
- Websocket: Subscribe to messages from WebSocket connections
- Modbus: Read data from Modbus devices
Example:
input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
client_id: arkflow
start_from_latest: true
ArkFlow provides multiple data processors:
- JSON: JSON data processing and transformation
- SQL: Process data using SQL queries
- Protobuf: Protobuf encoding/decoding
- Batch Processing: Process messages in batches
- Vrl: Process data using VRL
Example:
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value >= 10"
ArkFlow supports multiple output targets:
- Kafka: Write data to Kafka topics
- MQTT: Publish messages to MQTT topics
- HTTP: Send data via HTTP
- Standard Output: Output data to the console
- Drop: Discard data
- Nats: Publish messages to Nats topics
Example:
output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value:
type: value
value: test-topic
client_id: arkflow-producer
ArkFlow supports multiple error output targets:
- Kafka: Write error data to Kafka topics
- MQTT: Publish error messages to MQTT topics
- HTTP: Send error data via HTTP
- Standard Output: Output error data to the console
- Drop: Discard error data
- Nats: Publish messages to Nats topics
Example:
error_output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value: error-topic
client_id: error-arkflow-producer
ArkFlow provides buffer capabilities to handle backpressure and temporary storage of messages:
- Memory Buffer: Memory buffer, for high-throughput scenarios and window aggregation.
- Session Window: The Session Window buffer component provides a session-based message grouping mechanism where messages are grouped based on activity gaps. It implements a session window that closes after a configurable period of inactivity.
- Sliding Window: The Sliding Window buffer component provides a time-based windowing mechanism for processing message batches. It implements a sliding window algorithm with configurable window size, slide interval and slide size.
- Tumbling Window: The Tumbling Window buffer component provides a fixed-size, non-overlapping windowing mechanism for processing message batches. It implements a tumbling window algorithm with configurable interval settings.
Example:
buffer:
type: memory
capacity: 10000 # Maximum number of messages to buffer
timeout: 10s # Maximum time to buffer messages
streams:
- input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value > 100"
output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value: test-topic
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1ms
batch_size: 10000
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
output:
type: "stdout"
- Conalog(Country: South Korea)
ArkFlow is licensed under the Apache License 2.0.
Discord: https://discord.gg/CwKhzb8pux
If you like or are using this project to learn or start your solution, please give it a star⭐. Thanks!
For Tasks:
Click tags to check more tools for each tasksFor Jobs:
Alternative AI tools for arkflow
Similar Open Source Tools

arkflow
ArkFlow is a high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, enabling easy loading and execution of machine learning models for streaming data and inference, anomaly detection, and complex event processing. The tool is built on Rust and Tokio async runtime, offering excellent performance and low latency. It features built-in SQL queries, Python script, JSON processing, Protobuf encoding/decoding, and batch processing capabilities. ArkFlow is extensible with a modular design, making it easy to extend with new components.

quantalogic
QuantaLogic is a ReAct framework for building advanced AI agents that seamlessly integrates large language models with a robust tool system. It aims to bridge the gap between advanced AI models and practical implementation in business processes by enabling agents to understand, reason about, and execute complex tasks through natural language interaction. The framework includes features such as ReAct Framework, Universal LLM Support, Secure Tool System, Real-time Monitoring, Memory Management, and Enterprise Ready components.

simba
Simba is an open source, portable Knowledge Management System (KMS) designed to seamlessly integrate with any Retrieval-Augmented Generation (RAG) system. It features a modern UI and modular architecture, allowing developers to focus on building advanced AI solutions without the complexities of knowledge management. Simba offers a user-friendly interface to visualize and modify document chunks, supports various vector stores and embedding models, and simplifies knowledge management for developers. It is community-driven, extensible, and aims to enhance AI functionality by providing a seamless integration with RAG-based systems.

oxylabs-mcp
The Oxylabs MCP Server acts as a bridge between AI models and the web, providing clean, structured data from any site. It enables scraping of URLs, rendering JavaScript-heavy pages, content extraction for AI use, bypassing anti-scraping measures, and accessing geo-restricted web data from 195+ countries. The implementation utilizes the Model Context Protocol (MCP) to facilitate secure interactions between AI assistants and web content. Key features include scraping content from any site, automatic data cleaning and conversion, bypassing blocks and geo-restrictions, flexible setup with cross-platform support, and built-in error handling and request management.

swift-ocr-llm-powered-pdf-to-markdown
Swift OCR is a powerful tool for extracting text from PDF files using OpenAI's GPT-4 Turbo with Vision model. It offers flexible input options, advanced OCR processing, performance optimizations, structured output, robust error handling, and scalable architecture. The tool ensures accurate text extraction, resilience against failures, and efficient handling of multiple requests.

R2R
R2R (RAG to Riches) is a fast and efficient framework for serving high-quality Retrieval-Augmented Generation (RAG) to end users. The framework is designed with customizable pipelines and a feature-rich FastAPI implementation, enabling developers to quickly deploy and scale RAG-based applications. R2R was conceived to bridge the gap between local LLM experimentation and scalable production solutions. **R2R is to LangChain/LlamaIndex what NextJS is to React**. A JavaScript client for R2R deployments can be found here. ### Key Features * **🚀 Deploy** : Instantly launch production-ready RAG pipelines with streaming capabilities. * **🧩 Customize** : Tailor your pipeline with intuitive configuration files. * **🔌 Extend** : Enhance your pipeline with custom code integrations. * **⚖️ Autoscale** : Scale your pipeline effortlessly in the cloud using SciPhi. * **🤖 OSS** : Benefit from a framework developed by the open-source community, designed to simplify RAG deployment.

pilottai
PilottAI is a Python framework for building autonomous multi-agent systems with advanced orchestration capabilities. It provides enterprise-ready features for building scalable AI applications. The framework includes hierarchical agent systems, production-ready features like asynchronous processing and fault tolerance, advanced memory management with semantic storage, and integrations with multiple LLM providers and custom tools. PilottAI offers specialized agents for various tasks such as customer service, document processing, email handling, knowledge acquisition, marketing, research analysis, sales, social media, and web search. The framework also provides documentation, example use cases, and advanced features like memory management, load balancing, and fault tolerance.

orra
Orra is a tool for building production-ready multi-agent applications that handle complex real-world interactions. It coordinates tasks across existing stack, agents, and tools run as services using intelligent reasoning. With features like smart pre-evaluated execution plans, domain grounding, durable execution, and automatic service health monitoring, Orra enables users to go fast with tools as services and revert state to handle failures. It provides real-time status tracking and webhook result delivery, making it ideal for developers looking to move beyond simple crews and agents.

code_puppy
Code Puppy is an AI-powered code generation agent designed to understand programming tasks, generate high-quality code, and explain its reasoning. It supports multi-language code generation, interactive CLI, and detailed code explanations. The tool requires Python 3.9+ and API keys for various models like GPT, Google's Gemini, Cerebras, and Claude. It also integrates with MCP servers for advanced features like code search and documentation lookups. Users can create custom JSON agents for specialized tasks and access a variety of tools for file management, code execution, and reasoning sharing.

WebAI-to-API
This project implements a web API that offers a unified interface to Google Gemini and Claude 3. It provides a self-hosted, lightweight, and scalable solution for accessing these AI models through a streaming API. The API supports both Claude and Gemini models, allowing users to interact with them in real-time. The project includes a user-friendly web UI for configuration and documentation, making it easy to get started and explore the capabilities of the API.

paelladoc
PAELLADOC is an intelligent documentation system that uses AI to analyze code repositories and generate comprehensive technical documentation. It offers a modular architecture with MECE principles, interactive documentation process, key features like Orchestrator and Commands, and a focus on context for successful AI programming. The tool aims to streamline documentation creation, code generation, and product management tasks for software development teams, providing a definitive standard for AI-assisted development documentation.

sim
Sim is a platform that allows users to build and deploy AI agent workflows quickly and easily. It provides cloud-hosted and self-hosted options, along with support for local AI models. Users can set up the application using Docker Compose, Dev Containers, or manual setup with PostgreSQL and pgvector extension. The platform utilizes technologies like Next.js, Bun, PostgreSQL with Drizzle ORM, Better Auth for authentication, Shadcn and Tailwind CSS for UI, Zustand for state management, ReactFlow for flow editor, Fumadocs for documentation, Turborepo for monorepo management, Socket.io for real-time communication, and Trigger.dev for background jobs.

probe
Probe is an AI-friendly, fully local, semantic code search tool designed to power the next generation of AI coding assistants. It combines the speed of ripgrep with the code-aware parsing of tree-sitter to deliver precise results with complete code blocks, making it perfect for large codebases and AI-driven development workflows. Probe is fully local, keeping code on the user's machine without relying on external APIs. It supports multiple languages, offers various search options, and can be used in CLI mode, MCP server mode, AI chat mode, and web interface. The tool is designed to be flexible, fast, and accurate, providing developers and AI models with full context and relevant code blocks for efficient code exploration and understanding.

one
ONE is a modern web and AI agent development toolkit that empowers developers to build AI-powered applications with high performance, beautiful UI, AI integration, responsive design, type safety, and great developer experience. It is perfect for building modern web applications, from simple landing pages to complex AI-powered platforms.
For similar tasks

qdrant
Qdrant is a vector similarity search engine and vector database. It is written in Rust, which makes it fast and reliable even under high load. Qdrant can be used for a variety of applications, including: * Semantic search * Image search * Product recommendations * Chatbots * Anomaly detection Qdrant offers a variety of features, including: * Payload storage and filtering * Hybrid search with sparse vectors * Vector quantization and on-disk storage * Distributed deployment * Highlighted features such as query planning, payload indexes, SIMD hardware acceleration, async I/O, and write-ahead logging Qdrant is available as a fully managed cloud service or as an open-source software that can be deployed on-premises.

SynapseML
SynapseML (previously known as MMLSpark) is an open-source library that simplifies the creation of massively scalable machine learning (ML) pipelines. It provides simple, composable, and distributed APIs for various machine learning tasks such as text analytics, vision, anomaly detection, and more. Built on Apache Spark, SynapseML allows seamless integration of models into existing workflows. It supports training and evaluation on single-node, multi-node, and resizable clusters, enabling scalability without resource wastage. Compatible with Python, R, Scala, Java, and .NET, SynapseML abstracts over different data sources for easy experimentation. Requires Scala 2.12, Spark 3.4+, and Python 3.8+.

mlx-vlm
MLX-VLM is a package designed for running Vision LLMs on Mac systems using MLX. It provides a convenient way to install and utilize the package for processing large language models related to vision tasks. The tool simplifies the process of running LLMs on Mac computers, offering a seamless experience for users interested in leveraging MLX for vision-related projects.

Java-AI-Book-Code
The Java-AI-Book-Code repository contains code examples for the 2020 edition of 'Practical Artificial Intelligence With Java'. It is a comprehensive update of the previous 2013 edition, featuring new content on deep learning, knowledge graphs, anomaly detection, linked data, genetic algorithms, search algorithms, and more. The repository serves as a valuable resource for Java developers interested in AI applications and provides practical implementations of various AI techniques and algorithms.

Awesome-AI-Data-Guided-Projects
A curated list of data science & AI guided projects to start building your portfolio. The repository contains guided projects covering various topics such as large language models, time series analysis, computer vision, natural language processing (NLP), and data science. Each project provides detailed instructions on how to implement specific tasks using different tools and technologies.

awesome-AIOps
awesome-AIOps is a curated list of academic researches and industrial materials related to Artificial Intelligence for IT Operations (AIOps). It includes resources such as competitions, white papers, blogs, tutorials, benchmarks, tools, companies, academic materials, talks, workshops, papers, and courses covering various aspects of AIOps like anomaly detection, root cause analysis, incident management, microservices, dependency tracing, and more.

AI-Security-and-Privacy-Events
AI-Security-and-Privacy-Events is a curated list of academic events focusing on AI security and privacy. It includes seminars, conferences, workshops, tutorials, special sessions, and covers various topics such as NLP & LLM Security, Privacy and Security in ML, Machine Learning Security, AI System with Confidential Computing, Adversarial Machine Learning, and more.

AI_Spectrum
AI_Spectrum is a versatile machine learning library that provides a wide range of tools and algorithms for building and deploying AI models. It offers a user-friendly interface for data preprocessing, model training, and evaluation. With AI_Spectrum, users can easily experiment with different machine learning techniques and optimize their models for various tasks. The library is designed to be flexible and scalable, making it suitable for both beginners and experienced data scientists.
For similar jobs

llmops-promptflow-template
LLMOps with Prompt flow is a template and guidance for building LLM-infused apps using Prompt flow. It provides centralized code hosting, lifecycle management, variant and hyperparameter experimentation, A/B deployment, many-to-many dataset/flow relationships, multiple deployment targets, comprehensive reporting, BYOF capabilities, configuration-based development, local prompt experimentation and evaluation, endpoint testing, and optional Human-in-loop validation. The tool is customizable to suit various application needs.

azure-search-vector-samples
This repository provides code samples in Python, C#, REST, and JavaScript for vector support in Azure AI Search. It includes demos for various languages showcasing vectorization of data, creating indexes, and querying vector data. Additionally, it offers tools like Azure AI Search Lab for experimenting with AI-enabled search scenarios in Azure and templates for deploying custom chat-with-your-data solutions. The repository also features documentation on vector search, hybrid search, creating and querying vector indexes, and REST API references for Azure AI Search and Azure OpenAI Service.

geti-sdk
The Intel® Geti™ SDK is a python package that enables teams to rapidly develop AI models by easing the complexities of model development and enhancing collaboration between teams. It provides tools to interact with an Intel® Geti™ server via the REST API, allowing for project creation, downloading, uploading, deploying for local inference with OpenVINO, setting project and model configuration, launching and monitoring training jobs, and media upload and prediction. The SDK also includes tutorial-style Jupyter notebooks demonstrating its usage.

booster
Booster is a powerful inference accelerator designed for scaling large language models within production environments or for experimental purposes. It is built with performance and scaling in mind, supporting various CPUs and GPUs, including Nvidia CUDA, Apple Metal, and OpenCL cards. The tool can split large models across multiple GPUs, offering fast inference on machines with beefy GPUs. It supports both regular FP16/FP32 models and quantised versions, along with popular LLM architectures. Additionally, Booster features proprietary Janus Sampling for code generation and non-English languages.

xFasterTransformer
xFasterTransformer is an optimized solution for Large Language Models (LLMs) on the X86 platform, providing high performance and scalability for inference on mainstream LLM models. It offers C++ and Python APIs for easy integration, along with example codes and benchmark scripts. Users can prepare models in a different format, convert them, and use the APIs for tasks like encoding input prompts, generating token ids, and serving inference requests. The tool supports various data types and models, and can run in single or multi-rank modes using MPI. A web demo based on Gradio is available for popular LLM models like ChatGLM and Llama2. Benchmark scripts help evaluate model inference performance quickly, and MLServer enables serving with REST and gRPC interfaces.

amazon-transcribe-live-call-analytics
The Amazon Transcribe Live Call Analytics (LCA) with Agent Assist Sample Solution is designed to help contact centers assess and optimize caller experiences in real time. It leverages Amazon machine learning services like Amazon Transcribe, Amazon Comprehend, and Amazon SageMaker to transcribe and extract insights from contact center audio. The solution provides real-time supervisor and agent assist features, integrates with existing contact centers, and offers a scalable, cost-effective approach to improve customer interactions. The end-to-end architecture includes features like live call transcription, call summarization, AI-powered agent assistance, and real-time analytics. The solution is event-driven, ensuring low latency and seamless processing flow from ingested speech to live webpage updates.

ai-lab-recipes
This repository contains recipes for building and running containerized AI and LLM applications with Podman. It provides model servers that serve machine-learning models via an API, allowing developers to quickly prototype new AI applications locally. The recipes include components like model servers and AI applications for tasks such as chat, summarization, object detection, etc. Images for sample applications and models are available in `quay.io`, and bootable containers for AI training on Linux OS are enabled.

XLearning
XLearning is a scheduling platform for big data and artificial intelligence, supporting various machine learning and deep learning frameworks. It runs on Hadoop Yarn and integrates frameworks like TensorFlow, MXNet, Caffe, Theano, PyTorch, Keras, XGBoost. XLearning offers scalability, compatibility, multiple deep learning framework support, unified data management based on HDFS, visualization display, and compatibility with code at native frameworks. It provides functions for data input/output strategies, container management, TensorBoard service, and resource usage metrics display. XLearning requires JDK >= 1.7 and Maven >= 3.3 for compilation, and deployment on CentOS 7.2 with Java >= 1.7 and Hadoop 2.6, 2.7, 2.8.