
arkflow
High performance Rust stream processing engine seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis.
Stars: 1161

ArkFlow is a high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, enabling easy loading and execution of machine learning models for streaming data and inference, anomaly detection, and complex event processing. The tool is built on Rust and Tokio async runtime, offering excellent performance and low latency. It features built-in SQL queries, Python script, JSON processing, Protobuf encoding/decoding, and batch processing capabilities. ArkFlow is extensible with a modular design, making it easy to extend with new components.
README:
English | 中文
High performance Rust stream processing engine seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It not only supports multiple input/output sources and processors, but also enables easy loading and execution of machine learning models, enabling streaming data and inference, anomaly detection, and complex event processing.
ArkFlow enlisted in the CNCF Cloud Native Landscape.
- High Performance: Built on Rust and Tokio async runtime, offering excellent performance and low latency
- Multiple Data Sources: Support for Kafka, MQTT, HTTP, files, and other input/output sources
- Powerful Processing Capabilities: Built-in SQL queries, Python script, JSON processing, Protobuf encoding/decoding, batch processing, and other processors
- Extensible: Modular design, easy to extend with new input, buffer, output, and processor components
# Clone the repository
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow
# Build the project
cargo build --release
# Run tests
cargo test
- Create a configuration file
config.yaml
:
logging:
level: info
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1s
batch_size: 10
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT * FROM flow WHERE value >= 10"
output:
type: "stdout"
error_output:
type: "stdout"
- Run ArkFlow:
./target/release/arkflow --config config.yaml
ArkFlow uses YAML format configuration files, supporting the following main configuration items:
logging:
level: info # Log level: debug, info, warn, error
streams: # Stream definition list
- input: # Input configuration
# ...
pipeline: # Processing pipeline configuration
# ...
output: # Output configuration
# ...
error_output: # Error output configuration
# ...
buffer: # Buffer configuration
# ...
ArkFlow supports multiple input sources:
- Kafka: Read data from Kafka topics
- MQTT: Subscribe to messages from MQTT topics
- HTTP: Receive data via HTTP
- File: Reading data from files(Csv,Json, Parquet, Avro, Arrow) using SQL
- Generator: Generate test data
- Database: Query data from databases(MySQL, PostgreSQL, SQLite, Duckdb)
- Nats: Subscribe to messages from Nats topics
- Redis: Subscribe to messages from Redis channels or lists
- Websocket: Subscribe to messages from WebSocket connections
- Modbus: Read data from Modbus devices
Example:
input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
client_id: arkflow
start_from_latest: true
ArkFlow provides multiple data processors:
- JSON: JSON data processing and transformation
- SQL: Process data using SQL queries
- Protobuf: Protobuf encoding/decoding
- Batch Processing: Process messages in batches
- Vrl: Process data using VRL
Example:
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value >= 10"
ArkFlow supports multiple output targets:
- Kafka: Write data to Kafka topics
- MQTT: Publish messages to MQTT topics
- HTTP: Send data via HTTP
- Standard Output: Output data to the console
- Drop: Discard data
- Nats: Publish messages to Nats topics
Example:
output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value:
type: value
value: test-topic
client_id: arkflow-producer
ArkFlow supports multiple error output targets:
- Kafka: Write error data to Kafka topics
- MQTT: Publish error messages to MQTT topics
- HTTP: Send error data via HTTP
- Standard Output: Output error data to the console
- Drop: Discard error data
- Nats: Publish messages to Nats topics
Example:
error_output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value: error-topic
client_id: error-arkflow-producer
ArkFlow provides buffer capabilities to handle backpressure and temporary storage of messages:
- Memory Buffer: Memory buffer, for high-throughput scenarios and window aggregation.
- Session Window: The Session Window buffer component provides a session-based message grouping mechanism where messages are grouped based on activity gaps. It implements a session window that closes after a configurable period of inactivity.
- Sliding Window: The Sliding Window buffer component provides a time-based windowing mechanism for processing message batches. It implements a sliding window algorithm with configurable window size, slide interval and slide size.
- Tumbling Window: The Tumbling Window buffer component provides a fixed-size, non-overlapping windowing mechanism for processing message batches. It implements a tumbling window algorithm with configurable interval settings.
Example:
buffer:
type: memory
capacity: 10000 # Maximum number of messages to buffer
timeout: 10s # Maximum time to buffer messages
streams:
- input:
type: kafka
brokers:
- localhost:9092
topics:
- test-topic
consumer_group: test-group
pipeline:
thread_num: 4
processors:
- type: json_to_arrow
- type: sql
query: "SELECT * FROM flow WHERE value > 100"
output:
type: kafka
brokers:
- localhost:9092
topic:
type: value
value: test-topic
streams:
- input:
type: "generate"
context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
interval: 1ms
batch_size: 10000
pipeline:
thread_num: 4
processors:
- type: "json_to_arrow"
- type: "sql"
query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"
output:
type: "stdout"
- Conalog(Country: South Korea)
ArkFlow is licensed under the Apache License 2.0.
Discord: https://discord.gg/CwKhzb8pux
If you like or are using this project to learn or start your solution, please give it a star⭐. Thanks!
For Tasks:
Click tags to check more tools for each tasksFor Jobs:
Alternative AI tools for arkflow
Similar Open Source Tools

