Skip to content

E-commerce Events

Track user behavior in real-time for personalization, recommendations, and analytics.

Architecture

graph LR
    A[clickstream] --> B[events_clean]
    C[products] --> D[events_enriched]
    B --> D
    D --> E[user_sessions]
    D --> F[product_popularity]
    E --> G[Personalization API]
    F --> H[Recommendations]

Use Cases

  • Session Analytics: Track user journeys in real-time
  • Product Popularity: Real-time trending products
  • Personalization: Feed ML models with user behavior
  • A/B Testing: Measure feature impact live

Sources

sources/events.yml
sources:
  - name: clickstream
    topic: ecom.clickstream.v1
    description: Raw user interaction events from web/mobile
    owner: frontend-team
    freshness:
      warn_after: 30s
      error_after: 2m
    columns:
      - name: event_id
        description: Unique event identifier
      - name: session_id
        description: User session identifier
      - name: user_id
        description: Authenticated user ID (nullable)
        classification: internal
      - name: anonymous_id
        description: Anonymous tracking ID
        classification: internal
      - name: event_type
        description: Event type (page_view, click, add_to_cart, purchase)
      - name: page_url
        description: Current page URL
      - name: referrer_url
        description: Referring URL
      - name: product_id
        description: Product ID (if applicable)
      - name: search_query
        description: Search query (if applicable)
      - name: device_type
        description: Device type (desktop, mobile, tablet)
      - name: browser
        description: Browser name
      - name: country
        description: User country (from IP)
      - name: timestamp
        description: Event timestamp

  - name: products
    topic: ecom.products.v1
    description: Product catalog for enrichment
    owner: catalog-team
    columns:
      - name: product_id
      - name: name
      - name: category
      - name: subcategory
      - name: price
      - name: brand

Models

Clean Events

models/events_clean.yml
models:
  - name: events_clean
    description: Cleaned and validated user events
    owner: data-platform
    key: session_id

    # Simple SELECT/WHERE is auto-inferred as 'topic' materialization
    sql: |
      SELECT
        event_id,
        session_id,
        COALESCE(user_id, anonymous_id) as user_identifier,
        user_id,
        anonymous_id,
        event_type,
        page_url,
        referrer_url,
        product_id,
        search_query,
        device_type,
        browser,
        country,
        `timestamp`
      FROM {{ source("clickstream") }}
      WHERE event_id IS NOT NULL
        AND session_id IS NOT NULL
        AND event_type IN ('page_view', 'click', 'add_to_cart', 'remove_from_cart', 'purchase', 'search')

    # Override defaults only when needed
    advanced:
      topic:
        name: ecom.events-clean.v1
        partitions: 24

Enrich with Product Data

models/events_enriched.yml
models:
  - name: events_enriched
    description: Events enriched with product details
    owner: data-platform
    key: session_id

    # JOIN is auto-inferred as 'flink' materialization
    sql: |
      SELECT
        e.event_id,
        e.session_id,
        e.user_identifier,
        e.user_id,
        e.event_type,
        e.page_url,
        e.product_id,
        e.search_query,
        e.device_type,
        e.country,
        e.`timestamp`,
        p.name as product_name,
        p.category as product_category,
        p.subcategory as product_subcategory,
        p.price as product_price,
        p.brand as product_brand
      FROM {{ ref("events_clean") }} e
      LEFT JOIN {{ source("products") }} FOR SYSTEM_TIME AS OF e.`timestamp` AS p
        ON e.product_id = p.product_id

    # Override defaults only when needed
    advanced:
      flink:
        parallelism: 8
      topic:
        name: ecom.events-enriched.v1
        partitions: 24

User Sessions

