arkflow
A high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis.
Stars: 1249
ArkFlow is a high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It supports multiple input/output sources and processors, and makes it easy to load and execute machine learning models for streaming-data inference, anomaly detection, and complex event processing. Built on Rust and the Tokio async runtime, it offers excellent performance and low latency. It features built-in SQL queries, Python scripting, JSON processing, Protobuf encoding/decoding, and batch processing. ArkFlow's modular design makes it easy to extend with new components.
README:
English | 中文
A high-performance Rust stream processing engine that seamlessly integrates AI capabilities, providing powerful real-time data processing and intelligent analysis. It not only supports multiple input/output sources and processors, but also makes it easy to load and execute machine learning models, enabling streaming-data inference, anomaly detection, and complex event processing.
ArkFlow is listed in the CNCF Cloud Native Landscape.
- High Performance: Built on Rust and Tokio async runtime, offering excellent performance and low latency
- Multiple Data Sources: Support for Kafka, MQTT, HTTP, files, and other input/output sources
- Powerful Processing Capabilities: Built-in SQL queries, Python scripting, JSON processing, Protobuf encoding/decoding, batch processing, and other processors
- Extensible: Modular design, easy to extend with new input, buffer, output, and processor components
# Clone the repository
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow
# Build the project
cargo build --release
# Run tests
cargo test
Create a configuration file config.yaml:
logging:
  level: info
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1s
      batch_size: 10
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT * FROM flow WHERE value >= 10"
    output:
      type: "stdout"
    error_output:
      type: "stdout"
Run ArkFlow:
./target/release/arkflow --config config.yaml
ArkFlow uses YAML-format configuration files, supporting the following main configuration items:
logging:
  level: info # Log level: debug, info, warn, error
streams: # Stream definition list
  - input: # Input configuration
      # ...
    pipeline: # Processing pipeline configuration
      # ...
    output: # Output configuration
      # ...
    error_output: # Error output configuration
      # ...
    buffer: # Buffer configuration
      # ...
ArkFlow supports multiple input sources:
- Kafka: Read data from Kafka topics
- MQTT: Subscribe to messages from MQTT topics
- HTTP: Receive data via HTTP
- File: Read data from files (CSV, JSON, Parquet, Avro, Arrow) using SQL
- Generator: Generate test data
- Database: Query data from databases (MySQL, PostgreSQL, SQLite, DuckDB)
- NATS: Subscribe to messages from NATS topics
- Redis: Subscribe to messages from Redis channels or lists
- WebSocket: Subscribe to messages from WebSocket connections
- Modbus: Read data from Modbus devices
Example:
input:
  type: kafka
  brokers:
    - localhost:9092
  topics:
    - test-topic
  consumer_group: test-group
  client_id: arkflow
  start_from_latest: true
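For comparison, an MQTT input might be sketched along the lines below. The field names other than `type` and `client_id` (such as `broker_url` and `qos`) are assumptions for illustration rather than confirmed ArkFlow keys, so check the input documentation before relying on them:

```yaml
input:
  type: mqtt
  broker_url: tcp://localhost:1883 # hypothetical key: address of the MQTT broker
  topics:
    - sensors/temperature          # hypothetical key: topics to subscribe to
  client_id: arkflow-mqtt          # MQTT client identifier
  qos: 1                           # hypothetical key: at-least-once delivery
```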
ArkFlow provides multiple data processors:
- JSON: JSON data processing and transformation
- SQL: Process data using SQL queries
- Protobuf: Protobuf encoding/decoding
- Batch Processing: Process messages in batches
- VRL: Process data using VRL (Vector Remap Language)
Example:
pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow
    - type: sql
      query: "SELECT * FROM flow WHERE value >= 10"
ArkFlow supports multiple output targets:
- Kafka: Write data to Kafka topics
- MQTT: Publish messages to MQTT topics
- HTTP: Send data via HTTP
- Standard Output: Output data to the console
- Drop: Discard data
- NATS: Publish messages to NATS topics
Example:
output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: test-topic
  client_id: arkflow-producer
ArkFlow supports multiple error output targets:
- Kafka: Write error data to Kafka topics
- MQTT: Publish error messages to MQTT topics
- HTTP: Send error data via HTTP
- Standard Output: Output error data to the console
- Drop: Discard error data
- NATS: Publish error messages to NATS topics
Example:
error_output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: error-topic
  client_id: error-arkflow-producer
ArkFlow provides buffers to handle backpressure and temporarily store messages:
- Memory Buffer: In-memory buffering for high-throughput scenarios and window aggregation.
- Session Window: Groups messages into sessions based on activity gaps; a session window closes after a configurable period of inactivity.
- Sliding Window: Time-based windowing for processing message batches, implementing a sliding-window algorithm with configurable window size, slide interval, and slide size.
- Tumbling Window: Fixed-size, non-overlapping windowing for processing message batches, with a configurable window interval.
Example:
buffer:
  type: memory
  capacity: 10000 # Maximum number of messages to buffer
  timeout: 10s # Maximum time to buffer messages
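The window buffers presumably follow the same shape. A tumbling-window configuration might look like the sketch below, where the type name and interval key are assumptions derived from the description above, not confirmed ArkFlow keys:

```yaml
buffer:
  type: tumbling_window # hypothetical type name for the tumbling window buffer
  interval: 10s         # hypothetical key: fixed, non-overlapping window length
```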
Example: Kafka to Kafka data processing
streams:
  - input:
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - test-topic
      consumer_group: test-group
    pipeline:
      thread_num: 4
      processors:
        - type: json_to_arrow
        - type: sql
          query: "SELECT * FROM flow WHERE value > 100"
    output:
      type: kafka
      brokers:
        - localhost:9092
      topic:
        type: value
        value: test-topic
Example: Generating test data and aggregating with SQL
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1ms
      batch_size: 10000
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT count(*) FROM flow WHERE value >= 10 GROUP BY sensor"
    output:
      type: "stdout"
Users:
- Conalog (Country: South Korea)
ArkFlow is licensed under the Apache License 2.0.
Discord: https://discord.gg/CwKhzb8pux
If you like this project, or are using it to learn or to start your own solution, please give it a star ⭐. Thanks!