arkflow
ArkFlow is a high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, enabling easy loading and execution of machine learning models for streaming data and inference, anomaly detection, and complex event processing. The tool is built on Rust and Tokio async runtime, offering excellent performance and low latency. It features built-in SQL queries, Python script, JSON processing, Protobuf encoding/decoding, and batch processing capabilities. ArkFlow is extensible with a modular design, making it easy to extend with new components.

memento-mcp
Memento MCP is a scalable, high-performance knowledge graph memory system designed for LLMs. It offers semantic retrieval, contextual recall, and temporal awareness to any LLM client supporting the model context protocol. The system is built on core concepts like entities and relations, utilizing Neo4j as its storage backend for unified graph and vector search capabilities. With advanced features such as semantic search, temporal awareness, confidence decay, and rich metadata support, Memento MCP provides a robust solution for managing knowledge graphs efficiently and effectively.

mcp-omnisearch
mcp-omnisearch is a Model Context Protocol (MCP) server that acts as a unified gateway to multiple search providers and AI tools. It integrates Tavily, Perplexity, Kagi, Jina AI, Brave, Exa AI, and Firecrawl to offer a wide range of search, AI response, content processing, and enhancement features through a single interface. The server provides powerful search capabilities, AI response generation, content extraction, summarization, web scraping, structured data extraction, and more. It is designed to work flexibly with the API keys available, enabling users to activate only the providers they have keys for and easily add more as needed.

MassGen
MassGen is a cutting-edge multi-agent system that leverages the power of collaborative AI to solve complex tasks. It assigns a task to multiple AI agents who work in parallel, observe each other's progress, and refine their approaches to converge on the best solution to deliver a comprehensive and high-quality result. The system operates through an architecture designed for seamless multi-agent collaboration, with key features including cross-model/agent synergy, parallel processing, intelligence sharing, consensus building, and live visualization. Users can install the system, configure API settings, and run MassGen for various tasks such as question answering, creative writing, research, development & coding tasks, and web automation & browser tasks. The roadmap includes plans for advanced agent collaboration, expanded model, tool & agent integration, improved performance & scalability, enhanced developer experience, and a web interface.

tunacode
TunaCode CLI is an AI-powered coding assistant that provides a command-line interface for developers to enhance their coding experience. It offers features like model selection, parallel execution for faster file operations, and various commands for code management. The tool aims to improve coding efficiency and provide a seamless coding environment for developers.

quantalogic
QuantaLogic is a ReAct framework for building advanced AI agents that seamlessly integrates large language models with a robust tool system. It aims to bridge the gap between advanced AI models and practical implementation in business processes by enabling agents to understand, reason about, and execute complex tasks through natural language interaction. The framework includes features such as ReAct Framework, Universal LLM Support, Secure Tool System, Real-time Monitoring, Memory Management, and Enterprise Ready components.

