🛠️ Flinkflow Configuration Reference
This guide provides a detailed specification of the Flinkflow YAML DSL, including step types, connector properties, secret management, and complex workflow examples.
🏗️ Pipeline Specification
Job Config
- `name`: Name of the Flink job (displayed in the Flink UI).
- `parallelism`: Default parallelism for the job.
- `steps`: A sequential list of pipeline steps.
Step Config
- `type`: The type of operation (`source`, `process`, `datamapper`, `join`, `http-lookup`, `agent`, `sink`, or `flowlet`).
- `name`: Unique identifier for the component (e.g., `kafka-source`).
- `code`: The logic snippet for transformation (used in `process`, `filter`, `flatmap`, etc.).
- `language`: (Optional) The runtime for the `code` snippet:
  - `java` (Default): High-performance Janino (Java) execution.
  - `python`: Inline Python via the GraalVM Polyglot script engine.
  - `camel-simple` (or `camel`): Declarative Camel Simple expression. [Docs]
  - `camel-jsonpath` (or `jsonpath`): JSON extractions and filters. [Docs]
  - `camel-groovy` (or `groovy`): High-performance JVM-native scripting. [Docs]
  - `camel-yaml`: Complex route fragments using the Camel YAML DSL. [Docs]
- `properties`: Key-value configuration map. Values can reference Kubernetes Secrets using `secret:name/key`.
- `with`: (For `flowlet` steps) Mapping of parameters passed to a reusable Flowlet.
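Putting the job and step fields together, a minimal end-to-end pipeline might look like the following sketch (the transformation snippet and property values are illustrative, not taken from a shipped example):

```yaml
name: "minimal-demo"
parallelism: 2
steps:
  - type: source
    name: static-source
    properties:
      content: "a|b|c"            # bounded stream of three records
  - type: process
    code: 'return input.toUpperCase();'   # default Java (Janino) runtime
  - type: sink
    name: console-sink
```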
🔌 Supported Components
Sources
| Connector | Type | Properties | Description |
|---|---|---|---|
| kafka-source | source | bootstrap.servers, topic, group.id, format | Reads from Apache Kafka or Confluent Cloud. |
| file-source | source | path, monitorInterval | Reads local or S3 files. Set monitorInterval for streaming. |
| static-source | source | content | Generates a bounded stream from a pipe-separated string. |
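As a hedged sketch, a streaming file source could be configured like this (the S3 path and interval value are illustrative, and the interval's unit is an assumption):

```yaml
- type: source
  name: file-source
  properties:
    path: "s3://my-bucket/events/"   # illustrative bucket path
    monitorInterval: "10000"         # poll for new files; presence of this key enables streaming
```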
Sinks
| Connector | Type | Properties | Description |
|---|---|---|---|
| console-sink | sink | (none) | Prints records to stdout. |
| kafka-sink | sink | bootstrap.servers, topic, format | Writes results to a Kafka topic. |
| file-sink | sink | path, rolloverInterval, maxPartSize | Writes partitioned results to local disk or S3. |
| http-sink | sink | url or urlCode, method, authCode | Pushes records to an external API (Webhooks). |
| jdbc-sink | sink | url, sql, code, batchSize | Batch inserts into PostgreSQL, MySQL, or Oracle. |
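For instance, a webhook push via http-sink might be configured as in this sketch (the endpoint URL is illustrative):

```yaml
- type: sink
  name: http-sink
  properties:
    url: "https://hooks.example.com/ingest"   # illustrative webhook endpoint
    method: "POST"
```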
Operations & Transformations

| Type | Description | Variables | Languages |
|---|---|---|---|
| process | 1-to-1 transformation. | input / body | Java / Python / Camel |
| filter | Retains records returning true. | input / body | Java / Python / Camel |
| flatmap | 1-to-N transformation. | input / body | Java / Python / Camel |
| keyby | Partitions stream by an extracted key. | input / body | Java / Camel |
| reduce | Rolling aggregation of two elements. | value1, value2 | Java / Camel |
| window | Time-based windowing (Tumbling/Sliding). | value1, value2 | Java / Camel |
| sideoutput | Branches stream using ctx.output(). | input, ctx | Java-only |
| datamapper | XSLT 3.0 structural transformation. | xsltPath (prop) | XML/JSON |
| join | Interval join between two streams. | left, right | Java-only |
| agent | Autonomous LLM agent over each record. | input | OpenAI / Gemini / Vertex |
| http-lookup | Async enrichment via REST API. | input, resp | Java-only |
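For example, a minimal filter step in the default Java (Janino) runtime that keeps only error records might look like this sketch:

```yaml
- type: filter
  code: 'return input.contains("ERROR");'   # retain records whose payload mentions ERROR
```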
> [!TIP]
> **Python Snippets (GraalVM):** Use `language: python` to write your logic. The `code:` block is treated as the body of a `process(input)` function.

```yaml
- type: process
  language: python
  code: |
    import json
    from datetime import datetime
    data = json.loads(input)
    data["processed_at"] = datetime.now().isoformat()
    return json.dumps(data)
```

> [!TIP]
> **Declarative Logic (Apache Camel):** Use `language: camel` or `language: jsonpath` for low-code operations. This removes the need for manual JSON parsing and boilerplate.

**Field Extraction (JsonPath):**

```yaml
- type: keyby
  language: jsonpath
  code: "$.user.id"
```

**Template Formatting (Camel Simple):**

```yaml
- type: process
  language: camel
  code: "User ${jsonpath($.user.name)} logged in from ${headers.ip}"
```

**Complex Stateful Math (Camel Groovy):** For windowed reductions, use `headers.get("value1")` and `headers.get("value2")`.

```yaml
- type: window
  language: camel-groovy
  code: |
    def o1 = new groovy.json.JsonSlurper().parseText(headers.get("value1"))
    def o2 = new groovy.json.JsonSlurper().parseText(headers.get("value2"))
    return groovy.json.JsonOutput.toJson([sum: o1.val + o2.val])
```

**Enterprise Patterns (Camel YAML DSL):** For complex routing or multi-step logic within a single operator, use `language: camel-yaml`. The route must start with `from: uri: direct:start`.

```yaml
- type: process
  language: camel-yaml
  code: |
    - from:
        uri: "direct:start"
        steps:
          - choice:
              when:
                - simple: "${body} > 100"
                  steps:
                    - setBody:
                        constant: "OVER_LIMIT"
              otherwise:
                steps:
                  - setBody:
                      simple: "Accepted: ${body}"
```
> [!TIP]
> In all Java snippets, the `metrics` object (type `MetricGroup`) is available to emit custom live metrics: `metrics.counter("my_counter").inc();`.
> [!TIP]
> **Agentic Bridge:** Use `type: agent` to run an autonomous LLM over each stream record. Supports stateful multi-turn memory, Flowlet-as-a-Tool execution, and multiple providers.

```yaml
- type: agent
  name: support-triage
  properties:
    model: "gemini-2.5-flash"   # or gpt-4o, claude-3-opus
    systemPrompt: "Classify the following customer message as: BILLING, TECHNICAL, or GENERAL."
    memory: "false"             # set true for multi-turn stateful conversation
    apiKey: "secret:llm-creds/google-api-key"
    tools: "log-transform,http-enrich"   # optional: Flowlets exposed as agent tools
```

**Supported Providers** (auto-detected from model name):

| Provider | Models | Auth |
|---|---|---|
| OpenAI | gpt-4o, gpt-4, o1-* | OPENAI_API_KEY or apiKey property |
| Google Gemini | gemini-* | GOOGLE_API_KEY or apiKey property |
| Google Vertex | Any Gemini model + provider: vertex | Application Default Credentials (ADC) |
| Ollama | ollama:*, phi*, llama*, mistral* | baseUrl: "http://localhost:11434" |

**Example: Local Ollama Agent**

```yaml
- type: agent
  name: local-classifier
  properties:
    # Uses 'ollama:' prefix to target local Ollama server
    model: "ollama:llama3"
    baseUrl: "http://localhost:11434"
    systemPrompt: "Classify this record."
```
🧩 Flowlets (Reusable Components)
Flowlets are parameterized, pre-built components (inspired by Apache Camel Kamelets) that can be shared across multiple pipelines.
- **Define:** Define the Flowlet as a Kubernetes CRD (`kind: Flowlet`) with parameters and a template.
- **Install:** Apply it to the cluster using `./k8s/install-flowlets.sh`.
- **Reference:** Use `type: flowlet` in your pipeline and supply values in `with:`.
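The definition side of a Flowlet might look like this sketch; note that the `apiVersion`, the parameter schema, and the `{{prefix}}` placeholder syntax are all assumptions for illustration and are not verified against the actual CRD:

```yaml
apiVersion: flinkflow.io/v1   # assumed API group/version
kind: Flowlet
metadata:
  name: log-transform
spec:
  parameters:
    - name: prefix
      default: "LOG: "
  template:
    - type: process
      code: 'return "{{prefix}}" + input;'   # placeholder substitution syntax assumed
```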
Example Flowlet Usage
```yaml
steps:
  - type: flowlet
    name: confluent-kafka-source
    with:
      bootstrapServers: "pkc-xxx.confluent.cloud:9092"
      topic: "orders"
      apiKey: "secret:my-creds/key"
      apiSecret: "secret:my-creds/secret"
```
🔐 Secret Management
Flinkflow resolves secrets dynamically at job startup based on the environment:
| Strategy | Syntax | Source |
|---|---|---|
| Kubernetes | secret:name/key | Injected from K8s API (Best for prod). |
| Env Var | ${VARIABLE_NAME} | Injected via System.getenv(). |
| Plain Text | my-password | Hardcoded in YAML (Prototyping only). |
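As an illustration of the resolution convention above (a hypothetical helper, not Flinkflow's actual resolver code), the three value forms can be distinguished like this:

```python
import os

def classify_secret_ref(value: str):
    """Classify a property value per the three resolution strategies.

    Hypothetical helper for illustration only.
    Returns a (strategy, payload) tuple.
    """
    if value.startswith("secret:"):
        # Kubernetes secret reference: "secret:name/key" -> (name, key)
        name, _, key = value[len("secret:"):].partition("/")
        return ("kubernetes", (name, key))
    if value.startswith("${") and value.endswith("}"):
        # Environment variable reference: "${VAR}" -> resolved from the environment
        return ("env", os.environ.get(value[2:-1]))
    # Anything else is treated as plain text
    return ("plain", value)

print(classify_secret_ref("secret:my-creds/key"))  # ('kubernetes', ('my-creds', 'key'))
print(classify_secret_ref("my-password"))          # ('plain', 'my-password')
```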
📖 Deep Dive Examples
Windowed Aggregation
```yaml
name: "Tumbling Window Count"
steps:
  - type: source
    name: kafka-source
    properties: { topic: "events" }
  - type: keyby
    code: "return input.split(',')[0];"
  - type: window
    properties: { windowType: tumbling, size: 60 }
    code: |
      int count1 = Integer.parseInt(value1.split(",")[1]);
      int count2 = Integer.parseInt(value2.split(",")[1]);
      return value1.split(",")[0] + "," + (count1 + count2);
  - type: sink
    name: console-sink
```
Async HTTP Enrichment
```yaml
steps:
  - type: http-lookup
    properties:
      urlCode: 'return "https://api.my.com/user/" + input.split(",")[1];'
      timeout: "5000"
    code: 'return input + " | meta=" + response;'
```
JDBC Batch Sink
```yaml
steps:
  - type: jdbc-sink
    properties:
      url: "jdbc:postgresql://localhost:5432/analytics"
      sql: "INSERT INTO events (id, payload) VALUES (?, ?)"
      code: |
        stmt.setLong(1, Long.parseLong(input.split(",")[0]));
        stmt.setString(2, input.split(",")[1]);
      batchSize: "500"
```
For more examples, check the standalone catalog or Kubernetes library.