
Simulflow - Build real-time voice-enabled AI agents in a data-centric way

Daydreaming is the first awakening of what we call simulflow. It is an essential tool of rational thought. With it you can clear the mind for better thinking. – Frank Herbert, Heretics of Dune

Bene Gesserit also have the ability to practice simulflow, literally the simultaneous flow of several threads of consciousness at any given time; mental multitasking, as it were. The combination of simulflow with their analytical abilities and Other Memory is responsible for the frightening intelligence of the average Bene Gesserit.

Simulflow, Dune Wiki

Clojars Project

simulflow is a Clojure framework for building real-time voice-enabled AI applications using a data-driven, functional approach. Built on top of clojure.core.async.flow, it provides a composable pipeline architecture for processing audio, text, and AI interactions with built-in support for major AI providers.

[!WARNING] While Simulflow has been used in live, production applications, it is still under active development. Expect breaking changes to support new use cases.

What is this?

Simulflow is a framework that uses processors that communicate through specialized frames to create voice-enabled AI agents. Think of it as a data pipeline where each component transforms typed messages:

Microphone Transport → (audio-in frames) → Transcriptor → (transcription frames) → 
Context Aggregation → (context to LLM) → LLM → (streams response) → 
Text Assembler → (sentence frames) → Text-to-Speech → (audio-out frames) → 
Audio Splitter → (chunked audio) → Speaker Transport

This pipeline approach makes it easy to swap components, add new functionality, or debug individual stages without affecting the entire system.

Installation

Clojure CLI/deps.edn

;; Add to your deps.edn
{:deps {com.shipclojure/simulflow {:mvn/version "0.1.8-alpha"}}}

Leiningen/Boot

;; Add to your project.clj
[com.shipclojure/simulflow "0.1.8-alpha"]

Maven

<dependency>
  <groupId>com.shipclojure</groupId>
  <artifactId>simulflow</artifactId>
  <version>0.1.8-alpha</version>
</dependency>

Requirements

  • Java 21+ - Required for virtual threads (Project Loom) support. If your Java version doesn't support virtual threads, simulflow falls back to normal threads.
  • Clojure 1.12+ - For core.async.flow and other modern Clojure features

Video presentation:

Watch the video

Core Features

  • Flow-Based Architecture: Built on core.async.flow for robust concurrent processing
  • Data-First Design: Define AI pipelines as data structures for easy configuration and modification
  • Streaming Architecture: Efficient real-time audio and text processing
  • Extensible: Simple to add new processors and embed them into AI flows
  • Flexible Frame System: Type-safe message passing between pipeline components
  • Built-in Services: Ready-to-use integrations with major AI providers

Quick Start Example

First, create a resources/secrets.edn:

{:deepgram {:api-key ""}
 :elevenlabs {:api-key ""
              :voice-id ""}
 :groq {:api-key ""}
 :openai {:new-api-sk ""}}

Obtain the API keys from the respective providers and fill in the blank values.

Start a REPL and evaluate the snippets in the (comment ...) blocks to start the flows. Allow Microphone access when prompted.

(ns simulflow-examples.local
  {:clj-reload/no-unload true}
  (:require
   [clojure.core.async :as a]
   [clojure.core.async.flow :as flow]
   [simulflow.async :refer [vthread-loop]]
   [simulflow.processors.activity-monitor :as activity-monitor]
   [simulflow.processors.deepgram :as deepgram]
   [simulflow.processors.elevenlabs :as xi]
   [simulflow.processors.llm-context-aggregator :as context]
   [simulflow.processors.openai :as openai]
   [simulflow.secrets :refer [secret]]
   [simulflow.transport :as transport]
   [simulflow.transport.in :as transport-in]
   [simulflow.transport.out :as transport-out]
   [simulflow.utils.core :as u]
   [simulflow.vad.silero :as silero]
   [taoensso.telemere :as t]))

(defn make-local-flow
  "This example showcases a voice AI agent for the local computer."
  ([] (make-local-flow {}))
  ([{:keys [llm-context extra-procs extra-conns debug? vad-analyser
            language chunk-duration-ms]
     :or {llm-context {:messages
                       [{:role "system"
                         :content "You are a voice agent operating via phone. Be
                       concise in your answers. The input you receive comes from a
                       speech-to-text (transcription) system that isn't always
                       efficient and may send unclear text. Ask for
                       clarification when you're unsure what the person said."}]}

          language :en
          debug? false
          chunk-duration-ms 20
          extra-procs {}
          extra-conns []}}]

   (flow/create-flow
     {:procs
      (u/deep-merge
        {;; Capture audio from microphone and send raw-audio-input frames
         :transport-in {:proc transport-in/microphone-transport-in
                        :args {:vad/analyser vad-analyser}}
         ;; raw-audio-input -> transcription frames
         :transcriptor {:proc deepgram/deepgram-processor
                        :args {:transcription/api-key (secret [:deepgram :api-key])
                               :transcription/interim-results? true
                               :transcription/punctuate? false
                               :transcription/vad-events? false
                               :transcription/smart-format? true
                               :transcription/model :nova-2
                               :transcription/utterance-end-ms 1000
                               :transcription/language language}}

         ;; user transcription & llm message frames -> llm-context frames
         :context-aggregator  {:proc context/context-aggregator
                               :args {:llm/context llm-context
                                      :aggregator/debug? debug?}}

         ;; Takes llm-context frames and produces llm-text-chunk & llm-tool-call-chunk frames
         :llm {:proc openai/openai-llm-process
               :args {:openai/api-key (secret [:openai :new-api-sk])
                      :llm/model "gpt-4o-mini"}}

         ;; llm-text-chunk & llm-tool-call-chunk -> llm-context-messages-append frames
         :assistant-context-assembler {:proc context/assistant-context-assembler
                                       :args {:debug? debug?}}

         ;; llm-text-chunk -> sentence speak frames (faster for text to speech)
         :llm-sentence-assembler {:proc context/llm-sentence-assembler}

         ;; speak-frames -> audio-output-raw frames
         :tts {:proc xi/elevenlabs-tts-process
               :args {:elevenlabs/api-key (secret [:elevenlabs :api-key])
                      :elevenlabs/model-id "eleven_flash_v2_5"
                      :elevenlabs/voice-id (secret [:elevenlabs :voice-id])
                      :voice/stability 0.5
                      :voice/similarity-boost 0.8
                      :voice/use-speaker-boost? true
                      :pipeline/language language}}

         ;; audio-output-raw -> smaller audio-output-raw frames for realtime
         :audio-splitter {:proc transport/audio-splitter
                          :args {:audio.out/duration-ms chunk-duration-ms}}

         ;; speakers out
         :transport-out {:proc transport-out/realtime-speakers-out-processor
                         :args {:audio.out/sending-interval chunk-duration-ms
                                :audio.out/duration-ms chunk-duration-ms}}

         :activity-monitor {:proc activity-monitor/process
                            :args {::activity-monitor/timeout-ms 5000}}}
        extra-procs)
      :conns (concat
               [[[:transport-in :out] [:transcriptor :in]]

                [[:transcriptor :out] [:context-aggregator :in]]
                [[:transport-in :sys-out] [:context-aggregator :sys-in]]
                [[:context-aggregator :out] [:llm :in]]

                ;; Aggregate full context
                [[:llm :out] [:assistant-context-assembler :in]]
                [[:assistant-context-assembler :out] [:context-aggregator :in]]

                ;; Assemble sentence by sentence for fast speech
                [[:llm :out] [:llm-sentence-assembler :in]]
                [[:llm-sentence-assembler :out] [:tts :in]]

                [[:tts :out] [:audio-splitter :in]]
                [[:audio-splitter :out] [:transport-out :in]]

                ;; Activity detection
                [[:transport-out :sys-out] [:activity-monitor :sys-in]]
                [[:transport-in :sys-out] [:activity-monitor :sys-in]]
                [[:transcriptor :sys-out] [:activity-monitor :sys-in]]
                [[:activity-monitor :out] [:context-aggregator :in]]
                [[:activity-monitor :out] [:tts :in]]]
               extra-conns)})))

(comment
  (def local-ai (make-local-flow {:vad-analyser (silero/create-silero-vad)}))

  ;; Start local ai flow - starts paused
  (let [{:keys [report-chan error-chan]} (flow/start local-ai)]
    ;; Resume local ai -> you can now speak with the AI
    (flow/resume local-ai)
    (vthread-loop []
      (when-let [[msg c] (a/alts!! [report-chan error-chan])]
        (when (map? msg)
          (t/log! (cond-> {:level :debug :id (if (= c error-chan) :error :report)}
                    (= c error-chan) (assoc :error msg)) msg))
        (recur))))

  ;; Stop the conversation
  (flow/stop local-ai)

  ,)

Which roughly translates to:

Flow Diagram

Examples

For more complete examples and use cases, see the examples directory:

  • Local Example - Complete voice AI agent using microphone and speakers
  • Twilio WebSocket - Telephony integration for phone-based voice AI
  • Transport Examples - Different audio input/output configurations

These examples show real-world usage patterns and can be used as starting points for your own applications.

Supported Providers

Text-to-Speech (TTS)

  • ElevenLabs
    • Models: eleven_multilingual_v2, eleven_turbo_v2, eleven_flash_v2 and more.
    • Features: Real-time streaming, multiple voices, multilingual support

Speech-to-Text (STT)

  • Deepgram
    • Models: nova-2, nova-2-general, nova-2-meeting and more.
    • Features: Real-time transcription, punctuation, smart formatting

Text Based Large Language Models (LLM)

  • OpenAI
    • Models: gpt-4o-mini (fastest, cheapest), gpt-4, gpt-3.5-turbo and more
    • Features: Function calling, streaming responses
  • Google
    • Models: gemini-2.0-flash (fastest, cheapest), gemini-2.5-flash, and more
    • Features: Function calling, streaming responses, thinking
  • Groq
    • Models: llama-3.2-3b-preview, llama-3.1-8b-instant, llama-3.3-70b-versatile, etc.
    • Features: Function calling, streaming responses, thinking
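
Because pipelines are data, swapping a model or provider is a one-line change to processor config. For example, the Quick Start's :llm processor pointed at a different OpenAI model:

:llm {:proc openai/openai-llm-process
      :args {:openai/api-key (secret [:openai :new-api-sk])
             :llm/model "gpt-4o"}}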

Key Concepts

Flows

The core building block of simulflow pipelines:

  • Composed of processes connected by channels
  • Processes can be:
    • Input/output handlers
    • AI service integrations
    • Data transformers
  • Managed by core.async.flow for lifecycle control
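
A minimal, self-contained sketch of a flow (the :upcase and :printer procs are toy examples, not part of simulflow):

(require '[clojure.core.async.flow :as flow]
         '[clojure.string :as str])

(def toy-flow
  (flow/create-flow
    {:procs {:upcase {:proc (flow/process
                              {:describe (fn [] {:ins {:in "Strings in"}
                                                 :outs {:out "Upper-cased strings"}})
                               :init identity
                               :transform (fn [state _ msg]
                                            [state {:out [(str/upper-case msg)]}])})}
             :printer {:proc (flow/process
                               {:describe (fn [] {:ins {:in "Values to print"}})
                                :init identity
                                :transform (fn [state _ msg]
                                             (println "got:" msg)
                                             [state {}])})}}
     :conns [[[:upcase :out] [:printer :in]]]}))

(comment
  (flow/start toy-flow)   ;; flows start paused
  (flow/resume toy-flow)
  (flow/inject toy-flow [:upcase :in] ["hello simulflow"]) ;; prints "got: HELLO SIMULFLOW"
  (flow/stop toy-flow))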

Transport

The modality through which audio enters and leaves the voice AI pipeline. Example transport modalities:

  • local (microphone + speakers)
  • telephony (Twilio through WebSocket)
  • WebRTC (browser support) - TODO
  • async (through in & out core.async channels)

You will see processors like :transport-in & :transport-out throughout the examples.
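
For the local modality, the transport pair looks like the Quick Start example above:

:transport-in {:proc transport-in/microphone-transport-in}
:transport-out {:proc transport-out/realtime-speakers-out-processor
                :args {:audio.out/sending-interval 20
                       :audio.out/duration-ms 20}}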

Frames

The basic unit of data flow, representing typed messages like:

  • :simulflow.frame/audio-input-raw - Raw audio data from input transport
  • :simulflow.frame/transcription-result - Transcribed text from speech-to-text
  • :simulflow.frame/llm-text-chunk - LLM response text chunks
  • :simulflow.frame/llm-tool-call-chunk - LLM tool call request chunks
  • :simulflow.frame/audio-output-raw - Raw audio data for playback
  • :simulflow.frame/speak-frame - Text for TTS processing
  • :simulflow.frame/user-speech-start, :simulflow.frame/user-speech-stop - User speech events
  • :simulflow.frame/bot-speech-start, :simulflow.frame/bot-speech-stop - Bot speech events
  • :simulflow.frame/system-start, :simulflow.frame/system-stop - System control signals

Each frame has a type and optionally a schema for the data contained in it.
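
A quick sketch using the built-in audio-input-raw frame (its creator and predicate also appear in the examples below):

(require '[simulflow.frame :as frame])

;; Create a raw audio input frame from a byte array
(def audio-frame (frame/audio-input-raw (byte-array 320)))

(:frame/type audio-frame)            ;; => :simulflow.frame/audio-input-raw
(frame/audio-input-raw? audio-frame) ;; => true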

Frame Schema Validation (Development Only)

For development and debugging, you can enable frame schema validation to catch invalid frame data early. This should only be used during development as it adds runtime overhead:

# Enable frame schema checking via JVM property
clojure -J-Dsimulflow.frame.schema-checking=true -M:dev your-namespace

# Or add to your deps.edn :dev alias
{:aliases
 {:dev {:jvm-opts ["-Dsimulflow.frame.schema-checking=true"]
        ...}}}

When enabled, creating frames with invalid data will throw exceptions with detailed error messages:

;; This will throw if schema checking is enabled and data is invalid
(frame/audio-input-raw "invalid-data") ; Should be byte array
;; => ex-info "Invalid frame data" {...}

Warning: Never enable schema checking in production as it significantly impacts performance.

Creating Custom Frames

Simulflow provides the defframe macro to easily define new frame types with automatic validation and helper functions.

The defframe Macro

The defframe macro creates three things for each frame type:

  1. Frame Creator Function - Creates frames with optional timestamp
  2. Frame Predicate Function - Tests if a value is a frame of that type
  3. Frame Schema - Malli schema for validation

(ns my.custom.frames
  (:require [simulflow.frame :refer [defframe]]))

;; Define a custom frame type
(defframe custom-data
  "Frame containing custom application data"
  {:type ::custom-data-frame
   :schema [:map
            [:user-id :string]
            [:action [:enum :create :update :delete]]
            [:payload :any]]})

This generates:

;; 1. Frame creator function (supports both arities)
(custom-data {:user-id "123" :action :create :payload {...}})
(custom-data {:user-id "123" :action :create :payload {...}} {:timestamp 1640995200000})

;; 2. Frame predicate function
(custom-data? some-frame) ;; => true/false

;; 3. Frame schema (for advanced validation)
custom-data-schema ;; => Malli schema definition

Frame Creation Options

Frame creator functions support flexible timestamp handling:

;; Use current timestamp (default)
(custom-data {:user-id "123" :action :create :payload data})

;; Explicit timestamp as milliseconds
(custom-data data {:timestamp 1640995200000})

;; Explicit timestamp as java.util.Date
(custom-data data {:timestamp #inst "2022-01-01T00:00:00.000Z"})

Frame Structure

All frames have a consistent structure:

{:frame/type ::custom-data-frame           ; Frame type keyword
 :frame/data {:user-id "123" :action :create} ; Your data
 :frame/ts #inst "2022-01-01T00:00:00.000Z"   ; Timestamp
 ;; Plus metadata: ^{:type :simulflow.frame/frame}
}

Usage in Processors

Use your custom frames in processor transform functions:

(defn my-processor-transform [state input-port data]
  (cond
    (custom-data? data)
    (let [user-id (get-in data [:frame/data :user-id])]
      ;; react to the frame: remember the user and emit a system frame
      [(assoc state :last-user-id user-id) {:out [(frame/system-start true)]}])

    :else [state {}]))

Schema Validation

When frame schema checking is enabled, invalid data will be caught automatically:

;; This will throw if schema checking is enabled
(custom-data {:user-id 123 :action :invalid}) ; user-id should be string, action invalid
;; => ex-info "Invalid frame data" {:error {...}}

See frame.clj for all possible frames.

Processes

Components that transform frames:

  • Define input/output requirements
  • Can maintain state
  • Use core.async for async processing
  • Implement the flow/process protocol

Adding Custom Processes

    (defn custom-processor []
      (flow/process
        {:describe (fn [] {:ins {:in "Input channel"}
                           :outs {:out "Output channel"}})
         :init identity
         :transform (fn [state in msg]
                      ;; process-message stands in for your own transformation fn
                      [state {:out [(process-message msg)]}])}))

Read the core.async.flow docs for more information about flow processes.

Modular Processor Functions

Simulflow processors are designed for modularity and reuse. Each processor can expose its core functionality as multi-arity functions that can be used independently or composed into custom processors.

Multi-Arity Function Pattern

Processors follow a standard multi-arity pattern that maps directly to core.async.flow lifecycle:

(defn processor-fn
  ([] {:ins {:in "Description"} :outs {:out "Description"} :params {...}})  ; 0-arity: describe
  ([config] {...})                                                          ; 1-arity: init
  ([state transition] {...})                                                ; 2-arity: transition
  ([state input-port data] [state {...}]))                                  ; 3-arity: transform

Example: Reusing Transport Functions

Here's how you can reuse transport processor functions in your own custom processors:

(ns my-cool-processor
  (:require [clojure.core.async.flow :as flow]
            [simulflow.transport :as transport]
            [simulflow.frame :as frame]
            [simulflow.utils.audio :as audio]))

(defn mic-transport-fn
  "Custom microphone transport with audio processing"
  ([] (transport/mic-transport-in-describe))
  ([params] (transport/mic-transport-in-init! params))
  ([state transition]
   (transport/mic-transport-in-transition state transition))

  ;; Custom transform with audio processing. apply-noise-reduction and
  ;; PCMConverter/convertToFloat32Buffer stand in for your own audio helpers.
  ([state _ {:keys [audio-data timestamp]}]
   (let [processed-audio (audio/apply-noise-reduction audio-data)
         float-audio (PCMConverter/convertToFloat32Buffer processed-audio)]
     [state {:out [(frame/audio-input-raw float-audio {:timestamp timestamp})]}])))

;; Use in a flow
(def my-flow
  (flow/create-flow
    {:procs {:custom-mic {:proc (flow/process mic-transport-fn)
                          :args {:audio-in/sample-rate 16000}}}
     :conns [[[:custom-mic :out] [:next-processor :in]]]}))

Composing Processor Logic

You can also compose transform logic from multiple processors:

(defn hybrid-processor-fn
  ;; assumes the requires from the previous example, plus
  ;; [simulflow.processors.openai :as openai]
  ([] {:ins {:in "Mixed input"} :outs {:out "Processed output"}})
  ([params] {:config params})
  ([state transition]
   ;; cleanup is your own resource-release fn; transitions return the state
   (when (= transition ::flow/stop) (cleanup state))
   state)

  ([state input-port data]
   (cond
     ;; Handle audio using the transport transform
     (frame/audio-input-raw? data)
     (transport/mic-transport-transform state input-port data)

     ;; Handle LLM context using the OpenAI transform
     (frame/llm-context? data)
     (openai/transform state input-port data)

     ;; Custom handling for other frames (custom-transform is your own fn)
     :else
     [state {:out [(custom-transform data)]}])))

Benefits of Modular Functions

  • Reusability: Use processor logic across different flows
  • Testability: Test individual transform functions in isolation (see the sketch after this list)
  • Composability: Mix and match functionality from different processors
  • Customization: Override specific behaviors while reusing core logic
  • Debugging: Easier to debug individual components

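Because transforms are plain functions from [state input-port data] to [state outputs], they can be unit-tested without any flow machinery. A minimal sketch (upcase-transform is a hypothetical transform, not part of simulflow):

(require '[clojure.test :refer [deftest is]]
         '[clojure.string :as str])

(defn upcase-transform [state _port msg]
  [state {:out [(str/upper-case msg)]}])

(deftest upcase-transform-test
  (is (= [{} {:out ["HI"]}]
         (upcase-transform {} :in "hi"))))
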
This pattern enables building complex AI pipelines by composing smaller, well-tested components while maintaining the data-driven architecture that makes simulflow powerful.

Acknowledgements

Simulflow takes heavy inspiration from pipecat. Differences:

  • simulflow uses a graph instead of a bidirectional queue for frame transport
  • simulflow has a data-centric implementation. The processors in simulflow are pure functions in the core.async.flow transform syntax

License

MIT