zotero-mcp
Zotero MCP seamlessly connects your Zotero research library with AI assistants like ChatGPT and Claude via the Model Context Protocol. It offers AI-powered semantic search, access to library content, PDF annotation extraction, and easy updates. Users can search their library, analyze citations, and get summaries, making it ideal for research tasks. The tool supports multiple embedding models, intelligent search results, and flexible access methods for both local and remote collaboration. With advanced features like semantic search and PDF annotation extraction, Zotero MCP enhances research efficiency and organization.

simba
Simba is an open source, portable Knowledge Management System (KMS) designed to seamlessly integrate with any Retrieval-Augmented Generation (RAG) system. It features a modern UI and modular architecture, allowing developers to focus on building advanced AI solutions without the complexities of knowledge management. Simba offers a user-friendly interface to visualize and modify document chunks, supports various vector stores and embedding models, and simplifies knowledge management for developers. It is community-driven, extensible, and aims to enhance AI functionality by providing a seamless integration with RAG-based systems.

mcp-documentation-server
The mcp-documentation-server is a lightweight server application designed to serve documentation files for projects. It provides a simple and efficient way to host and access project documentation, making it easy for team members and stakeholders to find and reference important information. The server supports various file formats, such as markdown and HTML, and allows for easy navigation through the documentation. With mcp-documentation-server, teams can streamline their documentation process and ensure that project information is easily accessible to all involved parties.

llm-context.py
LLM Context is a tool designed to assist developers in quickly injecting relevant content from code/text projects into Large Language Model chat interfaces. It leverages `.gitignore` patterns for smart file selection and offers a streamlined clipboard workflow using the command line. The tool also provides direct integration with Large Language Models through the Model Context Protocol (MCP). LLM Context is optimized for code repositories and collections of text/markdown/html documents, making it suitable for developers working on projects that fit within an LLM's context window. The tool is under active development and aims to enhance AI-assisted development workflows by harnessing the power of Large Language Models.

klavis
Klavis AI is a production-ready solution for managing Multiple Communication Protocol (MCP) servers. It offers self-hosted solutions and a hosted service with enterprise OAuth support. With Klavis AI, users can easily deploy and manage over 50 MCP servers for various services like GitHub, Gmail, Google Sheets, YouTube, Slack, and more. The tool provides instant access to MCP servers, seamless authentication, and integration with AI frameworks, making it ideal for individuals and businesses looking to streamline their communication and data management workflows.

oxylabs-mcp
The Oxylabs MCP Server acts as a bridge between AI models and the web, providing clean, structured data from any site. It enables scraping of URLs, rendering JavaScript-heavy pages, content extraction for AI use, bypassing anti-scraping measures, and accessing geo-restricted web data from 195+ countries. The implementation utilizes the Model Context Protocol (MCP) to facilitate secure interactions between AI assistants and web content. Key features include scraping content from any site, automatic data cleaning and conversion, bypassing blocks and geo-restrictions, flexible setup with cross-platform support, and built-in error handling and request management.

rkllama
RKLLama is a server and client tool designed for running and interacting with LLM models optimized for Rockchip RK3588(S) and RK3576 platforms. It allows models to run on the NPU, with features such as running models on NPU, partial Ollama API compatibility, pulling models from Huggingface, API REST with documentation, dynamic loading/unloading of models, inference requests with streaming modes, simplified model naming, CPU model auto-detection, and optional debug mode. The tool supports Python 3.8 to 3.12 and has been tested on Orange Pi 5 Pro and Orange Pi 5 Plus with specific OS versions.

browser4
Browser4 is a lightning-fast, coroutine-safe browser designed for AI integration with large language models. It offers ultra-fast automation, deep web understanding, and powerful data extraction APIs. Users can automate the browser, extract data at scale, and perform tasks like summarizing products, extracting product details, and finding specific links. The tool is developer-friendly, supports AI-powered automation, and provides advanced features like X-SQL for precise data extraction. It also offers RPA capabilities, browser control, and complex data extraction with X-SQL. Browser4 is suitable for web scraping, data extraction, automation, and AI integration tasks.

swift-ocr-llm-powered-pdf-to-markdown
Swift OCR is a powerful tool for extracting text from PDF files using OpenAI's GPT-4 Turbo with Vision model. It offers flexible input options, advanced OCR processing, performance optimizations, structured output, robust error handling, and scalable architecture. The tool ensures accurate text extraction, resilience against failures, and efficient handling of multiple requests.