models/user_sessions.yml
models:
  - name: user_sessions
    description: |
      Aggregated user sessions with behavior metrics.
      Sessions are 30-minute tumbling windows.
    owner: analytics-team
    key: session_id

    # TUMBLE + GROUP BY is auto-inferred as 'flink' materialization
    sql: |
      SELECT
        session_id,
        user_identifier,
        device_type,
        country,
        TUMBLE_START(`timestamp`, INTERVAL '30' MINUTE) as session_start,
        TUMBLE_END(`timestamp`, INTERVAL '30' MINUTE) as session_end,
        COUNT(*) as event_count,
        COUNT(DISTINCT page_url) as pages_viewed,
        SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as page_views,
        SUM(CASE WHEN event_type = 'click' THEN 1 ELSE 0 END) as clicks,
        SUM(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) as add_to_carts,
        SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchases,
        SUM(CASE WHEN event_type = 'search' THEN 1 ELSE 0 END) as searches,
        COLLECT(DISTINCT product_category) as categories_browsed,
        MAX(product_price) as max_product_price,
        AVG(product_price) as avg_product_price
      FROM {{ ref("events_enriched") }}
      GROUP BY
        session_id,
        user_identifier,
        device_type,
        country,
        TUMBLE(`timestamp`, INTERVAL '30' MINUTE)

    # Override defaults only when needed
    advanced:
      flink:
        parallelism: 8
        checkpoint_interval: 60000
      topic:
        name: ecom.user-sessions.v1
        partitions: 12

Product Popularity

models/product_popularity.yml
models:
  - name: product_popularity
    description: |
      Real-time product popularity scores.
      Updated every 5 minutes based on user interactions.
    owner: recommendations-team
    key: product_id

    # TUMBLE + GROUP BY is auto-inferred as 'flink' materialization
    sql: |
      SELECT
        product_id,
        product_name,
        product_category,
        TUMBLE_START(`timestamp`, INTERVAL '5' MINUTE) as window_start,
        TUMBLE_END(`timestamp`, INTERVAL '5' MINUTE) as window_end,
        COUNT(*) as total_interactions,
        SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as views,
        SUM(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) as add_to_carts,
        SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchases,
        COUNT(DISTINCT user_identifier) as unique_users,
        -- Popularity score: weighted sum
        (
          SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) * 1 +
          SUM(CASE WHEN event_type = 'add_to_cart' THEN 5 ELSE 0 END) +
          SUM(CASE WHEN event_type = 'purchase' THEN 10 ELSE 0 END)
        ) as popularity_score
      FROM {{ ref("events_enriched") }}
      WHERE product_id IS NOT NULL
      GROUP BY
        product_id,
        product_name,
        product_category,
        TUMBLE(`timestamp`, INTERVAL '5' MINUTE)

    # Override defaults only when needed
    advanced:
      flink:
        parallelism: 4
        checkpoint_interval: 30000
      topic:
        name: ecom.product-popularity.v1
        partitions: 6

Tests

tests/events_tests.yml
tests:
  - name: events_schema
    model: events_clean
    type: schema
    assertions:
      - not_null:
          columns: [event_id, session_id, event_type]
      - accepted_values:
          column: event_type
          values: [page_view, click, add_to_cart, remove_from_cart, purchase, search]
      - accepted_values:
          column: device_type
          values: [desktop, mobile, tablet]

  - name: sessions_quality
    model: user_sessions
    type: sample
    sample_size: 1000
    assertions:
      - range:
          column: event_count
          min: 1
          max: 10000

  - name: events_freshness
    model: events_clean
    type: continuous
    assertions:
      - max_lag:
          seconds: 30
      - throughput:
          min_per_minute: 1000
    on_failure:
      - alert:
          channel: slack
          webhook: ${SLACK_WEBHOOK}

Exposures

exposures/services.yml
exposures:
  - name: personalization_api
    type: api
    description: Serves personalized content based on user behavior
    owner: personalization-team
    role: consumer
    consumer_group: personalization-cg
    consumes:
      - ref: user_sessions
    sla:
      latency_p99_ms: 50
      availability: 99.95

  - name: recommendation_service
    type: ml_inference
    description: Real-time product recommendations
    owner: ml-team
    role: consumer
    consumer_group: recommendations-cg
    consumes:
      - ref: product_popularity
      - ref: user_sessions
    sla:
      latency_p99_ms: 100