Tutorial: Stream Joins¶
Learn to join streams together and enrich streaming data with dimension tables. By the end, you'll understand the different join types and when to use each.
Prerequisites:
- Streaming Fundamentals (understand state, event time)
- Windowed Aggregations Tutorial
What you'll build:
```text
orders_raw ──────┬─────→ orders_enriched ───→ order_payment_matched
                 │ (temporal join)      (interval join)  │
customers_cdc ───┘                                       │
                                                         │
payments_raw ────────────────────────────────────────────┘
```
The Challenge of Streaming Joins¶
In batch SQL, joins are straightforward:
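For example, a generic query of this shape (the table names here are illustrative, not this project's sources):

```sql
-- Batch join: both tables are finite, so the engine can scan them once
-- and match every order against the customer table.
SELECT o.order_id, o.amount, c.name, c.tier
FROM orders AS o
LEFT JOIN customers AS c
  ON o.customer_id = c.id;
```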
In streaming, this is problematic:
- Orders never stop arriving - you can't wait for all of them
- Customers change over time - which version do you join with?
- State grows unbounded - keeping all unmatched events forever
Streaming joins solve this with time bounds and versioned tables.
Step 1: Project Setup¶
Create stream_project.yml:
```yaml
project:
  name: stream-joins-demo
  version: "1.0.0"

runtime:
  kafka:
    bootstrap_servers: localhost:9092
    schema_registry:
      url: http://localhost:8081
  flink:
    default: local
    clusters:
      local:
        rest_url: http://localhost:8082
        sql_gateway_url: http://localhost:8084

sources:
  # Orders stream
  - name: orders_raw
    description: Order events from the order service
    topic: orders.raw.v1
    event_time:
      column: order_timestamp
      watermark:
        max_out_of_orderness_ms: 5000
    columns:
      - name: order_id
        type: STRING
      - name: customer_id
        type: STRING
      - name: amount
        type: DECIMAL(10,2)
      - name: order_timestamp
        type: TIMESTAMP(3)

  # Payments stream
  - name: payments_raw
    description: Payment events from the payment processor
    topic: payments.raw.v1
    event_time:
      column: payment_timestamp
      watermark:
        max_out_of_orderness_ms: 10000  # Payments can be more delayed
    columns:
      - name: payment_id
        type: STRING
      - name: order_id
        type: STRING
      - name: payment_method
        type: STRING
      - name: payment_status
        type: STRING
      - name: payment_timestamp
        type: TIMESTAMP(3)

  # Customer CDC stream (changes from the customer database)
  - name: customers_cdc
    description: Customer changes via CDC (Debezium format)
    topic: customers.cdc.v1
    event_time:
      column: updated_at
      watermark:
        max_out_of_orderness_ms: 1000
    columns:
      - name: id
        type: STRING
      - name: name
        type: STRING
      - name: email
        type: STRING
      - name: tier
        type: STRING
        description: Customer tier (bronze, silver, gold, platinum)
      - name: updated_at
        type: TIMESTAMP(3)
```
Step 2: Temporal Join - Enrich Orders with Customer Data¶
A temporal join looks up the customer state as of the order time. This ensures you get the customer's tier at the moment they placed the order, not their current tier.
Create models/orders_enriched.yml:
```yaml
models:
  - name: orders_enriched
    description: |
      Orders enriched with customer information.
      Uses a temporal join to get customer state as of order time.
      Important: This join uses the customer's state at order_timestamp,
      not their current state. If a customer upgrades from Silver to Gold
      after placing an order, that order still shows Silver.
    # Auto-inferred as flink due to temporal JOIN (FOR SYSTEM_TIME AS OF)
    sql: |
      SELECT
        o.order_id,
        o.customer_id,
        o.amount,
        o.order_timestamp,
        c.name AS customer_name,
        c.email AS customer_email,
        c.tier AS customer_tier,
        -- Derive discount based on tier at order time
        CASE c.tier
          WHEN 'platinum' THEN 0.20
          WHEN 'gold' THEN 0.15
          WHEN 'silver' THEN 0.10
          ELSE 0.0
        END AS tier_discount
      FROM {{ source("orders_raw") }} AS o
      LEFT JOIN {{ source("customers_cdc") }} FOR SYSTEM_TIME AS OF o.order_timestamp AS c
        ON o.customer_id = c.id
    advanced:  # Optional: tune state management
      topic:
        name: orders.enriched.v1
        partitions: 6
      flink:
        parallelism: 4
        state_ttl_ms: 86400000  # 24 hours - keep customer versions for a day
```
How temporal joins work:
```text
Timeline:
────────────────────────────────────────────────────→ time
      │                   │                   │
  Customer             Order              Customer
 tier=silver           placed             tier=gold
   (10:00)            (10:30)             (11:00)

Temporal join at 10:30 → tier = 'silver' (correct!)
Regular join           → tier = 'gold'   (wrong!)
```
Key points:
- `FOR SYSTEM_TIME AS OF o.order_timestamp` - look up the customer as of order time
- The CDC stream must be keyed by the join key (`id`)
- State stores historical versions of customers
- `state_ttl_ms` limits how far back we keep versions
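If it helps to see this without the model/YAML layer, here is a minimal sketch of the same temporal join in plain Flink SQL, assuming the CDC topic is registered as a versioned table via the upsert-kafka connector. The DDL and connector options below are illustrative, not the code the tool generates:

```sql
-- Versioned table: needs a primary key and a watermark so Flink can
-- track customer versions over event time (sketch).
CREATE TABLE customers_cdc (
  id         STRING,
  name       STRING,
  email      STRING,
  tier       STRING,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED,
  WATERMARK FOR updated_at AS updated_at - INTERVAL '1' SECOND
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'customers.cdc.v1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- The join itself: each order looks up the customer version that was
-- valid at o.order_timestamp.
SELECT o.order_id, c.tier
FROM orders_raw AS o
LEFT JOIN customers_cdc FOR SYSTEM_TIME AS OF o.order_timestamp AS c
  ON o.customer_id = c.id;
```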
Step 3: Interval Join - Match Orders with Payments¶
An interval join matches events from two streams within a time window. Use this when both sides are unbounded streams.
Create models/order_payment_matched.yml:
```yaml
models:
  - name: order_payment_matched
    description: |
      Orders matched with their payments.
      Uses an interval join: payments must occur within 24 hours of the order.
      Orders without payment within 24h are emitted as unmatched (LEFT JOIN).
      This helps identify abandoned orders.
    # Auto-inferred as flink due to interval JOIN with BETWEEN time bounds
    sql: |
      SELECT
        o.order_id,
        o.customer_id,
        o.amount AS order_amount,
        o.order_timestamp,
        p.payment_id,
        p.payment_method,
        p.payment_status,
        p.payment_timestamp,
        -- Calculate time to payment
        TIMESTAMPDIFF(MINUTE, o.order_timestamp, p.payment_timestamp) AS minutes_to_payment,
        -- Flag if payment was received
        CASE WHEN p.payment_id IS NOT NULL THEN TRUE ELSE FALSE END AS is_paid
      FROM {{ ref("orders_enriched") }} AS o
      LEFT JOIN {{ source("payments_raw") }} AS p
        ON o.order_id = p.order_id
        AND p.payment_timestamp BETWEEN o.order_timestamp
            AND o.order_timestamp + INTERVAL '24' HOUR
    advanced:  # Optional: tune parallelism
      topic:
        name: orders.payment.matched.v1
        partitions: 6
      flink:
        parallelism: 4
```
How interval joins work:
```text
Orders:    [O1 10:00]  [O2 10:30]  [O3 11:00]
Payments:  [P1 10:15]  [P2 11:45]  [P3 36:00]

Match window: order_time to order_time + 24h

O1 (10:00) matches P1 (10:15)  ✓ - within 24h
O2 (10:30) matches P2 (11:45)  ✓ - within 24h
O3 (11:00) matches nothing     ✗ - P3 arrives more than 24h later
```
Key points:
- Time bounds are required - unbounded joins are dangerous
- Both streams must have event time configured
- State holds events within the join window
- Results emit when a match is found OR when the window expires (for LEFT JOIN)
Step 4: Build an Abandoned Orders Alert¶
Use the matched data to identify orders that weren't paid. Create models/abandoned_orders.yml:
```yaml
models:
  - name: abandoned_orders
    description: |
      Orders that haven't been paid within 1 hour.
      These are candidates for reminder emails or cart recovery.
    # Auto-inferred as topic (simple SELECT with WHERE, no aggregation/window/join)
    sql: |
      SELECT
        order_id,
        customer_id,
        customer_email,
        order_amount,
        customer_tier,
        order_timestamp,
        -- For targeting, high-value customers get priority
        CASE
          WHEN customer_tier IN ('gold', 'platinum') AND order_amount > 100 THEN 'high'
          WHEN order_amount > 500 THEN 'high'
          ELSE 'normal'
        END AS priority
      FROM {{ ref("order_payment_matched") }}
      WHERE is_paid = FALSE
        -- Only alert for orders older than 1 hour
        AND order_timestamp < CURRENT_TIMESTAMP - INTERVAL '1' HOUR
    advanced:  # Optional: configure output topic
      topic:
        name: orders.abandoned.v1
        partitions: 3
```
Step 5: Add Monitoring Tests¶
Create tests/join_tests.yml:
```yaml
tests:
  - name: enriched_orders_quality
    description: Ensure customer enrichment is working
    model: orders_enriched
    type: sample
    sample_size: 100
    assertions:
      - not_null:
          columns: [order_id, customer_id, order_timestamp]
      # Customer name should be present for most orders
      # NULL is acceptable for new customers not yet in CDC
      - custom_sql:
          sql: |
            SELECT COUNT(*) as failures
            FROM {{ model }}
            WHERE customer_name IS NULL
          max_failures: 10  # Allow up to 10% missing

  - name: payment_matching_latency
    description: Monitor payment matching timing
    model: order_payment_matched
    type: sample
    sample_size: 100
    assertions:
      - range:
          column: minutes_to_payment
          min: 0
          max: 1440  # 24 hours max
```
Join Types Reference¶
Regular Join (INNER)¶
Warning: Unbounded state! Both sides buffer every event indefinitely, because any row may still match a future row on the other side. Only use with a state TTL, or add time bounds to turn it into an interval join.
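A sketch of what this looks like, reusing the generic table names from this reference section (note there are no time bounds anywhere in the join condition):

```sql
-- Regular streaming join: state for both inputs grows without limit
-- unless a state TTL is configured.
SELECT o.order_id, p.payment_id
FROM orders o
JOIN payments p ON o.order_id = p.order_id
```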
Left/Right/Full Outer Join¶
Emits unmatched orders when the time window expires.
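For example, the LEFT variant of the interval join shown below (a sketch; column names follow this reference section):

```sql
-- Unmatched orders are emitted with NULL payment columns once the
-- watermark passes the end of their 24-hour window.
SELECT o.order_id, p.payment_id
FROM orders o
LEFT JOIN payments p ON o.order_id = p.order_id
  AND p.event_time BETWEEN o.event_time
                       AND o.event_time + INTERVAL '24' HOUR
```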
Temporal Join (Versioned Table Lookup)¶
Looks up the table state at a specific point in time. The table side must be a CDC/changelog stream.
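In sketch form, using the same generic table names as the interval join below:

```sql
-- Looks up the customer version that was valid at the order's event time.
SELECT o.order_id, c.tier
FROM orders o
LEFT JOIN customers FOR SYSTEM_TIME AS OF o.event_time AS c
  ON o.customer_id = c.id
```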
Interval Join¶
```sql
FROM orders o
JOIN payments p ON o.order_id = p.order_id
  AND p.event_time BETWEEN o.event_time - INTERVAL '1' HOUR
                       AND o.event_time + INTERVAL '1' HOUR
```
Matches events within a symmetric or asymmetric time window.
State Management for Joins¶
Joins are the most state-intensive operations in streaming:
| Join Type | State Per Side | Growth |
|---|---|---|
| Unbounded inner | All unmatched events | Infinite! |
| Interval join | Events within window | Bounded |
| Temporal join | Table versions | Bounded by TTL |
Configuring State TTL¶
For temporal joins, TTL controls how far back you can look up historical versions. For interval joins, the join condition itself bounds the state.
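In the model files this is the `state_ttl_ms` setting shown in Step 2. If you were tuning the same behavior directly in a Flink SQL session, it presumably corresponds to Flink's table state TTL option (a sketch):

```sql
-- Expire join and versioned-table state that has been idle for 24 hours.
SET 'table.exec.state.ttl' = '24 h';
```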
Memory Estimation¶
State size ≈ (events/second) × (window size in seconds) × (avg event size)
Example:
- 1,000 orders/second
- 24-hour join window
- 500 bytes per order
State = 1,000 × 86,400 × 500 = 43.2 GB per side!
Tips:
- Reduce window size when possible
- Filter early (before the join); see the sketch after this list
- Use RocksDB for large state
- Project only needed columns
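For example, a sketch of the "filter early, project only needed columns" tips in plain Flink SQL. The raw table names are used here (in a model file you would keep the source()/ref() templates), and the `payment_status = 'completed'` filter value is an assumption:

```sql
-- Shrink join state by dropping rows and columns before the join.
SELECT
  o.order_id,
  o.order_timestamp,
  p.payment_id,
  p.payment_timestamp
FROM (
  SELECT order_id, order_timestamp          -- project only needed columns
  FROM orders_raw
  WHERE amount > 0                          -- filter before the join
) AS o
JOIN (
  SELECT order_id, payment_id, payment_timestamp
  FROM payments_raw
  WHERE payment_status = 'completed'
) AS p
  ON o.order_id = p.order_id
  AND p.payment_timestamp BETWEEN o.order_timestamp
      AND o.order_timestamp + INTERVAL '24' HOUR;
```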
Troubleshooting¶
"Join produces no results"¶
- Check watermark progress on both sides (see the query sketch after this list)
- Ensure join keys match exactly (case, type)
- Verify time bounds overlap
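If you can query the same tables from a Flink SQL session, one quick way to see whether watermarks are advancing at all (a sketch; `CURRENT_WATERMARK` requires Flink 1.15 or later):

```sql
-- Shows the watermark the job currently holds for the orders source.
-- A NULL watermark means none has been generated yet.
SELECT
  order_id,
  order_timestamp,
  CURRENT_WATERMARK(order_timestamp) AS current_watermark
FROM orders_raw;
```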
"Missing matches that should exist"¶
- Event may have arrived after watermark passed
- Increase `max_out_of_orderness_ms` on both sources
- Check for clock skew between producers
"State grows unbounded"¶
- Add time bounds to join condition
- Add `state_ttl_ms` to the flink config
- Switch to the RocksDB state backend
"Temporal join returns NULL"¶
- The lookup table had no entry at that timestamp
- State TTL may have expired the historical version
- Check if CDC events are reaching Kafka
Complete Pipeline¶
Your final lineage:
```text
orders_raw (source)
└── orders_enriched (flink)
    └── order_payment_matched (flink)
        └── abandoned_orders (topic)

customers_cdc (source)
└── orders_enriched (flink)

payments_raw (source)
└── order_payment_matched (flink)
```
Deploy everything:
Next Steps¶
- CDC Pipelines Tutorial - Set up Debezium for the customers_cdc source
- State TTL Configuration - Fine-tune state management
- Streaming Fundamentals - Review watermarks and state