Talweg Team · Announcements · 3 min read

Announcing Native Flink SQL Support in Flinkflow

Flinkflow now supports Flink SQL as a first-class step type. Define schemas, write SQL queries, perform windowed aggregations, and join multiple streams — all from your YAML pipeline.

We’re excited to announce that Flinkflow now natively supports Apache Flink SQL as a first-class step type (type: sql) in the YAML DSL. This means data analysts, SQL-native developers, and anyone comfortable with SQL can now contribute streaming logic directly to production Flinkflow pipelines: no Java, no Maven, no boilerplate.


SQL is arguably the most widely understood data language in the world. By bringing Flink SQL into Flinkflow’s declarative model, we’re opening the platform to an entirely new audience:

  • Data analysts who think in SQL can now author production streaming transformations.
  • Existing Flink SQL users can embed their queries in Flinkflow YAML and gain Kubernetes-native deployment, GitOps workflows, and Agentic AI capabilities.
  • Polyglot teams can mix SQL with Python, Java, and Camel in a single pipeline — using the right tool for each step.

What You Can Do with type: sql

🔍 Filtering & Transformation

Write standard SQL queries against your streaming data. In a single-input step, the upstream stream is queried as the table input:

- type: sql
  name: filter-active-users
  properties:
    schema.userId: "string"
    schema.age: "int"
    schema.status: "string"
    query: "SELECT userId, age FROM input WHERE status = 'active' AND age >= 18"
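For readers coming from the Python side, this query is a stateless filter plus projection. Each row is evaluated on its own and is either dropped or reshaped. The following plain-Python sketch shows the same semantics. It assumes rows arrive as dictionaries keyed by the schema.* column names and is only an illustration, not how Flinkflow executes SQL:

```python
from typing import Any, Dict, Iterable, Iterator

Record = Dict[str, Any]


def filter_active_users(rows: Iterable[Record]) -> Iterator[Record]:
    """Row-at-a-time equivalent of:
    SELECT userId, age FROM input WHERE status = 'active' AND age >= 18
    """
    for row in rows:
        age = row.get("age")
        # WHERE clause: a NULL (None) age makes the predicate unknown, so the row is dropped.
        if row.get("status") == "active" and age is not None and age >= 18:
            # SELECT list: keep only the projected columns.
            yield {"userId": row["userId"], "age": age}


events = [
    {"userId": "u1", "age": 34, "status": "active"},
    {"userId": "u2", "age": 16, "status": "active"},
    {"userId": "u3", "age": 41, "status": "inactive"},
]
print(list(filter_active_users(events)))
```

Because no row depends on any other, a step like this needs neither watermarks nor keyed state.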

📊 Windowed Aggregations

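Perform time-windowed analytics with event-time watermarks. Here watermark.column names the event-time column, and watermark.delay bounds how late out-of-order events may arrive: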

- type: sql
  name: revenue-per-window
  properties:
    schema.productId: "string"
    schema.eventTime: "timestamp"
    schema.revenue: "double"
    watermark.column: "eventTime"
    watermark.delay: "5"
    query: |
      SELECT window_start, window_end, productId, SUM(revenue) AS total_revenue
      FROM TABLE(TUMBLE(TABLE input, DESCRIPTOR(eventTime), INTERVAL '1' MINUTE))
      GROUP BY window_start, window_end, productId
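The interplay between the tumbling window and the watermark delay is easiest to see in code. The following plain-Python sketch simulates the query's event-time semantics. It assumes timestamps in epoch seconds and reads the delay of "5" as five seconds; that unit is an assumption, since the example does not state it. It also simplifies Flink's behavior: a window fires once the watermark (the highest event time seen, minus the delay) reaches the window's end. It is not Flinkflow's runtime:

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

WINDOW_SIZE = 60      # INTERVAL '1' MINUTE, in seconds
WATERMARK_DELAY = 5   # assumption: watermark.delay "5" interpreted as seconds

Event = Tuple[int, str, float]         # (eventTime, productId, revenue)
Result = Tuple[int, int, str, float]   # (window_start, window_end, productId, total_revenue)


def tumbling_revenue(events: Iterable[Event]) -> List[Result]:
    """Sum revenue per product per 1-minute tumbling window."""
    open_windows: Dict[Tuple[int, str], float] = defaultdict(float)
    results: List[Result] = []
    watermark = float("-inf")

    def fire_ready(wm: float) -> None:
        # Emit, then discard, every window whose end is at or before the watermark.
        ready = sorted(k for k in open_windows if k[0] + WINDOW_SIZE <= wm)
        for start, product in ready:
            total = open_windows.pop((start, product))
            results.append((start, start + WINDOW_SIZE, product, total))

    for ts, product, revenue in events:
        start = ts - ts % WINDOW_SIZE            # align to the 1-minute boundary
        if start + WINDOW_SIZE <= watermark:
            continue                             # late event: its window was already emitted
        open_windows[(start, product)] += revenue
        watermark = max(watermark, float(ts - WATERMARK_DELAY))
        fire_ready(watermark)

    # At the end of a bounded input, Flink emits a final maximum watermark,
    # which flushes every remaining window.
    fire_ready(float("inf"))
    return results


events = [(0, "p1", 10.0), (30, "p1", 5.0), (62, "p1", 1.0),
          (58, "p1", 2.0), (66, "p1", 4.0), (50, "p1", 100.0)]
print(tumbling_revenue(events))
```

In this run the event at second 58 still counts toward the first window, because the watermark is only at 57 when it arrives. The event at second 50 is dropped: by then the watermark (66 − 5 = 61) has already passed the end of the first window. A larger delay tolerates more disorder but delays every result by the same amount.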

🔗 Multi-Table Joins

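Join data from multiple upstream sources with per-table schemas. List the upstream steps under inputs, then declare each table’s columns as schema.<table>.<column>: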

- type: sql
  name: enriched-transactions
  inputs: [transactions, accounts]
  properties:
    schema.transactions.txId: "string"
    schema.transactions.accountId: "string"
    schema.transactions.amount: "double"
    schema.accounts.accountId: "string"
    schema.accounts.ownerName: "string"
    schema.accounts.riskLevel: "string"
    query: |
      SELECT t.txId, a.ownerName, t.amount, a.riskLevel
      FROM transactions t
      JOIN accounts a ON t.accountId = a.accountId
      WHERE t.amount > 10000.0
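A plain JOIN between two streams is a regular (unbounded) join in Flink terms. Both sides are kept in state, and each new row is matched against everything seen so far on the other side. The following sketch models that behavior in plain Python. The class and method names are hypothetical, and it only illustrates the join semantics, not Flinkflow internals:

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
Joined = Tuple[str, str, float, str]  # (txId, ownerName, amount, riskLevel)


class StreamingInnerJoin:
    """Hypothetical model of a regular inner join on accountId."""

    def __init__(self) -> None:
        # Join state: rows from each side, indexed by the join key.
        self.tx_state: Dict[str, List[Row]] = defaultdict(list)
        self.acct_state: Dict[str, List[Row]] = defaultdict(list)

    @staticmethod
    def _emit(t: Row, a: Row) -> Joined:
        # SELECT t.txId, a.ownerName, t.amount, a.riskLevel
        return (t["txId"], a["ownerName"], t["amount"], a["riskLevel"])

    def on_transaction(self, t: Row) -> List[Joined]:
        # WHERE t.amount > 10000.0 only touches the transactions side, so it can
        # be applied before the row enters join state (a filter push-down).
        if t["amount"] <= 10000.0:
            return []
        self.tx_state[t["accountId"]].append(t)
        return [self._emit(t, a) for a in self.acct_state.get(t["accountId"], [])]

    def on_account(self, a: Row) -> List[Joined]:
        self.acct_state[a["accountId"]].append(a)
        return [self._emit(t, a) for t in self.tx_state.get(a["accountId"], [])]


join = StreamingInnerJoin()
join.on_transaction({"txId": "t1", "accountId": "a1", "amount": 25000.0})  # no account yet
print(join.on_account({"accountId": "a1", "ownerName": "Ada", "riskLevel": "high"}))
```

Because every row is retained, state grows for as long as the job runs. That is why long-running regular joins usually need a state retention policy, and why the temporal table joins on the roadmap below suit dimension enrichment.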

📤 Changelog Output

Control how stateful query results are emitted downstream:

- type: sql
  name: running-totals
  properties:
    schema.userId: "string"
    schema.amount: "double"
    outputMode: "changelog"
    query: "SELECT userId, SUM(amount) AS total_spent FROM input GROUP BY userId"

Supported output modes are append (the default), changelog (for stateful aggregations whose results are updated over time), and auto (Flinkflow chooses the mode).
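Why does the mode matter? A GROUP BY without a window keeps revising results it has already emitted, so an append-only stream cannot represent it faithfully. The following sketch shows the changelog such a query produces, using the row-kind markers from Flink’s changelog model: +I for insert, -U to retract the previous result, and +U for the updated result. How Flinkflow encodes these rows for downstream steps is not covered here, so treat the tuple format as an assumption:

```python
from typing import Dict, List, Tuple

Change = Tuple[str, str, float]  # (row kind, userId, total_spent)


def running_totals_changelog(events: List[Tuple[str, float]]) -> List[Change]:
    """Changelog for: SELECT userId, SUM(amount) AS total_spent FROM input GROUP BY userId."""
    totals: Dict[str, float] = {}
    changes: List[Change] = []
    for user, amount in events:
        if user not in totals:
            totals[user] = amount
            changes.append(("+I", user, totals[user]))  # first result for this key
        else:
            changes.append(("-U", user, totals[user]))  # retract the old total
            totals[user] += amount
            changes.append(("+U", user, totals[user]))  # emit the new total
    return changes


def materialize(changes: List[Change]) -> Dict[str, float]:
    """Apply a changelog to a keyed view, the way an upserting sink would."""
    view: Dict[str, float] = {}
    for kind, user, total in changes:
        if kind in ("+I", "+U"):
            view[user] = total
        elif kind == "-U":
            view.pop(user, None)
    return view


changes = running_totals_changelog([("u1", 10.0), ("u2", 3.0), ("u1", 5.0)])
print(changes)
print(materialize(changes))
```

Read as plain appends, this stream would report u1 twice, with totals of 10.0 and 15.0. Applied as a changelog, it converges to the correct totals. By contrast, the windowed TUMBLE aggregation above emits each window’s result exactly once, so append is sufficient there.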


Schema Validation at Load Time

With a standalone Flink SQL job, schema and configuration mistakes typically surface only once the job is submitted. Flinkflow instead validates your entire SQL step configuration when the pipeline is loaded:

  • ✅ Schema type validation (string, int, long, float, double, boolean, timestamp, decimal)
  • ✅ Watermark column existence and type checking
  • ✅ Output mode validation
  • ✅ Multi-table schema namespace verification
  • ✅ Missing query detection

Errors are caught before your job ever touches a Flink cluster.
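To make the checklist concrete, here is a hypothetical validator over a step’s properties map, written in Python for illustration. The function name and error messages are assumptions inferred from the examples in this post, as is the rule that schema.<table>.<column> keys must name a declared input. It is not Flinkflow’s actual implementation:

```python
from typing import Dict, List, Optional, Sequence

# Types and output modes listed in this post.
ALLOWED_TYPES = {"string", "int", "long", "float", "double", "boolean", "timestamp", "decimal"}
ALLOWED_OUTPUT_MODES = {"append", "changelog", "auto"}


def validate_sql_step(props: Dict[str, str], inputs: Optional[Sequence[str]] = None) -> List[str]:
    """Hypothetical load-time checks for a type: sql step; returns error messages."""
    errors: List[str] = []

    # Missing query detection.
    if not props.get("query", "").strip():
        errors.append("missing query")

    # Schema entries: schema.<column> or schema.<table>.<column>.
    columns: Dict[str, str] = {}
    for key, col_type in props.items():
        if not key.startswith("schema."):
            continue
        path = key[len("schema."):]
        if col_type not in ALLOWED_TYPES:
            errors.append(f"{key}: unsupported type '{col_type}'")
        if "." in path:  # multi-table namespace must match a declared input
            table = path.split(".", 1)[0]
            if not inputs or table not in inputs:
                errors.append(f"{key}: table '{table}' is not a declared input")
        columns[path] = col_type

    # Watermark column must exist and be a timestamp.
    wm = props.get("watermark.column")
    if wm is not None:
        if wm not in columns:
            errors.append(f"watermark.column '{wm}' is not in the schema")
        elif columns[wm] != "timestamp":
            errors.append(f"watermark.column '{wm}' must be a timestamp, got '{columns[wm]}'")

    # Output mode validation (append is the default).
    mode = props.get("outputMode", "append")
    if mode not in ALLOWED_OUTPUT_MODES:
        errors.append(f"unsupported outputMode '{mode}'")
    return errors


bad = {"schema.userId": "string", "schema.amount": "money",
       "watermark.column": "userId", "outputMode": "upsert"}
for err in validate_sql_step(bad):
    print(err)
```

Collecting every error rather than stopping at the first lets a pipeline author fix a step in one pass.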


Fits Right Into the Polyglot Model

The SQL step is not a separate system. It is a native step type that integrates with Flinkflow’s polyglot engine, so a single pipeline can flow through SQL, Python, Java, and Camel steps:

Source → SQL (filter/join) → Python (ML scoring) → Camel (routing) → Sink

This is how modern data teams work: each contributor uses the language they know best, and Flinkflow orchestrates the entire pipeline declaratively.


How to Get Started

If you’re already using Flinkflow, simply add type: sql steps to your existing YAML pipelines. No new dependencies or configuration required.

If you’re new to Flinkflow:

  1. Clone the repository: git clone https://github.com/talwegai/flinkflow
  2. Explore SQL examples in the examples/ directory
  3. Run locally: ./scripts/run-local.sh your-sql-pipeline.yaml
  4. Read the full docs at talwegai.github.io/flinkflow

What’s Next

This is just the beginning of our SQL journey. On the roadmap:

  • Temporal table joins for slowly changing dimension enrichment
  • SQL-based Flowlets — reusable, parameterized SQL components
  • Catalog integration (Hive Metastore, AWS Glue) for shared schema management
  • Interactive SQL console in the Flinkflow dashboard

We’d love your feedback. Join us on Zulip or open an issue on GitHub.

Flinkflow: SQL, Python, Java, and Camel — one pipeline, zero boilerplate.