
Guide: Using XSLT DataMapper in Flinkflow

The datamapper step in Flinkflow provides a powerful way to perform complex structural transformations on your data stream. While Java (Janino) and Python (GraalVM) snippets are great for custom logic, XSLT 3.0 is often superior for mapping fields, restructuring deep JSON/XML objects, or performing conditional formatting at scale.

Flinkflow uses Saxon-HE to support the full XSLT 3.0 specification.


🛠️ How it Works

  1. Input: A string from the Flink stream (usually JSON or XML).
  2. Execution: Flinkflow loads your .xsl file once and applies it to every record in the stream.
  3. Output: The result of the transformation, serialized according to the stylesheet's xsl:output method, is emitted downstream as a string.

🚀 Step-by-Step Implementation

Step 1: Create your XSLT Stylesheet

Create a file (e.g., user-transform.xsl) using XSLT 3.0. Since XSLT 3.0 supports JSON, you can use json-to-xml() to parse input and xml-to-json() to format output.

<?xml version="1.0" encoding="UTF-8"?>
<xsl:stylesheet version="3.0"
                xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
                xmlns:xs="http://www.w3.org/2001/XMLSchema"
                xmlns:fn="http://www.w3.org/2005/xpath-functions"
                expand-text="yes">

  <xsl:output method="text"/>

  <xsl:template match=".[. instance of xs:string]">
    <!-- 1. Parse the input JSON into the XSLT 3.0 XML representation -->
    <xsl:variable name="json" select="json-to-xml(.)"/>

    <!-- 2. Construct the transformed structure -->
    <xsl:variable name="out">
      <fn:map>
        <fn:string key="user_id">{$json//fn:number[@key='id']}</fn:string>
        <fn:string key="display_name">{$json//fn:string[@key='name']}</fn:string>
        <fn:string key="status">ACTIVE</fn:string>
      </fn:map>
    </xsl:variable>

    <!-- 3. Serialize back to a JSON string for the Flink stream -->
    <xsl:value-of select="xml-to-json($out)"/>
  </xsl:template>
</xsl:stylesheet>
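If the json-to-xml() representation is unfamiliar, the following is a minimal Python sketch of the intermediate XML tree that the stylesheet's XPath expressions navigate. No Saxon is needed; the json_to_xml helper here is hypothetical and exists only to mirror the mapping defined in the XSLT 3.0 specification:

```python
import json
import xml.etree.ElementTree as ET

FN = "http://www.w3.org/2005/xpath-functions"

def json_to_xml(value, key=None):
    # Mirrors the XSLT 3.0 json-to-xml() mapping: objects -> fn:map,
    # arrays -> fn:array, strings -> fn:string, numbers -> fn:number,
    # booleans -> fn:boolean; map entries carry their name in a key attribute.
    if isinstance(value, dict):
        el = ET.Element(f"{{{FN}}}map")
        for k, v in value.items():
            el.append(json_to_xml(v, key=k))
    elif isinstance(value, list):
        el = ET.Element(f"{{{FN}}}array")
        for v in value:
            el.append(json_to_xml(v))
    elif isinstance(value, bool):
        el = ET.Element(f"{{{FN}}}boolean")
        el.text = "true" if value else "false"
    elif isinstance(value, str):
        el = ET.Element(f"{{{FN}}}string")
        el.text = value
    elif value is None:
        el = ET.Element(f"{{{FN}}}null")
    else:
        el = ET.Element(f"{{{FN}}}number")
        el.text = str(value)
    if key is not None:
        el.set("key", key)
    return el

tree = json_to_xml(json.loads('{"id": 7, "name": "Alice"}'))
# The stylesheet's $json//fn:number[@key='id'] selects this element:
print(ET.tostring(tree.find(f"{{{FN}}}number"), encoding="unicode"))
```

Running this shows why the stylesheet addresses the id field as fn:number[@key='id']: JSON keys become key attributes, and JSON types become element names in the fn namespace.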

Step 2: Place the file in mappings/

Flinkflow looks for stylesheets relative to the execution directory. By convention, place them in the mappings/ folder.

mkdir -p mappings
mv user-transform.xsl mappings/

Step 3: Configure your Pipeline YAML

Add a step of type: datamapper and provide the xsltPath.

name: "User Enrichment Job"
steps:
  - type: source
    name: kafka-source
    properties:
      topic: "raw-users"

  - type: datamapper
    name: rename-fields
    properties:
      xsltPath: "mappings/user-transform.xsl"

  - type: sink
    name: console-sink

💡 Practical Examples

Example 1: JSON to JSON (Field Flattening)

If your input is nested: {"user": {"id": 1, "profile": {"name": "Alice"}}}, and you want it flat:

<fn:map>
  <fn:number key="id">{$json//fn:map[@key='user']/fn:number[@key='id']}</fn:number>
  <fn:string key="name">{$json//fn:map[@key='profile']/fn:string[@key='name']}</fn:string>
</fn:map>
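For readers more at home in a general-purpose language, the same flattening can be sketched in plain Python. The flatten_user helper is illustrative only; in Flinkflow the mapping is performed by the XSLT fragment above:

```python
import json

def flatten_user(raw: str) -> str:
    # Lift the nested user.id and user.profile.name to the top level,
    # mirroring the XSLT fragment above.
    doc = json.loads(raw)
    return json.dumps({
        "id": doc["user"]["id"],
        "name": doc["user"]["profile"]["name"],
    })

print(flatten_user('{"user": {"id": 1, "profile": {"name": "Alice"}}}'))
# → {"id": 1, "name": "Alice"}
```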

Example 2: Filtering / Conditional Logic

Only include a field if a certain condition is met:

<fn:map>
  <fn:string key="id">{$json//fn:number[@key='id']}</fn:string>
  <xsl:if test="$json//fn:number[@key='age'] > 18">
    <fn:boolean key="is_adult">true</fn:boolean>
  </xsl:if>
</fn:map>
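In Python terms, the conditional above behaves roughly like this (the map_user helper is hypothetical, for illustration only):

```python
import json

def map_user(raw: str) -> str:
    # Emit id always; add is_adult only when age > 18, like the xsl:if above.
    doc = json.loads(raw)
    out = {"id": str(doc["id"])}
    if doc["age"] > 18:
        out["is_adult"] = True
    return json.dumps(out)

print(map_user('{"id": 7, "age": 30}'))  # → {"id": "7", "is_adult": true}
print(map_user('{"id": 8, "age": 15}'))  # → {"id": "8"}
```

Note that records with age exactly 18 do not get the is_adult field, because the test uses a strict greater-than.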

Example 3: Traditional XML Processing

If your stream contains raw XML, you can use traditional XSLT templates:

<xsl:template match="/">
  <UserRecord>
    <ID><xsl:value-of select="/raw/user/id"/></ID>
    <Name><xsl:value-of select="upper-case(/raw/user/name)"/></Name>
  </UserRecord>
</xsl:template>
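An equivalent transformation, sketched with Python's standard library purely for comparison (Flinkflow runs the XSLT template above, not this code):

```python
import xml.etree.ElementTree as ET

def to_user_record(raw_xml: str) -> str:
    # Same mapping as the XSLT template above: copy the id, upper-case the name.
    src = ET.fromstring(raw_xml)
    rec = ET.Element("UserRecord")
    ET.SubElement(rec, "ID").text = src.findtext("user/id")
    ET.SubElement(rec, "Name").text = src.findtext("user/name").upper()
    return ET.tostring(rec, encoding="unicode")

print(to_user_record("<raw><user><id>1</id><name>alice</name></user></raw>"))
# → <UserRecord><ID>1</ID><Name>ALICE</Name></UserRecord>
```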

🎨 Kaoto Integration: Visual Data Mapping

Kaoto is a low-code visual designer for Apache Camel and streaming pipelines. Flinkflow's datamapper is specifically designed to be compatible with the XSLT 3.0 scripts generated by the Kaoto Data Mapper.

Why use Kaoto with Flinkflow?

  • Visual Drag-and-Drop: Map source fields to target fields without writing XSLT manually.
  • Schema Awareness: Kaoto can load your source and target JSON schemas to provide autocomplete and validation.
  • Complex Expressions: Use Kaoto's expression builder for math, string manipulation, and logical conditions.

How to Integrate:

  1. Design in Kaoto:
    • Open Kaoto and create a new integration.
    • Add a Data Mapper step.
    • Define your Source and Target data structures (upload JSON Schema or sample JSON).
    • Drag lines between fields to create mappings.
  2. Export the XSLT:
    • Once satisfied, switch to the "Source" or "Code" view in Kaoto.
    • Copy the generated XSLT 3.0 content.
    • Save it as a .xsl file in your Flinkflow mappings/ directory.
  3. Reference in Flinkflow:
    • Add the type: datamapper step to your YAML as shown in the guides above.

[!TIP] Since Kaoto's Data Mapper outputs standard XSLT 3.0, Flinkflow's Saxon-HE engine can execute it without modification, so your visual design behaves the same at runtime as in the designer.


✅ Best Practices & Performance

  • Pre-Compiled Stylesheets: Flinkflow compiles the XSLT stylesheet once during the open() phase of the Flink task. This ensures high throughput during execution.
  • JSON-to-XML Parsing: Note that json-to-xml() creates a specific XML schema defined by the XSLT 3.0 specification. Keys become @key attributes on elements like fn:map, fn:array, fn:string, etc.
  • Method Choice: Always set <xsl:output method="text"/> when outputting JSON strings to ensure Saxon doesn't add XML declarations to your JSON.
  • Saxon-HE: Flinkflow uses the open-source Home Edition (HE). Advanced features like schema-aware processing or streaming XSLT require the commercial Saxon-EE, but HE is sufficient for almost all mapping tasks.

For a working demo, check datamapper-example.yaml.