Your First Pipeline¶
In this guide, you'll build a complete streaming pipeline that:
- Reads raw order events from Kafka
- Cleans and validates the data
- Computes real-time aggregations with Flink
- Exports cleaned orders to Snowflake through a sink connector
Architecture Overview¶
graph LR
A[orders_raw] --> B[orders_clean]
B --> C[order_metrics]
B --> D[orders_snowflake]
C --> E[Analytics Dashboard]
D --> F[Snowflake DW]
style A fill:#fee2e2,stroke:#ef4444
style B fill:#e0e7ff,stroke:#5046e5
style C fill:#dbeafe,stroke:#3b82f6
style D fill:#d1fae5,stroke:#059669
Setup¶
Make sure you have completed the Quick Start and have the local infrastructure (Kafka, Schema Registry, Flink, and Kafka Connect) running.
Create a new project:
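If your streamt version provides a project scaffolding command, use that; otherwise the layout used throughout this guide can be created by hand:
# Create the directory layout used in this guide
mkdir -p orders-pipeline/{sources,models,tests,exposures}
cd orders-pipeline
touch stream_project.yml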
Step 1: Project Configuration¶
stream_project.yml
project:
name: orders-pipeline
version: "1.0.0"
description: Real-time order processing pipeline
runtime:
kafka:
bootstrap_servers: localhost:9092
schema_registry:
url: http://localhost:8081
flink:
default: local
clusters:
local:
type: rest
rest_url: http://localhost:8082
sql_gateway_url: http://localhost:8084
connect:
default: local
clusters:
local:
rest_url: http://localhost:8083
# Governance rules
rules:
topics:
min_partitions: 3
naming_pattern: "^orders\\.[a-z]+\\.v[0-9]+$"
models:
require_description: true
require_owner: true
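The naming pattern is an ordinary regular expression; before committing to a new topic name you can sanity-check it in a plain shell (this check is only an illustration, not part of streamt):
# Both managed topics in this guide satisfy the governance pattern
echo "orders.clean.v1" | grep -E '^orders\.[a-z]+\.v[0-9]+$'    # match
echo "orders.metrics.v1" | grep -E '^orders\.[a-z]+\.v[0-9]+$'  # match
echo "orders_clean" | grep -E '^orders\.[a-z]+\.v[0-9]+$'       # no match, would be rejected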
Step 2: Define Sources¶
Create the source representing incoming orders:
sources/orders.yml
sources:
- name: orders_raw
topic: orders.raw.v1
description: |
Raw order events from the checkout service.
Contains all order attempts including failed ones.
owner: checkout-team
freshness:
warn_after: 5m
error_after: 15m
schema:
registry: confluent
subject: orders-raw-value
columns:
- name: order_id
description: Unique order identifier (UUID)
tests:
- not_null
- unique
- name: customer_id
description: Customer who placed the order
classification: internal
- name: items
description: Array of ordered items
- name: total_amount
description: Order total in cents
classification: internal
- name: currency
description: ISO 4217 currency code
- name: status
description: Order status (pending, confirmed, shipped, delivered, cancelled)
- name: created_at
description: Order creation timestamp
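For orientation, a single event on orders.raw.v1 might look like the JSON below. The values and the shape of the items element are illustrative; the authoritative schema is the one registered under the orders-raw-value subject. Note the lowercase currency code, which the next step normalizes.
{
  "order_id": "9f7c2a1e-3b4d-4c5e-8f6a-1b2c3d4e5f60",
  "customer_id": "cust-48213",
  "items": [
    {"sku": "SKU-1001", "quantity": 2, "unit_price": 1999}
  ],
  "total_amount": 3998,
  "currency": "usd",
  "status": "pending",
  "created_at": "2024-05-01T12:34:56Z"
}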
Step 3: Create Models¶
3a. Cleaned Orders (Topic Materialization)¶
A stateless transformation that filters and validates orders:
models/orders_clean.yml
models:
- name: orders_clean
description: |
Cleaned and validated orders.
Filters out invalid orders and normalizes data.
owner: data-platform
key: order_id
sql: |
SELECT
order_id,
customer_id,
items,
total_amount,
UPPER(currency) as currency,
status,
created_at
FROM {{ source("orders_raw") }}
WHERE order_id IS NOT NULL
AND customer_id IS NOT NULL
AND total_amount > 0
AND status IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled')
# Optional: customize topic settings
advanced:
topic:
name: orders.clean.v1
partitions: 12
config:
retention.ms: 604800000 # 7 days
3b. Order Metrics (Flink Materialization)¶
A stateful aggregation computing real-time metrics:
models/order_metrics.yml
models:
- name: order_metrics
description: |
Real-time order metrics aggregated by 5-minute windows.
Used for operational dashboards.
owner: analytics-team
sql: |
SELECT
TUMBLE_START(created_at, INTERVAL '5' MINUTE) as window_start,
TUMBLE_END(created_at, INTERVAL '5' MINUTE) as window_end,
COUNT(*) as order_count,
SUM(total_amount) as total_revenue,
AVG(total_amount) as avg_order_value,
COUNT(DISTINCT customer_id) as unique_customers
FROM {{ ref("orders_clean") }}
GROUP BY TUMBLE(created_at, INTERVAL '5' MINUTE)
# Optional: tune Flink performance
advanced:
flink:
parallelism: 4
checkpoint_interval: 60000
topic:
name: orders.metrics.v1
partitions: 6
The TUMBLE window function automatically triggers Flink materialization.
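Each output record describes one closed 5-minute window; an illustrative record (amounts in cents, values invented for this example) looks like:
{
  "window_start": "2024-05-01 12:30:00",
  "window_end": "2024-05-01 12:35:00",
  "order_count": 142,
  "total_revenue": 876500,
  "avg_order_value": 6172.54,
  "unique_customers": 131
}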
3c. Snowflake Sink (Sink Materialization)¶
Export orders to Snowflake for analytics:
models/orders_snowflake.yml
models:
- name: orders_snowflake
description: |
Exports cleaned orders to Snowflake data warehouse
for historical analysis and reporting.
owner: data-platform
from: orders_clean
advanced:
connector:
type: snowflake-sink
config:
snowflake.url.name: ${SNOWFLAKE_URL}
snowflake.user.name: ${SNOWFLAKE_USER}
snowflake.private.key: ${SNOWFLAKE_PRIVATE_KEY}
snowflake.database.name: ANALYTICS
snowflake.schema.name: ORDERS
snowflake.table.name: ORDERS_STREAM
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://localhost:8081
Using from: without sql: automatically triggers sink materialization.
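Assuming streamt resolves the ${...} placeholders from environment variables (as the syntax suggests), export the credentials before deploying. The values below are placeholders, and the expected private-key format depends on the Snowflake connector version:
# Placeholder credentials - substitute your own account details
export SNOWFLAKE_URL="https://myaccount.snowflakecomputing.com"
export SNOWFLAKE_USER="STREAMING_INGEST"
export SNOWFLAKE_PRIVATE_KEY="$(cat /path/to/snowflake_rsa_key.p8)"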
Step 4: Add Tests¶
Schema Tests¶
tests/orders_schema.yml
tests:
- name: orders_clean_schema
model: orders_clean
type: schema
description: Validate orders_clean schema constraints
assertions:
- not_null:
columns: [order_id, customer_id, total_amount, status]
- accepted_values:
column: status
values: [pending, confirmed, shipped, delivered, cancelled]
- accepted_values:
column: currency
values: [USD, EUR, GBP, JPY, CAD]
Sample Tests¶
tests/orders_sample.yml
tests:
- name: orders_data_quality
model: orders_clean
type: sample
sample_size: 1000
description: Validate data quality on a sample of orders
assertions:
- range:
column: total_amount
min: 1
max: 10000000 # Max $100,000
- unique_key:
key: order_id
tolerance: 0.01 # Allow 1% duplicates in sample
Continuous Tests¶
tests/orders_continuous.yml
tests:
- name: orders_freshness_monitor
model: orders_clean
type: continuous
description: Continuously monitor order freshness
assertions:
- max_lag:
seconds: 300 # Alert if lag > 5 minutes
- throughput:
min_per_minute: 10 # At least 10 orders/minute
on_failure:
- alert:
channel: slack
webhook: ${SLACK_WEBHOOK_URL}
Step 5: Document Exposures¶
Document downstream consumers:
exposures/analytics.yml
exposures:
- name: operations_dashboard
type: dashboard
description: |
Real-time operations dashboard showing order metrics,
revenue trends, and system health.
owner: analytics-team
url: https://grafana.company.com/d/orders
consumes:
- ref: order_metrics
sla:
latency_p99_ms: 5000
availability: 99.9
- name: billing_service
type: application
description: |
Billing microservice that processes confirmed orders
and generates invoices.
owner: billing-team
role: consumer
consumer_group: billing-service-cg
consumes:
- ref: orders_clean
sla:
throughput_per_second: 100
Step 6: Validate and Deploy¶
Validate the Project¶
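Run the validation from the project root (subcommand name assumed; adjust to your streamt version):
# Validate project structure, SQL references, and governance rules
streamt validate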
Expected output:
✓ Project 'orders-pipeline' is valid
Sources: 1
Models: 3
Tests: 3
Exposures: 2
Governance:
✓ All topics meet minimum partition requirement (3)
✓ All topic names match pattern
✓ All models have descriptions
✓ All models have owners
View Lineage¶
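Print the dependency graph from the CLI (subcommand name assumed; adjust to your streamt version):
# Show upstream and downstream relationships for the whole project
streamt lineage
Expected output: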
orders_raw (source)
└── orders_clean (topic)
├── order_metrics (flink)
│ └── operations_dashboard (exposure)
├── orders_snowflake (sink)
└── billing_service (exposure)
Plan Deployment¶
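Preview the changes before touching any infrastructure (subcommand name assumed):
# Show what would be created, updated, or deleted
streamt plan
Expected output: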
Plan: 4 to create, 0 to update, 0 to delete
Topics:
+ orders.clean.v1 (12 partitions)
+ orders.metrics.v1 (6 partitions)
Flink Jobs:
+ order_metrics (parallelism: 4)
Connectors:
+ orders_snowflake (snowflake-sink)
Apply¶
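Once the plan looks right, apply it (subcommand name assumed):
# Create the topics, submit the Flink job, and register the connector
streamt apply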
Run Tests¶
# Run all tests
streamt test
# Run only schema tests
streamt test --type schema
# Run tests for a specific model
streamt test --model orders_clean
Step 7: Monitor¶
Check Status¶
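Check runtime health from the CLI (subcommand name assumed):
# Report health of topics, Flink jobs, and connectors
streamt status
Expected output: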
Topics:
orders.clean.v1 ✓ healthy (12 partitions, lag: 0)
orders.metrics.v1 ✓ healthy (6 partitions, lag: 23)
Flink Jobs:
order_metrics ✓ RUNNING (4/4 tasks)
Connectors:
orders_snowflake ✓ RUNNING (1/1 tasks)
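To spot-check the data itself, consume a few records from the cleaned topic with kcat (assuming your kcat build includes Avro and Schema Registry support):
# Read five Avro-decoded records from the cleaned topic
kcat -b localhost:9092 -r http://localhost:8081 -s value=avro \
  -t orders.clean.v1 -C -o beginning -c 5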
View in Conduktor¶
Open http://localhost:8080 to see:
- Topics and their messages
- Consumer group lag
- Schema evolution
- Connector status
Final Project Structure¶
orders-pipeline/
├── stream_project.yml
├── sources/
│ └── orders.yml
├── models/
│ ├── orders_clean.yml
│ ├── order_metrics.yml
│ └── orders_snowflake.yml
├── tests/
│ ├── orders_schema.yml
│ ├── orders_sample.yml
│ └── orders_continuous.yml
└── exposures/
└── analytics.yml
What You've Learned¶
- Sources declare external data entry points
- Topic materialization for stateless transformations
- Flink materialization for stateful aggregations
- Sink materialization for exporting to external systems
- Tests validate data quality at different levels
- Exposures document downstream dependencies
- Governance rules enforce standards