
📖 Flinkflow User's Guide

Welcome to the Flinkflow User's Guide! This document will walk you through the core concepts of the platform and help you build your first declarative stream processing pipeline.


🚀 1. What is Flinkflow?

Flinkflow is a Declarative Stream Processing Engine built on top of Apache Flink. It allows you to define enterprise-grade, stateful data pipelines using a simple Kubernetes-native YAML DSL.

🌟 Key Value Proposition

  • No Java Required: You write pipeline logic in YAML, with optional Java (Janino) or Python (GraalVM) snippets for custom logic.
  • GitOps Ready: Treat your data jobs as version-controlled code.
  • Portability: Move pipelines between Local, Docker, and Kubernetes with zero code changes.

πŸ—οΈ 2. Core Concepts​

To master Flinkflow, you need to understand its three primary building blocks:

🔄 Pipelines

A Pipeline is a directed acyclic graph (DAG) of processing steps. It defines where your data comes from, how it's transformed, and where it's stored.

🧪 Steps

Each step in a pipeline is a discrete unit of work. Steps are categorized into:

  • Source: Ingests data (e.g., Kafka, S3, Static content).
  • Process: Transforms data (e.g., Filtering, Mapping, Aggregating).
  • Sink: Exports data (e.g., Kafka, JDBC, Console, Webhooks).

📦 Flowlets

Flowlets are reusable, parameterized pipeline components. Think of them as "templates" for steps. Instead of re-configuring a complex Kafka connector every time, you define it once as a Flowlet and reuse it across multiple pipelines with simple arguments.
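To make the idea concrete, here is a sketch of what a Flowlet definition could look like. The field names below (`parameters`, `template`, the `{{...}}` placeholders) are illustrative assumptions for this guide, not the authoritative schema; see the built-in catalog in section 8 for real Flowlets.

```yaml
# Hypothetical Flowlet definition -- field names are illustrative, not the official schema.
apiVersion: flinkflow.io/v1alpha1
kind: Flowlet
metadata:
  name: kafka-source
spec:
  parameters:              # values that callers supply via the `with:` block
    - name: topic
    - name: bootstrapServers
  template:                # the concrete step this Flowlet expands into
    type: source
    properties:
      connector: kafka
      topic: "{{topic}}"
      bootstrap.servers: "{{bootstrapServers}}"
```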


⚡ 3. Writing Your First Pipeline

A Flinkflow pipeline is just a YAML file. Here is the canonical "Hello World" example (hello-world.yaml):

name: "Hello World Pipeline"
parallelism: 1
steps:
  - type: source
    name: static-source
    properties:
      content: "Flinkflow,is,awesome"

  - type: process
    name: upper-case-transform
    code: |
      return input.toUpperCase();

  - type: sink
    name: console-sink
    properties:
      type: console

πŸƒ Running it locally:​

If you have the Flinkflow JAR, you can run this instantly:

./scripts/run-local.sh examples/standalone/hello-world.yaml

🐍 4. Using Polyglot Business Logic

One of Flinkflow's most powerful features is the ability to inject custom logic directly into the YAML.

Java Snippets (Janino)

High-performance, JVM-native compilation for filters and maps.

- type: filter
  code: "return input.length() > 10;"

Python Snippets (GraalVM)

Secure, sandboxed Python for data scientists and analysts.

- type: process
  language: python
  code: |
    import json
    data = json.loads(input)
    data['processed'] = True
    return json.dumps(data)
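Because these snippets are plain Python, the logic can be prototyped outside Flinkflow before it goes into the YAML. A minimal standalone sketch of the transform above, where the `input` parameter stands in for the implicit `input` variable the engine provides:

```python
import json

def transform(input):
    # Mirror of the snippet above: parse the record, tag it, re-serialize.
    # The parameter deliberately shadows Python's builtin `input` to match
    # the variable name used inside Flinkflow snippets.
    data = json.loads(input)
    data['processed'] = True
    return json.dumps(data)

print(transform('{"id": 1}'))
```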

Apache Camel DSL

The preferred way to build complex, low-code integrations. (See: Apache Camel Docs)

Simple Expressions: Best for field extraction and fast string logic. (Ref: Simple Language)

- type: process
  language: camel
  code: "User ${jsonpath($.id)} processed at ${date:now}"

YAML DSL Fragments: Best for complex routing (Enterprise Integration Patterns) like the Choice EIP. (Ref: YAML DSL)

- type: process
  language: camel-yaml
  code: |
    - from:
        uri: "direct:start"
        steps:
          - choice:
              when:
                - simple: "${body} contains 'fraud'"
                  steps:
                    - setBody:
                        constant: "ALERT"
              otherwise:
                steps:
                  - setBody:
                      constant: "SAFE"

🌍 Comprehensive Example: IoT Fleet Analytics

For a full end-to-end pipeline that streams mock datagen payloads, processes them, reduces them with a 5-second tumbling window, and evaluates them with a condition engine to trigger alerts, see the iot-fleet-analytics examples!

We provide both Kubernetes CRD and Standalone definitions, implemented in both Java and Python:

  • Kubernetes (Java): examples/k8s/iot-fleet-analytics.yaml
  • Kubernetes (Python): examples/k8s/iot-fleet-analytics-python.yaml
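For orientation, the windowed-reduction part of such a pipeline might be sketched as a single process step. The property names below (`windowType`, `windowSize`, `aggregate`) are assumptions made for this sketch, not the documented schema; consult the iot-fleet-analytics example files for the real definitions.

```yaml
# Illustrative only -- property names are assumptions, not the documented schema.
- type: process
  name: fleet-window-reduce
  properties:
    windowType: tumbling   # 5-second tumbling window, as in the example pipeline
    windowSize: 5s
    aggregate: average
```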

πŸƒ Running the Examples Locally:​

You can test the standalone versions (without the apiVersion Kubernetes wrappers) of these IoT pipelines using the local runner:

To run the Python version:

./scripts/run-local.sh examples/standalone/iot-fleet-analytics-python.yaml

To run the Java version:

./scripts/run-local.sh examples/standalone/iot-fleet-analytics.yaml

☸️ 5. Moving to Kubernetes

Flinkflow is designed for the modern cloud stack. Once your local YAML is ready, you can deploy it as a Pipeline CRD:

apiVersion: flinkflow.io/v1alpha1
kind: Pipeline
metadata:
  name: my-cloud-job
spec:
  steps:
    - type: flowlet
      name: kafka-source
      with:
        topic: "raw-events"
    - type: sink
      name: console-sink

📚 6. Next Steps


Need help? Connect with the community on Zulip.


🔌 7. Sources & Sinks Reference

Sources

| Connector | Key Properties | Description |
| --- | --- | --- |
| kafka-source | bootstrap.servers, topic, group.id, format | Reads from Apache Kafka or Confluent Cloud. |
| confluent-kafka-source | bootstrapServers, topic, apiKey, apiSecret | Pre-built Flowlet for Confluent SASL_SSL consumers. |
| file-source | path, monitorInterval | Reads local or S3 files; set monitorInterval for continuous streaming. |
| s3-source | bucket, prefix, region | Direct Amazon S3 read (requires AWS credentials). |
| static-source | content | Pipe-separated bounded test data. Great for local development. |
| datagen | (auto configured) | Random data generator using the Flink Datagen connector. |

Sinks

| Connector | Key Properties | Description |
| --- | --- | --- |
| console-sink | (none) | Prints records to stdout. |
| kafka-sink | bootstrap.servers, topic, format | Writes to a Kafka topic. |
| confluent-kafka-sink | bootstrapServers, topic, apiKey, apiSecret | Confluent Cloud producer via Flowlet. |
| file-sink / s3-sink | path, rolloverInterval, maxPartSize | Partitioned writes to disk or S3. |
| http-sink | urlCode, method, authCode | Push to webhooks or external REST APIs. |
| jdbc-sink | url, sql, batchSize | High-throughput inserts into PostgreSQL, MySQL, or Oracle. |
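As one worked example from the table above, a jdbc-sink step might look like the following. The step layout mirrors the earlier examples and the keys come from the table; the connection string and SQL are placeholders, not values from the Flinkflow docs.

```yaml
- type: sink
  name: orders-jdbc-sink
  properties:
    type: jdbc                                   # illustrative, following the console-sink example's `type` property
    url: "jdbc:postgresql://db:5432/orders"      # placeholder connection string
    sql: "INSERT INTO orders (id, total) VALUES (?, ?)"
    batchSize: 500
```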

🧩 8. Working with Flowlets

Flowlets are the reusable "building blocks" of Flinkflow, similar to how Apache Camel uses Kamelets. Once installed into your cluster, they can be referenced by name in any pipeline, with parameters supplied via the with: block.

> [!TIP]
> Flowlets dramatically reduce repetition across pipelines. You define a complex Confluent Kafka connector once, and all teams reference it with just type: flowlet and their specific topic.

Installing the Default Catalog

# Install Flowlet and Pipeline CRDs, then populate the cluster with built-in Flowlets
./deploy/k8s/install-flowlets.sh [NAMESPACE]

Using a Flowlet in Your Pipeline

name: "Production Kafka Pipeline"
steps:
  - type: flowlet
    name: confluent-kafka-source
    with:
      bootstrapServers: "pkc-xxx.confluent.cloud:9092"
      topic: "raw-orders"
      apiKey: "secret:confluent-creds/key"
      apiSecret: "secret:confluent-creds/secret"

  - type: filter
    code: "return input.contains(\"COMPLETED\");"

  - type: flowlet
    name: kafka-sink
    with:
      bootstrapServers: "my-kafka:9092"
      topic: "cleaned-orders"

Built-in Flowlet Catalog

| Flowlet Name | Type | Purpose |
| --- | --- | --- |
| kafka-source | Source | Apache Kafka consumer |
| kafka-sink | Sink | Apache Kafka producer |
| confluent-kafka-source | Source | Confluent Cloud consumer (SASL_SSL) |
| confluent-kafka-sink | Sink | Confluent Cloud producer (SASL_SSL) |
| http-enrich | Operation | Async REST API enrichment |
| log-transform | Operation | Log records to stdout with a configurable prefix |

πŸ” 9. Secret Management​

Never hardcode credentials. Flinkflow integrates directly with Kubernetes Secrets and environment variables.

# 1. Create the Kubernetes Secret
kubectl create secret generic kafka-creds \
  --from-literal=api-key=mykey \
  --from-literal=api-secret=mysecret

# 2. Reference it in your pipeline using secret:<name>/<key>
steps:
  - type: flowlet
    name: confluent-kafka-source
    with:
      apiKey: "secret:kafka-creds/api-key"
      apiSecret: "secret:kafka-creds/api-secret"

Environment Variables

properties:
  bootstrap.servers: "${KAFKA_BOOTSTRAP_SERVERS}"

| Strategy | Syntax | Use Case |
| --- | --- | --- |
| Kubernetes Secret | secret:name/key | Production (recommended) |
| Env Variable | ${VAR_NAME} | CI/CD pipelines, staging |
| Plain text | literal-value | Local development only |
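Putting the three strategies side by side, a single with: block can mix them. The values below are placeholders; the syntaxes follow the table above and the earlier Flowlet example.

```yaml
- type: flowlet
  name: confluent-kafka-source
  with:
    apiKey: "secret:kafka-creds/api-key"            # Kubernetes Secret (production)
    bootstrapServers: "${KAFKA_BOOTSTRAP_SERVERS}"  # environment variable (CI/CD, staging)
    topic: "raw-orders"                             # plain text (fine here: not a credential)
```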

📊 10. Monitoring Your Pipelines

The NiceGUI-based monitoring dashboard provides real-time visibility into your Flink jobs running in Kubernetes.

What You Can Monitor

  • ✅ Job health status (Running, Failed, Finished)
  • ✅ Record throughput per step
  • ✅ Backpressure and checkpoint health
  • ✅ TaskManager and JobManager resource usage

Deploy the Dashboard

kubectl apply -f deploy/k8s/monitor-deployment.yaml

Access via NodePort, or use port-forwarding:

kubectl port-forward svc/flinkflow-dashboard 8081:8081 -n flinkmonitor

Inline Custom Metrics

You can emit custom metrics directly from your code snippets using the built-in metrics object:

- type: process
  code: |
    metrics.counter("records_processed").inc();
    return input.toUpperCase();

πŸ” 11. Troubleshooting​

Validate Before Running: Dry-Run Mode

The fastest way to check that your YAML is correct, without starting Flink, is the --dry-run flag:

./scripts/run-local.sh my-pipeline.yaml --dry-run

This will:

  1. Parse and validate the YAML structure
  2. Resolve all Flowlet references into concrete steps
  3. Print the fully expanded pipeline graph without executing it

Common Issues

| Symptom | Likely Cause | Fix |
| --- | --- | --- |
| ClassNotFoundException in code snippet | Missing import in Janino | Add explicit Java import statements at top of code: block |
| SecretResolutionException | Secret not found in K8s | Run kubectl get secret <name> to confirm it exists |
| FlowletNotFound error | Flowlet CRDs not installed yet | Run ./deploy/k8s/install-flowlets.sh |
| Pipeline exits immediately | Bounded source finished | Replace static-source with kafka-source for unbounded streaming |
| High latency on Python steps | GraalVM Context initialization | Expected on startup; use --dry-run to pre-warm in development |

Log Filtering

All Flinkflow errors are prefixed with [FLINKFLOW-ERROR], making them easy to filter in any logging stack:

# Tail logs from a running Flink JobManager
kubectl logs -f flinkflow-app-jm-0 | grep FLINKFLOW-ERROR

📚 12. Next Steps & Further Reading

| Guide | Description |
| --- | --- |
| Configuration Reference | Full DSL spec for all connectors and operations |
| Operations & Monitoring | Performance tuning and advanced dashboard setup |
| XSLT DataMapper Guide | Complex JSON/XML transformations using Saxon-HE |
| Kubernetes Deployment Guide | Step-by-step K8s operator deployment |
| System Architecture | How the engine works under the hood |
| Project Roadmap | Planned features for upcoming releases |

Questions? Feature requests? Connect with the community on Zulip or open an issue.