generator
ctx is a tool designed to automatically generate organized context files from code files, GitHub repositories, Git commits, web pages, and plain text. It aims to efficiently provide necessary context to AI language models like ChatGPT and Claude, enabling users to streamline code refactoring, multiple iteration development, documentation generation, and seamless AI integration. With ctx, users can create structured markdown documents, save context files, and serve context through an MCP server for real-time assistance. The tool simplifies the process of sharing project information with AI assistants, making AI conversations smarter and easier.
For similar tasks

qdrant
Qdrant is a vector similarity search engine and vector database. It is written in Rust, which makes it fast and reliable even under high load. Qdrant can be used for a variety of applications, including: * Semantic search * Image search * Product recommendations * Chatbots * Anomaly detection Qdrant offers a variety of features, including: * Payload storage and filtering * Hybrid search with sparse vectors * Vector quantization and on-disk storage * Distributed deployment * Highlighted features such as query planning, payload indexes, SIMD hardware acceleration, async I/O, and write-ahead logging Qdrant is available as a fully managed cloud service or as an open-source software that can be deployed on-premises.

SynapseML
SynapseML (previously known as MMLSpark) is an open-source library that simplifies the creation of massively scalable machine learning (ML) pipelines. It provides simple, composable, and distributed APIs for various machine learning tasks such as text analytics, vision, anomaly detection, and more. Built on Apache Spark, SynapseML allows seamless integration of models into existing workflows. It supports training and evaluation on single-node, multi-node, and resizable clusters, enabling scalability without resource wastage. Compatible with Python, R, Scala, Java, and .NET, SynapseML abstracts over different data sources for easy experimentation. Requires Scala 2.12, Spark 3.4+, and Python 3.8+.

mlx-vlm
MLX-VLM is a package designed for running Vision LLMs on Mac systems using MLX. It provides a convenient way to install and utilize the package for processing large language models related to vision tasks. The tool simplifies the process of running LLMs on Mac computers, offering a seamless experience for users interested in leveraging MLX for vision-related projects.

Java-AI-Book-Code
The Java-AI-Book-Code repository contains code examples for the 2020 edition of 'Practical Artificial Intelligence With Java'. It is a comprehensive update of the previous 2013 edition, featuring new content on deep learning, knowledge graphs, anomaly detection, linked data, genetic algorithms, search algorithms, and more. The repository serves as a valuable resource for Java developers interested in AI applications and provides practical implementations of various AI techniques and algorithms.

Awesome-AI-Data-Guided-Projects
A curated list of data science & AI guided projects to start building your portfolio. The repository contains guided projects covering various topics such as large language models, time series analysis, computer vision, natural language processing (NLP), and data science. Each project provides detailed instructions on how to implement specific tasks using different tools and technologies.

awesome-AIOps
awesome-AIOps is a curated list of academic researches and industrial materials related to Artificial Intelligence for IT Operations (AIOps). It includes resources such as competitions, white papers, blogs, tutorials, benchmarks, tools, companies, academic materials, talks, workshops, papers, and courses covering various aspects of AIOps like anomaly detection, root cause analysis, incident management, microservices, dependency tracing, and more.

AI-Security-and-Privacy-Events
AI-Security-and-Privacy-Events is a curated list of academic events focusing on AI security and privacy. It includes seminars, conferences, workshops, tutorials, special sessions, and covers various topics such as NLP & LLM Security, Privacy and Security in ML, Machine Learning Security, AI System with Confidential Computing, Adversarial Machine Learning, and more.

AI_Spectrum
AI_Spectrum is a versatile machine learning library that provides a wide range of tools and algorithms for building and deploying AI models. It offers a user-friendly interface for data preprocessing, model training, and evaluation. With AI_Spectrum, users can easily experiment with different machine learning techniques and optimize their models for various tasks. The library is designed to be flexible and scalable, making it suitable for both beginners and experienced data scientists.
For similar jobs

