Guide: Using XSLT DataMapper in Flinkflow
The datamapper step in Flinkflow provides a powerful way to perform complex structural transformations on your data stream. While Java (Janino) and Python (GraalVM) snippets are great for custom logic, XSLT 3.0 is often superior for mapping fields, restructuring deep JSON/XML objects, or performing conditional formatting at scale.
Flinkflow runs stylesheets on Saxon-HE, an XSLT 3.0 processor. Optional XSLT 3.0 features such as schema-awareness and streaming are not part of HE.
🛠️ How it Works
- Input: A string from the Flink stream (usually JSON or XML).
- Execution: Flinkflow loads your .xsl file once and applies it to every record in the stream.
- Output: The result of the XSLT transformation, serialized according to the stylesheet's xsl:output method.
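The lifecycle above can be sketched in a few lines of Python. This only illustrates the load-once, apply-per-record pattern and is not Flinkflow's actual code: the class name DataMapperSketch and the fake_compile helper are hypothetical, and the stand-in "transform" merely upper-cases its input because Python's standard library has no XSLT 3.0 processor.

```python
from typing import Callable, Optional

Transform = Callable[[str], str]


class DataMapperSketch:
    """Hypothetical illustration of the load-once, apply-per-record lifecycle."""

    def __init__(self, compile_stylesheet: Callable[[str], Transform], xslt_path: str):
        self._compile = compile_stylesheet
        self._xslt_path = xslt_path
        self._transform: Optional[Transform] = None
        self.compile_count = 0

    def open(self) -> None:
        # Runs once per task: the expensive compilation step happens here.
        self._transform = self._compile(self._xslt_path)
        self.compile_count += 1

    def map(self, record: str) -> str:
        # Runs for every record: reuses the already compiled transform.
        if self._transform is None:
            raise RuntimeError("open() must be called before map()")
        return self._transform(record)


def fake_compile(path: str) -> Transform:
    # Assumption: stands in for a real XSLT compiler; it just upper-cases records.
    return lambda record: record.upper()


mapper = DataMapperSketch(fake_compile, "mappings/user-transform.xsl")
mapper.open()
results = [mapper.map(r) for r in ['{"id": 1}', '{"id": 2}']]
print(results, mapper.compile_count)
```

However many records flow through map(), the compile step runs once, which is the property that keeps per-record cost low.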
🚀 Step-by-Step Implementation
Step 1: Create your XSLT Stylesheet
Create a file (e.g., user-transform.xsl) using XSLT 3.0. Since XSLT 3.0 supports JSON, you can use json-to-xml() to parse input and xml-to-json() to format output.
<?xml version="1.0" encoding="UTF-8"?>
<xsl:stylesheet version="3.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    xmlns:xs="http://www.w3.org/2001/XMLSchema"
    xmlns:fn="http://www.w3.org/2005/xpath-functions"
    expand-text="yes">
  <xsl:output method="text"/>
  <xsl:template match=".[. instance of xs:string]">
    <!-- 1. Parse Input JSON into an internal XML representation -->
    <xsl:variable name="json" select="json-to-xml(.)"/>
    <!-- 2. Construct the transformed structure -->
    <xsl:variable name="out">
      <fn:map>
        <fn:string key="user_id">{$json//fn:number[@key='id']}</fn:string>
        <fn:string key="display_name">{$json//fn:string[@key='name']}</fn:string>
        <fn:string key="status">ACTIVE</fn:string>
      </fn:map>
    </xsl:variable>
    <!-- 3. Convert back to JSON String for the Flink stream -->
    <xsl:value-of select="xml-to-json($out)"/>
  </xsl:template>
</xsl:stylesheet>
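Paths such as $json//fn:number[@key='id'] are easier to read once you have seen the element structure that json-to-xml() produces. The Python sketch below mimics that structure with the standard library only. It is an approximation written for illustration (the json_to_xml function here is a hypothetical helper, not the XSLT function itself), and details such as number formatting may differ from what a real processor emits.

```python
import json
import xml.etree.ElementTree as ET

FN_NS = "http://www.w3.org/2005/xpath-functions"


def json_to_xml(value, key=None):
    """Approximate the element tree that json-to-xml() builds for a JSON value."""
    if isinstance(value, dict):
        elem = ET.Element(f"{{{FN_NS}}}map")
        for child_key, child_value in value.items():
            elem.append(json_to_xml(child_value, key=child_key))
    elif isinstance(value, list):
        elem = ET.Element(f"{{{FN_NS}}}array")
        for item in value:
            elem.append(json_to_xml(item))
    elif isinstance(value, bool):
        # Checked before numbers because bool is a subclass of int in Python.
        elem = ET.Element(f"{{{FN_NS}}}boolean")
        elem.text = "true" if value else "false"
    elif isinstance(value, (int, float)):
        elem = ET.Element(f"{{{FN_NS}}}number")
        elem.text = json.dumps(value)
    elif value is None:
        elem = ET.Element(f"{{{FN_NS}}}null")
    else:
        elem = ET.Element(f"{{{FN_NS}}}string")
        elem.text = value
    if key is not None:
        # Object keys become key attributes on the child elements.
        elem.set("key", key)
    return elem


ET.register_namespace("fn", FN_NS)
tree = json_to_xml(json.loads('{"id": 1, "name": "Alice"}'))
print(ET.tostring(tree, encoding="unicode"))
```

The printed tree has an fn:map root with one fn:number child keyed "id" and one fn:string child keyed "name", which is exactly what the two path expressions in the stylesheet select.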
Step 2: Place the file in mappings/
Flinkflow looks for stylesheets relative to the execution directory. By convention, place them in the mappings/ folder.
mkdir -p mappings
mv user-transform.xsl mappings/
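Before wiring the file into a pipeline, a quick sanity check can catch XML typos early. The sketch below is an optional helper, not part of Flinkflow: check_stylesheet is a hypothetical name, and it only verifies that the file is well-formed XML with an XSLT root element. It does not compile or validate the stylesheet itself.

```python
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path

XSL_NS = "http://www.w3.org/1999/XSL/Transform"


def check_stylesheet(path: Path) -> str:
    """Parse the file as XML and return the XSLT version declared on its root.

    Raises ET.ParseError if the file is not well-formed XML, and ValueError
    if the root element is not xsl:stylesheet or xsl:transform.
    """
    root = ET.parse(path).getroot()
    if root.tag not in (f"{{{XSL_NS}}}stylesheet", f"{{{XSL_NS}}}transform"):
        raise ValueError(f"{path}: root element {root.tag} is not an XSLT stylesheet")
    return root.get("version", "")


# Demo on a throwaway file standing in for mappings/user-transform.xsl.
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "mappings" / "user-transform.xsl"
    demo.parent.mkdir(parents=True)
    demo.write_text(
        f'<xsl:stylesheet version="3.0" xmlns:xsl="{XSL_NS}"/>', encoding="utf-8"
    )
    version = check_stylesheet(demo)

print(version)
```

In practice you would call check_stylesheet(Path("mappings/user-transform.xsl")) from the execution directory.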
Step 3: Configure your Pipeline YAML
Add a step of type: datamapper and provide the xsltPath.
name: "User Enrichment Job"
steps:
  - type: source
    name: kafka-source
    properties:
      topic: "raw-users"
  - type: datamapper
    name: rename-fields
    properties:
      xsltPath: "mappings/user-transform.xsl"
  - type: sink
    name: console-sink
💡 Practical Examples
Example 1: JSON to JSON (Field Flattening)
If your input is nested, such as {"user": {"id": 1, "profile": {"name": "Alice"}}}, and you want the flat form {"id": 1, "name": "Alice"}:
<fn:map>
  <fn:number key="id">{$json//fn:map[@key='user']/fn:number[@key='id']}</fn:number>
  <fn:string key="name">{$json//fn:map[@key='profile']/fn:string[@key='name']}</fn:string>
</fn:map>
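When testing a flattening stylesheet like this one, it helps to compute the expected output independently. The following Python sketch is a plain reference version of the same mapping for building test fixtures. The function name flatten_user is hypothetical, and the sketch assumes every record has the nested shape shown above.

```python
import json


def flatten_user(record: str) -> str:
    """Reference version of the Example 1 mapping, for building test fixtures."""
    data = json.loads(record)
    user = data["user"]
    flat = {"id": user["id"], "name": user["profile"]["name"]}
    return json.dumps(flat)


expected = flatten_user('{"user": {"id": 1, "profile": {"name": "Alice"}}}')
print(expected)  # prints {"id": 1, "name": "Alice"}
```

Compare parsed JSON rather than raw strings when checking the stylesheet's result against this reference, since the two serializers may differ in whitespace.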
Example 2: Filtering / Conditional Logic
Only include a field if a certain condition is met:
<fn:map>
  <fn:string key="id">{$json//fn:number[@key='id']}</fn:string>
  <!-- Emit is_adult only for ages 18 and above -->
  <xsl:if test="$json//fn:number[@key='age'] >= 18">
    <fn:boolean key="is_adult">true</fn:boolean>
  </xsl:if>
</fn:map>
Example 3: Traditional XML Processing
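If your stream contains raw XML, you can use traditional XSLT templates. Because the result here is XML rather than a JSON string, declare method="xml" on xsl:output instead of method="text":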
<xsl:template match="/">
  <UserRecord>
    <ID><xsl:value-of select="/raw/user/id"/></ID>
    <Name><xsl:value-of select="upper-case(/raw/user/name)"/></Name>
  </UserRecord>
</xsl:template>
🎨 Kaoto Integration: Visual Data Mapping
Kaoto is a low-code visual designer for Apache Camel and streaming pipelines. Flinkflow's datamapper is specifically designed to be compatible with the XSLT 3.0 scripts generated by the Kaoto Data Mapper.
Why use Kaoto with Flinkflow?
- Visual Drag-and-Drop: Map source fields to target fields without writing XSLT manually.
- Schema Awareness: Kaoto can load your source and target JSON schemas to provide autocomplete and validation.
- Complex Expressions: Use Kaoto's expression builder for math, string manipulation, and logical conditions.
How to Integrate:
- Design in Kaoto:
  - Open Kaoto and create a new integration.
  - Add a Data Mapper step.
  - Define your Source and Target data structures (upload JSON Schema or sample JSON).
  - Drag lines between fields to create mappings.
- Export the XSLT:
  - Once satisfied, switch to the "Source" or "Code" view in Kaoto.
  - Copy the generated XSLT 3.0 content.
  - Save it as a .xsl file in your Flinkflow mappings/ directory.
- Reference in Flinkflow:
  - Add the type: datamapper step to your YAML as shown in Step 3 above.
[!TIP] Since Kaoto's Data Mapper outputs standard XSLT 3.0, Flinkflow's Saxon-HE engine executes it without modification, so the mapping you design visually is the mapping that runs.
✅ Best Practices & Performance
- Pre-Compiled Stylesheets: Flinkflow compiles the XSLT stylesheet once during the open() phase of the Flink task, so no compilation cost is paid per record. This ensures high throughput during execution.
- JSON-to-XML Parsing: Note that json-to-xml() produces a specific XML representation defined by the XSLT 3.0 specification. Keys become @key attributes on elements like fn:map, fn:array, fn:string, etc.
- Method Choice: Always set <xsl:output method="text"/> when outputting JSON strings to ensure Saxon doesn't add XML declarations to your JSON.
- Saxon-HE: Flinkflow uses the Home Edition (HE). Advanced features like schema-awareness or streaming XSLT require the commercial Enterprise Edition (Saxon-EE), but HE is sufficient for almost all mapping tasks.
For a working demo, check datamapper-example.yaml.