# YAML Schema Reference

Complete reference for all YAML configuration in streamt projects.
## Project Configuration

The main project configuration file (`stream_project.yml`) defines project metadata, runtime connections, defaults, and governance rules.
### Project Metadata

```yaml
project:
  name: my-pipeline              # Required: Project identifier
  version: "1.0.0"               # Optional: Semantic version
  description: "Pipeline desc"   # Optional: Human-readable description
```

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique project identifier |
| `version` | string | No | Semantic version string |
| `description` | string | No | Project description |
### Runtime Configuration

#### Kafka

```yaml
runtime:
  kafka:
    bootstrap_servers: kafka:9092      # Required
    security_protocol: SASL_SSL        # Optional
    sasl_mechanism: PLAIN              # Optional
    sasl_username: ${KAFKA_USER}       # Optional
    sasl_password: ${KAFKA_PASSWORD}   # Optional
```

| Field | Type | Required | Description |
|---|---|---|---|
| `bootstrap_servers` | string | Yes | Kafka broker address(es) |
| `security_protocol` | string | No | `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL` |
| `sasl_mechanism` | string | No | `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` |
| `sasl_username` | string | No | SASL username |
| `sasl_password` | string | No | SASL password |
#### Schema Registry

```yaml
runtime:
  schema_registry:
    url: http://schema-registry:8081   # Required
    username: ${SR_USER}               # Optional
    password: ${SR_PASSWORD}           # Optional
```

| Field | Type | Required | Description |
|---|---|---|---|
| `url` | string | Yes | Schema Registry URL |
| `username` | string | No | Basic auth username |
| `password` | string | No | Basic auth password |
#### Flink

```yaml
runtime:
  flink:
    default: local                       # Optional: Default cluster name
    clusters:
      local:
        type: rest                       # Cluster type
        rest_url: http://localhost:8082
        sql_gateway_url: http://localhost:8084
        version: "1.18"                  # Optional
        environment: dev                 # Optional
        api_key: ${FLINK_API_KEY}        # Optional
```

| Field | Type | Required | Description |
|---|---|---|---|
| `default` | string | No | Name of default cluster |
| `clusters` | map | No | Named cluster configurations |

**Cluster Configuration:**

| Field | Type | Required | Description |
|---|---|---|---|
| `type` | string | No | `rest`, `docker`, `kubernetes`, `confluent` |
| `rest_url` | string | No | Flink REST API URL |
| `sql_gateway_url` | string | No | Flink SQL Gateway URL |
| `version` | string | No | Flink version |
| `environment` | string | No | Environment identifier |
| `api_key` | string | No | API key for managed services |
#### Connect

| Field | Type | Required | Description |
|---|---|---|---|
| `default` | string | No | Default cluster name |
| `clusters.<name>.rest_url` | string | Yes | Connect REST API URL |
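No example block accompanies this table; a minimal sketch based on its fields, assuming the block lives under `runtime.connect` and mirrors the Flink layout (the cluster name `production` is illustrative):

```yaml
runtime:
  connect:
    default: production                 # Optional: Default cluster name
    clusters:
      production:
        rest_url: http://connect:8083   # Required: Connect REST API URL
```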
#### Conduktor Gateway (Optional)

Required for virtual topic models (auto-inferred from `gateway:` configuration). See the Gateway Guide.

```yaml
runtime:
  conduktor:
    gateway:
      admin_url: http://localhost:8888    # Required: Gateway Admin API
      proxy_bootstrap: localhost:6969     # Required: Gateway proxy for clients
      username: ${GATEWAY_USER}           # Optional (default: admin)
      password: ${GATEWAY_PASSWORD}       # Optional (default: conduktor)
      virtual_cluster: default            # Optional: For multi-tenant setups
```

| Field | Type | Required | Description |
|---|---|---|---|
| `admin_url` | string | Yes | Gateway Admin API URL |
| `proxy_bootstrap` | string | Yes | Gateway proxy for Kafka clients |
| `username` | string | No | Admin API username (default: `admin`) |
| `password` | string | No | Admin API password (default: `conduktor`) |
| `virtual_cluster` | string | No | Virtual cluster for multi-tenancy |
### Defaults

Set project-wide default values:

```yaml
defaults:
  topic:
    partitions: 1
    replication_factor: 1
  models:
    cluster: production
    topic:
      partitions: 6
      replication_factor: 3
  tests:
    flink_cluster: production
```

| Section | Field | Type | Description |
|---|---|---|---|
| `topic` | `partitions` | int | Default partition count |
| `topic` | `replication_factor` | int | Default replication factor |
| `models` | `cluster` | string | Default Flink cluster |
| `models.topic` | `partitions` | int | Default for model output topics |
| `models.topic` | `replication_factor` | int | Default for model output topics |
| `tests` | `flink_cluster` | string | Default cluster for tests |
### Governance Rules

#### Topic Rules

```yaml
rules:
  topics:
    min_partitions: 3
    max_partitions: 128
    min_replication_factor: 2
    required_config:
      - retention.ms
    naming_pattern: "^[a-z]+\\.[a-z]+\\.v[0-9]+$"
    forbidden_prefixes:
      - "_"
      - "test"
```

| Field | Type | Description |
|---|---|---|
| `min_partitions` | int | Minimum allowed partitions |
| `max_partitions` | int | Maximum allowed partitions |
| `min_replication_factor` | int | Minimum replication factor |
| `required_config` | list | Required topic config keys |
| `naming_pattern` | string | Regex pattern for topic names |
| `forbidden_prefixes` | list | Disallowed topic name prefixes |
#### Model Rules

```yaml
rules:
  models:
    require_description: true
    require_owner: true
    require_tests: true
    max_dependencies: 10
```

| Field | Type | Description |
|---|---|---|
| `require_description` | bool | Models must have a description |
| `require_owner` | bool | Models must have an owner |
| `require_tests` | bool | Models must have associated tests |
| `max_dependencies` | int | Max upstream dependencies |
#### Source Rules

| Field | Type | Description |
|---|---|---|
| `require_schema` | bool | Sources must define a schema |
| `require_freshness` | bool | Sources must define a freshness SLA |
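No example is given above; a minimal sketch, assuming the parent key is `rules.sources` (mirroring the `rules.topics` and `rules.models` blocks):

```yaml
rules:
  sources:
    require_schema: true       # Every source must define a schema
    require_freshness: true    # Every source must define a freshness SLA
```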
#### Security Rules

| Field | Type | Description |
|---|---|---|
| `require_classification` | bool | Columns must have a classification |
| `sensitive_columns_require_masking` | bool | Sensitive columns require a masking policy |
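Likewise, a minimal sketch assuming a `rules.security` parent key:

```yaml
rules:
  security:
    require_classification: true               # Every column must carry a classification level
    sensitive_columns_require_masking: true    # Sensitive columns must have a masking policy
```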
## Sources

Sources declare external Kafka topics that your project consumes.

```yaml
sources:
  - name: orders_raw                   # Required: Unique identifier
    topic: orders.raw.v1               # Required: Kafka topic name
    description: "Raw order events"    # Optional
    cluster: production                # Optional: Kafka cluster
    owner: orders-team                 # Optional
    tags:                              # Optional
      - orders
      - raw
    schema:                            # Optional: Schema definition
      registry: confluent              # Schema registry type
      subject: orders-raw-value        # Subject name
      format: avro                     # avro, json, protobuf
      definition: |                    # Inline schema (alternative to registry)
        {
          "type": "record",
          "name": "Order",
          "fields": [...]
        }
    columns:                           # Optional: Column metadata
      - name: order_id
        description: "Unique order ID"
        classification: internal
      - name: customer_email
        description: "Customer email"
        classification: sensitive
    freshness:                         # Optional: Freshness SLA
      max_lag_seconds: 300
      warn_after_seconds: 120
```
### Source Fields

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique source identifier |
| `topic` | string | Yes | Kafka topic name |
| `description` | string | No | Human-readable description |
| `cluster` | string | No | Kafka cluster name |
| `owner` | string | No | Team or person responsible |
| `tags` | list | No | Categorization tags |
| `schema` | object | No | Schema definition |
| `columns` | list | No | Column metadata |
| `freshness` | object | No | Freshness SLA configuration |
### Schema Definition

| Field | Type | Description |
|---|---|---|
| `registry` | string | `confluent` or custom registry type |
| `subject` | string | Schema Registry subject name |
| `format` | string | `avro`, `json`, `protobuf` |
| `definition` | string | Inline schema definition |
### Column Definition

| Field | Type | Description |
|---|---|---|
| `name` | string | Column name |
| `type` | string | Flink SQL type (e.g., `STRING`, `BIGINT`, `TIMESTAMP(3)`) |
| `description` | string | Column description |
| `classification` | string | `public`, `internal`, `confidential`, `sensitive`, `highly_sensitive` |
| `proctime` | bool | Mark as processing time attribute (for Flink) |

**Processing Time Example:**

```yaml
columns:
  - name: order_id
    type: STRING
  - name: proc_time
    type: TIMESTAMP(3)
    proctime: true   # This column will use PROCTIME() in Flink SQL
```

This generates:
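A sketch of the resulting Flink DDL (the table name `orders` is assumed for illustration):

```sql
CREATE TABLE orders (
  `order_id` STRING,
  `proc_time` AS PROCTIME()   -- computed processing-time attribute
) WITH (...)
```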
### Freshness Configuration

| Field | Type | Description |
|---|---|---|
| `max_lag_seconds` | int | Maximum acceptable lag (error) |
| `warn_after_seconds` | int | Lag threshold for warning |
### Event Time Configuration

Configure event time semantics for stream processing. This enables proper watermark generation for windowed aggregations.

**Simple case** (just specify the column):

```yaml
sources:
  - name: events
    topic: events.raw.v1
    event_time:
      column: event_timestamp   # Simple: just the column name
```

**Advanced case** (custom watermark settings):

```yaml
sources:
  - name: events
    topic: events.raw.v1
    columns:
      - name: event_id
      - name: user_id
      - name: event_timestamp   # Will be TIMESTAMP(3) in Flink
    # Top-level: just the column name
    event_time:
      column: event_timestamp
    # Advanced section: watermark details
    advanced:
      event_time:
        watermark:
          strategy: bounded_out_of_orderness
          max_out_of_orderness_ms: 5000   # Allow 5 seconds of late data
        allowed_lateness_ms: 60000        # Accept data up to 1 minute late
```

This generates proper Flink SQL watermark declarations:

```sql
CREATE TABLE events (
  ...
  `event_timestamp` TIMESTAMP(3),
  WATERMARK FOR `event_timestamp` AS `event_timestamp` - INTERVAL '5' SECOND
) WITH (...)
```
**Event Time Fields:**

| Field | Location | Type | Description | Required |
|---|---|---|---|---|
| `column` | Top-level | string | Column name containing event time | Yes |
| `watermark.strategy` | Advanced | string | `bounded_out_of_orderness` or `monotonously_increasing` | No (default: bounded) |
| `watermark.max_out_of_orderness_ms` | Advanced | int | Max delay in ms | No (default: 5000) |
| `allowed_lateness_ms` | Advanced | int | Accept late events within window | No |

**Watermark Strategies:**

| Strategy | Use Case |
|---|---|
| `bounded_out_of_orderness` | Events can arrive out of order (most common) |
| `monotonously_increasing` | Events always arrive in order (rare) |

See Streaming Fundamentals for more on watermarks.
## Models

Models define transformations that create new data streams.

### Auto-Inferred Materialization

**IMPORTANT:** The `materialized` field is now auto-inferred from your SQL and configuration. You no longer need to specify it explicitly.

**Inference Rules:**

| SQL Pattern | Inferred Materialization |
|---|---|
| Contains `TUMBLE`, `HOP`, `SESSION`, or `CUMULATE` | `flink` |
| Contains `GROUP BY` without window functions | `flink` |
| Contains `JOIN` | `flink` |
| Has `gateway:` configuration | `virtual_topic` |
| Has `from:` without `sql:` | `sink` |
| Simple `SELECT`, `WHERE`, projection | `topic` |
### Basic Example (No Advanced Config)

```yaml
models:
  - name: orders_validated
    description: "Validated orders"
    owner: orders-team
    tags: [orders, validated]
    sql: |
      SELECT
        order_id,
        customer_id,
        amount,
        CASE WHEN amount > 1000 THEN 'high' ELSE 'normal' END as tier
      FROM {{ source("orders_raw") }}
      WHERE order_id IS NOT NULL
        AND amount > 0
    # materialized: topic (auto-inferred from simple SELECT)
```
### Example With Advanced Overrides

```yaml
models:
  - name: hourly_revenue
    description: "Hourly revenue aggregation"
    owner: analytics-team
    sql: |
      SELECT
        TUMBLE_START(order_time, INTERVAL '1' HOUR) as window_start,
        SUM(amount) as revenue
      FROM {{ ref("orders_validated") }}
      GROUP BY TUMBLE(order_time, INTERVAL '1' HOUR)
    # materialized: flink (auto-inferred from TUMBLE)
    # Only specify when overriding defaults:
    advanced:
      flink:
        parallelism: 8
        checkpoint_interval_ms: 60000
        state_backend: rocksdb
        state_ttl_ms: 86400000
      flink_cluster: production
      topic:
        name: analytics.revenue.v1
        partitions: 12
        replication_factor: 3
        config:
          retention.ms: 604800000
          cleanup.policy: delete
    columns:
      - name: window_start
        description: "Start of the hourly window"
      - name: revenue
        description: "Total revenue in the window"
        classification: confidential
```
### Full Example (All Options)

```yaml
models:
  - name: orders_validated
    description: "Validated orders"   # Optional but recommended
    owner: orders-team                # Optional
    tags: [orders, validated]         # Optional
    access: protected                 # Optional: private, protected, public
    group: orders                     # Optional: Logical grouping
    version: 1                        # Optional: Model version
    key: order_id                     # Optional: Partition key
    sql: |                            # Required (except for sink models)
      SELECT
        order_id,
        customer_id,
        amount,
        CASE WHEN amount > 1000 THEN 'high' ELSE 'normal' END as tier
      FROM {{ source("orders_raw") }}
      WHERE order_id IS NOT NULL
        AND amount > 0

    # Advanced configuration (optional - only when overriding defaults)
    advanced:
      # Flink job settings (for flink materialization)
      flink:
        parallelism: 4
        checkpoint_interval_ms: 60000
        state_backend: rocksdb
        state_ttl_ms: 86400000

      # Target Flink cluster
      flink_cluster: production

      # Output topic configuration
      topic:
        name: orders.validated.v1
        partitions: 12
        replication_factor: 3
        config:
          retention.ms: 604800000
          cleanup.policy: delete

      # Connect cluster (for sink materialization)
      connect_cluster: production

      # Event time configuration (advanced)
      event_time:
        watermark:
          strategy: bounded_out_of_orderness
          max_out_of_orderness_ms: 5000
        allowed_lateness_ms: 60000

    # Column metadata (stays at top level)
    columns:
      - name: customer_id
        description: "Customer identifier"
        classification: internal
      - name: amount
        description: "Order amount"
        classification: confidential

    security:                         # Optional
      classification:
        customer_id: internal
        amount: confidential
      policies:
        - mask:
            column: customer_email
            method: hash
            for_roles: [analyst]

    deprecation:                      # Optional: Version deprecation
      v1:
        sunset_date: "2025-06-01"
        message: "Use v2 instead"
```
### Model Fields (Top Level)

Top-level fields (always visible, business-focused):

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique model identifier |
| `description` | string | No | Human-readable description |
| `owner` | string | No | Team or person responsible |
| `tags` | list | No | Categorization tags |
| `access` | string | No | `private`, `protected`, `public` |
| `group` | string | No | Logical grouping |
| `version` | int | No | Model version number |
| `key` | string | No | Partition key column |
| `sql` | string | Conditional | SQL transformation (required except for sink) |
| `from` | string | Conditional | Source model (required for sink) |
| `columns` | list | No | Column metadata and documentation |
| `security` | object | No | Security policies and classification |
| `ml_outputs` | object | No | ML model output schemas for `ML_PREDICT` (see below) |

**Note:** The `materialized` field is no longer specified; it is auto-inferred from your SQL and configuration.
### ML Output Schemas

When using `ML_PREDICT` in your SQL, declare the expected output schema so streamt can properly type-check the results:

```yaml
models:
  - name: fraud_scored_orders
    description: "Orders with fraud predictions"
    sql: |
      SELECT
        order_id,
        amount,
        ML_PREDICT('fraud_detector', order_id, amount, customer_id) as prediction
      FROM {{ ref("orders_validated") }}
    # Declare ML model output schemas for type inference
    ml_outputs:
      fraud_detector:
        description: "Fraud detection ML model"
        columns:
          - name: is_fraud
            type: BOOLEAN
          - name: confidence
            type: DOUBLE
          - name: risk_score
            type: DOUBLE
```

| Field | Type | Description |
|---|---|---|
| `ml_outputs.<model_name>.columns` | list | Output columns from the ML model |
| `ml_outputs.<model_name>.description` | string | Description of the ML model |

**Confluent Cloud Only:** `ML_PREDICT` and `ML_EVALUATE` are only available on Confluent Cloud Flink clusters.
### Model Fields (Advanced Section)

Advanced section fields (optional, infrastructure/tuning):

| Field | Type | Description |
|---|---|---|
| `advanced.flink` | object | Flink job configuration (see below) |
| `advanced.flink_cluster` | string | Target Flink cluster name |
| `advanced.topic` | object | Output topic configuration (see below) |
| `advanced.connect_cluster` | string | Target Kafka Connect cluster |
| `advanced.event_time` | object | Event time watermark configuration |
| `advanced.gateway` | object | Gateway-specific settings |
### Advanced: Topic Configuration

Nested under `advanced.topic`:

| Field | Type | Description |
|---|---|---|
| `name` | string | Explicit topic name (default: model name) |
| `partitions` | int | Partition count |
| `replication_factor` | int | Replication factor |
| `config` | map | Kafka topic configuration |
### Advanced: Gateway Configuration

Nested under `advanced.gateway` (for virtual topic models):

| Field | Type | Description |
|---|---|---|
| `virtual_topic.name` | string | Custom virtual topic name |
| `virtual_topic.compression` | string | Compression type (`gzip`, `snappy`, `lz4`, `zstd`) |

```yaml
models:
  - name: filtered_orders
    description: "Orders filtered by region"
    sql: |
      SELECT * FROM {{ source("orders") }}
      WHERE region = 'US'
    # Gateway virtual topic with compression
    advanced:
      gateway:
        virtual_topic:
          name: orders.us.filtered
          compression: lz4
```

**Virtual Topics:** Virtual topics are automatically inferred when Gateway is configured. They provide schema transformation and data masking without creating physical Kafka topics. See the Gateway Guide.
### Advanced: Flink Configuration

Nested under `advanced.flink`:

| Field | Type | Description | Status |
|---|---|---|---|
| `parallelism` | int | Job parallelism | Supported |
| `checkpoint_interval_ms` | int | Checkpoint interval in milliseconds | Supported |
| `state_backend` | string | `hashmap` or `rocksdb` | Parsed only |
| `state_ttl_ms` | int | State time-to-live in milliseconds | Supported |

**Planned for Future:**

```yaml
# PLANNED - Not yet supported
advanced:
  flink:
    parallelism: 4
    checkpoint_interval_ms: 60000
    state_backend: rocksdb
    state_ttl_ms: 86400000
    # Advanced checkpointing (planned)
    checkpoint_timeout_ms: 600000
    checkpoint_min_pause_ms: 500
    max_concurrent_checkpoints: 1
```

See the Flink Options Reference for complete Flink configuration.
### Security Policies

| Field | Type | Description |
|---|---|---|
| `classification` | map | Column name → classification level |
| `policies` | list | Masking and access policies |
### Masking Methods

| Method | Description |
|---|---|
| `hash` | SHA-256 hash of value |
| `redact` | Replace with `***` |
| `partial` | Show partial value (e.g., `***-1234`) |
| `tokenize` | Replace with reversible token |
| `null` | Replace with null |
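For reference, a sketch combining two of these methods, using the same `security.policies` shape as the full model example above (the `card_number` column and the roles are illustrative):

```yaml
security:
  classification:
    customer_email: sensitive
    card_number: highly_sensitive
  policies:
    - mask:
        column: customer_email
        method: hash               # SHA-256 hash of the value
        for_roles: [analyst]
    - mask:
        column: card_number
        method: partial            # e.g., ***-1234
        for_roles: [analyst, support]
```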
## Sink Models

Sink models export data to external systems via Kafka Connect.

```yaml
models:
  - name: orders_snowflake
    description: "Export orders to Snowflake"
    # materialized: sink (auto-inferred from having 'from' without 'sql')
    from: orders_validated   # Source model or topic
    sink:
      connector: snowflake-sink
      config:
        snowflake.url.name: ${SNOWFLAKE_URL}
        snowflake.user.name: ${SNOWFLAKE_USER}
        snowflake.private.key: ${SNOWFLAKE_KEY}
        snowflake.database.name: ANALYTICS
        snowflake.schema.name: RAW
        snowflake.table.name: ORDERS
        tasks.max: 4
    # Only when overriding defaults:
    advanced:
      connect_cluster: production
```
### Sink Configuration

| Field | Type | Required | Description |
|---|---|---|---|
| `connector` | string | Yes | Connector type/class |
| `config` | map | Yes | Connector configuration |
### Common Connector Types

| Type | Use Case |
|---|---|
| `snowflake-sink` | Snowflake data warehouse |
| `bigquery-sink` | Google BigQuery |
| `s3-sink` | Amazon S3 |
| `gcs-sink` | Google Cloud Storage |
| `jdbc-sink` | PostgreSQL, MySQL, etc. |
| `elasticsearch-sink` | Elasticsearch/OpenSearch |
| `http-sink` | HTTP/REST APIs |
## Tests

Tests validate data quality in your streaming pipelines.

```yaml
tests:
  - name: orders_schema_test       # Required
    model: orders_validated        # Required: Model to test
    type: schema                   # Required: schema, sample, continuous
    flink_cluster: production      # Optional
    assertions:                    # Required
      - not_null:
          columns: [order_id, customer_id, amount]
      - accepted_values:
          column: status
          values: [pending, confirmed, shipped, delivered]
      - accepted_types:
          types:
            order_id: STRING
            amount: DOUBLE
            created_at: TIMESTAMP
      - range:
          column: amount
          min: 0
          max: 1000000
      - unique_key:
          key: order_id
          window: "1 HOUR"
          tolerance: 0.01
    on_failure:                    # Optional
      severity: error              # error, warning
      actions:
        - alert:
            type: slack
            channel: "#data-alerts"
        - dlq:
            topic: test_failures
```
### Test Fields

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique test identifier |
| `model` | string | Yes | Model to test |
| `type` | string | Yes | `schema`, `sample`, `continuous` |
| `assertions` | list | Yes | Test assertions |
| `sample_size` | int | No | Rows to sample (for sample tests) |
| `flink_cluster` | string | No | Flink cluster for execution |
| `on_failure` | object | No | Failure handling |
### Test Types

| Type | Description |
|---|---|
| `schema` | Validate schema structure and types |
| `sample` | Test assertions on sampled data |
| `continuous` | Ongoing streaming validation |
### Assertion Types

| Assertion | Description | Fields |
|---|---|---|
| `not_null` | Columns must not be null | `columns` |
| `accepted_values` | Column values must be in set | `column`, `values` |
| `accepted_types` | Column types must match | `types` (map) |
| `range` | Numeric column within range | `column`, `min`, `max` |
| `unique_key` | Key uniqueness within window | `key`, `window`, `tolerance` |
| `max_lag` | Max lag from event time | `column`, `max_seconds` |
| `throughput` | Messages per second bounds | `min_per_second`, `max_per_second` |
| `distribution` | Value distribution buckets | `column`, `buckets` |
| `foreign_key` | Referential integrity | `column`, `ref_model`, `ref_key` |
| `custom_sql` | Custom SQL assertion | `sql`, `expect` |
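The test example above covers the first five assertion types; a sketch of a few others, using only the fields listed in this table (the value shapes, e.g. what `expect` compares against, are assumptions):

```yaml
assertions:
  - max_lag:
      column: event_timestamp
      max_seconds: 300                 # Fail if events lag more than 5 minutes behind event time
  - foreign_key:
      column: customer_id
      ref_model: customers_validated   # Referenced model
      ref_key: customer_id             # Key column in the referenced model
  - custom_sql:
      sql: "SELECT COUNT(*) FROM {{ ref('orders_validated') }} WHERE amount < 0"
      expect: 0                        # Assumed: the expected query result
```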
## Exposures

Exposures document downstream consumers of your data.

```yaml
exposures:
  - name: checkout-service     # Required
    type: application          # Required
    role: consumer             # Optional: producer, consumer, both
    description: "Checkout microservice"
    owner: checkout-team
    url: https://github.com/org/checkout
    repo: org/checkout
    language: java
    consumes:                  # Topics/models consumed
      - ref: orders_validated
      - source: inventory_updates
    produces:                  # Topics/models produced
      - ref: checkout_events
    consumer_group: checkout-service-cg
    sla:
      availability: "99.9%"
      max_lag_messages: 1000
      max_lag_minutes: 5
      max_end_to_end_latency_ms: 500
    contracts:
      schema: orders-validated-value
      compatibility: BACKWARD
    access:
      roles: [checkout-team, platform-team]
      purpose: "Process customer checkouts"
```
### Exposure Types

| Type | Description |
|---|---|
| `application` | Microservice or application |
| `dashboard` | BI dashboard or reporting |
| `ml_training` | ML training pipeline |
| `ml_inference` | ML inference service |
| `api` | External API |
### Exposure Fields

| Field | Type | Required | Description |
|---|---|---|---|
| `name` | string | Yes | Unique exposure identifier |
| `type` | string | Yes | Exposure type |
| `role` | string | No | `producer`, `consumer`, `both` |
| `description` | string | No | Description |
| `owner` | string | No | Owner team/person |
| `url` | string | No | Documentation URL |
| `repo` | string | No | Repository |
| `language` | string | No | Programming language |
| `tool` | string | No | BI tool (for dashboards) |
| `consumes` | list | No | Consumed sources/refs |
| `produces` | list | No | Produced sources/refs |
| `consumer_group` | string | No | Kafka consumer group |
| `sla` | object | No | SLA configuration |
| `contracts` | object | No | Schema contracts |
| `access` | object | No | Access configuration |
| `schedule` | string | No | Cron schedule (for batch) |
### SLA Configuration

| Field | Type | Description |
|---|---|---|
| `availability` | string | Uptime target (e.g., "99.9%") |
| `max_lag_messages` | int | Max message lag |
| `max_lag_minutes` | int | Max time lag |
| `max_produce_latency_ms` | int | Max producer latency |
| `max_end_to_end_latency_ms` | int | Max E2E latency |
| `max_error_rate` | float | Max error rate (0.0-1.0) |
## SQL Macros

Use Jinja-style macros in SQL to reference sources and models.
### source()

Reference a source topic:
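A minimal sketch, reusing the `orders_raw` source declared earlier in this reference:

```sql
SELECT *
FROM {{ source("orders_raw") }}   -- resolves to the orders.raw.v1 topic declared by the source
```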
### ref()

Reference another model:
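A minimal sketch, referencing the `orders_validated` model from the examples above:

```sql
SELECT order_id, amount
FROM {{ ref("orders_validated") }}   -- resolves to the output of the orders_validated model
```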
### Example with Multiple References

```yaml
models:
  - name: order_enriched
    description: "Orders enriched with customer data"
    # materialized: flink (auto-inferred from JOIN)
    sql: |
      SELECT
        o.order_id,
        o.amount,
        c.name as customer_name,
        c.tier as customer_tier
      FROM {{ ref("orders_validated") }} o
      JOIN {{ source("customers") }} c
        ON o.customer_id = c.id
        AND o.order_time BETWEEN c.update_time - INTERVAL '1' HOUR
                             AND c.update_time + INTERVAL '1' HOUR
    # Only when overriding defaults:
    advanced:
      flink:
        parallelism: 4
```
## Environment Variables

Use `${VAR_NAME}` syntax to reference environment variables:

```yaml
runtime:
  kafka:
    bootstrap_servers: ${KAFKA_BOOTSTRAP_SERVERS}
    sasl_password: ${KAFKA_PASSWORD}
  schema_registry:
    url: ${SCHEMA_REGISTRY_URL}
```

Variables can be set via:

- System environment: `export KAFKA_PASSWORD=secret`
- A `.env` file in the project root
- CI/CD secrets

Example `.env` file:

```
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_PASSWORD=secret
SCHEMA_REGISTRY_URL=http://localhost:8081
```
## File Organization

### Single File (Small Projects)

```yaml
project:
  name: simple-pipeline

runtime:
  kafka:
    bootstrap_servers: localhost:9092

sources:
  - name: events
    topic: events.raw

models:
  - name: events_clean
    description: "Cleaned event stream"
    # materialized: topic (auto-inferred from simple SELECT)
    sql: |
      SELECT * FROM {{ source("events") }}
      WHERE event_id IS NOT NULL
```
### Multi-File (Large Projects)

```
my-pipeline/
├── stream_project.yml          # Project config + runtime
├── sources/
│   ├── orders.yml
│   └── customers.yml
├── models/
│   ├── staging/
│   │   ├── stg_orders.yml
│   │   └── stg_customers.yml
│   └── marts/
│       ├── order_metrics.yml
│       └── customer_360.yml
├── tests/
│   ├── orders_tests.yml
│   └── customers_tests.yml
└── exposures/
    └── applications.yml
```

Each YAML file can contain its respective type:
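For example, a standalone sources file would hold only a top-level `sources:` list (a sketch; the exact contents of `sources/orders.yml` are assumed, reusing keys from earlier sections):

```yaml
# sources/orders.yml
sources:
  - name: orders_raw
    topic: orders.raw.v1
    owner: orders-team
```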