llmops-promptflow-template
LLMOps with Prompt flow is a template and guidance for building LLM-infused apps using Prompt flow. It provides centralized code hosting, lifecycle management, variant and hyperparameter experimentation, A/B deployment, many-to-many dataset/flow relationships, multiple deployment targets, comprehensive reporting, BYOF capabilities, configuration-based development, local prompt experimentation and evaluation, endpoint testing, and optional Human-in-loop validation. The tool is customizable to suit various application needs.

azure-search-vector-samples
This repository provides code samples in Python, C#, REST, and JavaScript for vector support in Azure AI Search. It includes demos for various languages showcasing vectorization of data, creating indexes, and querying vector data. Additionally, it offers tools like Azure AI Search Lab for experimenting with AI-enabled search scenarios in Azure and templates for deploying custom chat-with-your-data solutions. The repository also features documentation on vector search, hybrid search, creating and querying vector indexes, and REST API references for Azure AI Search and Azure OpenAI Service.

geti-sdk
The Intel® Geti™ SDK is a python package that enables teams to rapidly develop AI models by easing the complexities of model development and enhancing collaboration between teams. It provides tools to interact with an Intel® Geti™ server via the REST API, allowing for project creation, downloading, uploading, deploying for local inference with OpenVINO, setting project and model configuration, launching and monitoring training jobs, and media upload and prediction. The SDK also includes tutorial-style Jupyter notebooks demonstrating its usage.

booster
Booster is a powerful inference accelerator designed for scaling large language models within production environments or for experimental purposes. It is built with performance and scaling in mind, supporting various CPUs and GPUs, including Nvidia CUDA, Apple Metal, and OpenCL cards. The tool can split large models across multiple GPUs, offering fast inference on machines with beefy GPUs. It supports both regular FP16/FP32 models and quantised versions, along with popular LLM architectures. Additionally, Booster features proprietary Janus Sampling for code generation and non-English languages.

xFasterTransformer
xFasterTransformer is an optimized solution for Large Language Models (LLMs) on the X86 platform, providing high performance and scalability for inference on mainstream LLM models. It offers C++ and Python APIs for easy integration, along with example codes and benchmark scripts. Users can prepare models in a different format, convert them, and use the APIs for tasks like encoding input prompts, generating token ids, and serving inference requests. The tool supports various data types and models, and can run in single or multi-rank modes using MPI. A web demo based on Gradio is available for popular LLM models like ChatGLM and Llama2. Benchmark scripts help evaluate model inference performance quickly, and MLServer enables serving with REST and gRPC interfaces.

amazon-transcribe-live-call-analytics
The Amazon Transcribe Live Call Analytics (LCA) with Agent Assist Sample Solution is designed to help contact centers assess and optimize caller experiences in real time. It leverages Amazon machine learning services like Amazon Transcribe, Amazon Comprehend, and Amazon SageMaker to transcribe and extract insights from contact center audio. The solution provides real-time supervisor and agent assist features, integrates with existing contact centers, and offers a scalable, cost-effective approach to improve customer interactions. The end-to-end architecture includes features like live call transcription, call summarization, AI-powered agent assistance, and real-time analytics. The solution is event-driven, ensuring low latency and seamless processing flow from ingested speech to live webpage updates.

ai-lab-recipes
This repository contains recipes for building and running containerized AI and LLM applications with Podman. It provides model servers that serve machine-learning models via an API, allowing developers to quickly prototype new AI applications locally. The recipes include components like model servers and AI applications for tasks such as chat, summarization, object detection, etc. Images for sample applications and models are available in `quay.io`, and bootable containers for AI training on Linux OS are enabled.

XLearning
XLearning is a scheduling platform for big data and artificial intelligence, supporting various machine learning and deep learning frameworks. It runs on Hadoop Yarn and integrates frameworks like TensorFlow, MXNet, Caffe, Theano, PyTorch, Keras, XGBoost. XLearning offers scalability, compatibility, multiple deep learning framework support, unified data management based on HDFS, visualization display, and compatibility with code at native frameworks. It provides functions for data input/output strategies, container management, TensorBoard service, and resource usage metrics display. XLearning requires JDK >= 1.7 and Maven >= 3.3 for compilation, and deployment on CentOS 7.2 with Java >= 1.7 and Hadoop 2.6, 2.7, 2.8.