Use Case

Akeneo as a Data Source: Building Production ETL Pipelines

Akeneo holds the most enriched, validated product data in the company — but most data pipelines ignore it because the API is complex and the product model hierarchy requires non-trivial transformation logic. SyncPIM bridges that gap.

Why Akeneo data belongs in your data pipeline

Most data warehouses have sales, orders, web events, and CRM data — but product data is missing or stale. This creates gaps that product data from Akeneo could fill:

Gap: Revenue by product attribute

Join sales facts with Akeneo product attributes (family, brand, material) to find which attributes drive revenue

Gap: Catalog completeness KPIs

Track % of products with descriptions in all locales, images per family, price coverage by channel

Gap: Search & recommendation quality

Feed enriched product data (categories, attributes, synonyms) into your ML pipeline

Gap: Pricing intelligence

Join Akeneo cost prices with competitor scraping data for margin analysis
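The first gap above is the classic join. As a minimal sketch of what it looks like once both datasets land in the warehouse (table and column names are illustrative, shown here with an in-memory SQLite database):

```python
import sqlite3

# Hypothetical tables: a sales fact table plus an Akeneo-derived product dimension.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fct_sales (sku TEXT, revenue REAL);
CREATE TABLE dim_products (sku TEXT, brand TEXT, material TEXT);
INSERT INTO fct_sales VALUES ('SKU-1', 120.0), ('SKU-2', 80.0), ('SKU-3', 50.0);
INSERT INTO dim_products VALUES
  ('SKU-1', 'Acme', 'cotton'),
  ('SKU-2', 'Acme', 'wool'),
  ('SKU-3', 'Globex', 'cotton');
""")

# Revenue by brand: the join that Akeneo attributes make possible.
rows = conn.execute("""
  SELECT p.brand, SUM(s.revenue) AS revenue
  FROM fct_sales s
  JOIN dim_products p ON p.sku = s.sku
  GROUP BY p.brand
  ORDER BY revenue DESC
""").fetchall()
print(rows)  # [('Acme', 200.0), ('Globex', 50.0)]
```

The same query shape works for any Akeneo attribute (family, material, color) once the product dimension is in place.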

Pipeline architectures

Akeneo → SyncPIM → PostgreSQL → dbt → BI

Recommended

1. SyncPIM syncs Akeneo → staging schema in PostgreSQL (raw_akeneo.products)

2. dbt models transform raw JSONB → normalized analytics tables

3. BI tools (Metabase, Looker) read from the dbt-generated schema

Best for teams with existing dbt workflows. SyncPIM handles the Extract + Load; dbt handles Transform.

Akeneo → SyncPIM → PostgreSQL → BigQuery (via Airbyte or Datastream)

1. SyncPIM syncs Akeneo → PostgreSQL (incremental daily)

2. Airbyte or Google Cloud Datastream replicates PostgreSQL → BigQuery

3. Looker / Data Studio queries BigQuery

Best for enterprises already on BigQuery. PostgreSQL acts as an intermediate staging layer; no custom Akeneo-to-BigQuery connector is needed.

Akeneo → SyncPIM → MongoDB → Spark / Databricks

1. SyncPIM syncs Akeneo → MongoDB Atlas

2. Databricks reads from MongoDB Atlas via Spark connector

3. ML pipelines consume product attributes as features

Best for ML teams that use Databricks. MongoDB's JSON model maps naturally to Akeneo's attribute structure.
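For the feature-engineering step, a sketch of how a product document synced to MongoDB might be flattened into a feature dict (the value shape follows Akeneo's standard `{locale, scope, data}` structure; the function name and field choices are illustrative):

```python
# Sketch: flatten an Akeneo-style product document into a flat feature dict
# for an ML pipeline. Picks the value matching the requested locale/channel;
# non-localizable / non-scopable values carry locale/scope = None.
def flatten_features(product, locale="en_US", channel="ecommerce"):
    features = {"family": product.get("family")}
    for attr, values in product.get("values", {}).items():
        for v in values:
            if v.get("locale") in (None, locale) and v.get("scope") in (None, channel):
                features[attr] = v["data"]
                break
    return features

doc = {
    "identifier": "SKU-1",
    "family": "tshirts",
    "values": {
        "name": [{"locale": "en_US", "scope": None, "data": "Basic Tee"},
                 {"locale": "de_DE", "scope": None, "data": "Basis-Shirt"}],
        "material": [{"locale": None, "scope": None, "data": "cotton"}],
    },
}
print(flatten_features(doc))
# {'family': 'tshirts', 'name': 'Basic Tee', 'material': 'cotton'}
```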

dbt integration: transforming raw Akeneo JSONB

SyncPIM loads raw Akeneo data as JSONB into PostgreSQL. dbt models then flatten it into analytics-friendly tables. A typical dbt layer for Akeneo data:

-- models/staging/stg_akeneo_products.sql
-- Flatten JSONB into typed columns for downstream models

SELECT
  identifier,
  family,
  enabled,
  updated_at,

  -- Extract localizable text attribute (en_US)
  (SELECT elem->>'data'
   FROM jsonb_array_elements(data->'name') elem
   WHERE elem->>'locale' = 'en_US' LIMIT 1) AS name_en,

  -- Extract non-localizable attribute
  data->>'ean' AS ean,

  -- Extract price (EUR)
  (SELECT (price_item->>'amount')::numeric
   FROM jsonb_array_elements(data->'price'->0->'data') price_item
   WHERE price_item->>'currency' = 'EUR' LIMIT 1) AS price_eur,

  -- Category codes (already a TEXT[] column on the raw table)
  categories AS category_codes

FROM raw_akeneo.products
WHERE enabled = true
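The trickiest extraction above is the nested price lookup. For unit-testing that logic outside the warehouse, it can be mirrored in plain Python (the input shape follows Akeneo's price-collection attribute, `[{locale, scope, data: [{amount, currency}]}]`; the function name is illustrative):

```python
# Sketch: the EUR price extraction from stg_akeneo_products.sql, in Python.
# Akeneo stores price amounts as strings; cast to float for analytics use.
def price_in(data, currency="EUR"):
    values = data.get("price") or []
    if not values:
        return None
    for item in values[0].get("data", []):
        if item.get("currency") == currency:
            return float(item["amount"])
    return None

data = {"price": [{"locale": None, "scope": None,
                   "data": [{"amount": "19.90", "currency": "EUR"},
                            {"amount": "24.00", "currency": "USD"}]}]}
print(price_in(data))         # 19.9
print(price_in(data, "GBP"))  # None
```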

-- models/marts/catalog_completeness.sql
-- Business KPI: % of products with required attributes per family

WITH base AS (
  SELECT
    family,
    COUNT(*) AS total_products,
    COUNT(name_en) AS has_name,
    COUNT(price_eur) AS has_price,
    COUNT(CASE WHEN array_length(category_codes, 1) > 0 THEN 1 END) AS has_category
  FROM {{ ref('stg_akeneo_products') }}
  GROUP BY family
)
SELECT
  family,
  total_products,
  ROUND(100.0 * has_name / total_products, 1) AS name_coverage_pct,
  ROUND(100.0 * has_price / total_products, 1) AS price_coverage_pct,
  ROUND(100.0 * has_category / total_products, 1) AS category_coverage_pct
FROM base
ORDER BY total_products DESC
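The KPI math itself is simple; as a quick reference, the same calculation in Python (helper name is illustrative):

```python
# Sketch: the coverage percentage from catalog_completeness.sql.
def coverage_pct(has_attr, total):
    return round(100.0 * has_attr / total, 1)

# A family with 200 products, 180 of which have an en_US name:
print(coverage_pct(180, 200))  # 90.0
```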

Orchestration: where SyncPIM fits

SyncPIM is a managed EL (Extract-Load) service. It slots naturally into Airflow / Prefect / Dagster pipelines as a triggerable step:

# Airflow DAG — daily product data pipeline
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime

with DAG('akeneo_product_pipeline',
         schedule_interval='0 2 * * *',  # 2am daily
         start_date=datetime(2025, 1, 1),
         catchup=False) as dag:

    # Step 1: Trigger SyncPIM incremental export via API
    trigger_syncpim = SimpleHttpOperator(
        task_id='trigger_syncpim_export',
        http_conn_id='syncpim_api',
        endpoint='/api/v1/exports/{export_id}/trigger',
        method='POST',
        headers={"Authorization": "Bearer {{ var.value.syncpim_api_key }}"},
        response_check=lambda r: r.json()['status'] == 'queued',
    )

    # Step 2: Wait for export completion (poll every 60s, up to 30 min)
    wait_for_export = HttpSensor(
        task_id='wait_syncpim_complete',
        http_conn_id='syncpim_api',
        endpoint='/api/v1/exports/{export_id}/status',
        method='GET',
        response_check=lambda r: r.json()['status'] == 'completed',
        poke_interval=60,
        timeout=1800,
    )

    # Step 3: Run dbt transformations
    run_dbt = BashOperator(
        task_id='run_dbt_models',
        bash_command='dbt run --select staging.stg_akeneo_products+ --target prod',
    )

    trigger_syncpim >> wait_for_export >> run_dbt

Alternative: Use SyncPIM's built-in scheduling instead of Airflow for the Extract-Load step. Let SyncPIM handle the Akeneo sync on its own schedule, and run your dbt job at a fixed offset (e.g., SyncPIM at 2am, dbt at 3am). Simpler — no orchestration needed for the first step.
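For teams outside Airflow entirely, the wait-for-completion step is a small polling loop. A minimal sketch, assuming a callable that fetches the export status string (e.g. a `requests.get(...).json()['status']` against SyncPIM's status endpoint; function names here are illustrative):

```python
import time

# Sketch: poll until a SyncPIM export reports 'completed' (or fail fast).
def wait_for_export(fetch_status, timeout=1800, interval=30):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status == "completed":
            return True
        if status == "failed":
            raise RuntimeError("SyncPIM export failed")
        time.sleep(interval)
    raise TimeoutError("export did not complete in time")

# Example with a stubbed status sequence:
statuses = iter(["queued", "running", "completed"])
print(wait_for_export(lambda: next(statuses), interval=0))  # True
```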

Recommended schema design for analytics

-- Raw layer (loaded by SyncPIM):
CREATE SCHEMA raw_akeneo;
CREATE TABLE raw_akeneo.products (
  identifier TEXT PRIMARY KEY,
  family TEXT,
  parent TEXT,           -- NULL for simple products, product model code for variants
  categories TEXT[],
  enabled BOOLEAN,
  data JSONB,            -- all attribute values
  created_at TIMESTAMPTZ,
  updated_at TIMESTAMPTZ,
  synced_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX ON raw_akeneo.products USING GIN (data);
CREATE INDEX ON raw_akeneo.products (family);
CREATE INDEX ON raw_akeneo.products (updated_at);

-- Analytics layer (generated by dbt):
-- dim_products: one row per SKU, typed columns
-- dim_categories: category tree with paths
-- fct_catalog_completeness: daily snapshot of coverage KPIs
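To make the raw layer concrete, here is a sketch of how a loader might map an Akeneo API product payload onto the `raw_akeneo.products` columns (column names come from the schema above; the mapping function itself is illustrative, not SyncPIM's actual code):

```python
import json

# Sketch: Akeneo API product payload -> raw_akeneo.products row.
# The full 'values' object goes into the JSONB 'data' column untransformed;
# flattening is deferred to dbt.
def to_row(product):
    return {
        "identifier": product["identifier"],
        "family": product.get("family"),
        "parent": product.get("parent"),
        "categories": product.get("categories", []),
        "enabled": product.get("enabled", True),
        "data": json.dumps(product.get("values", {})),  # -> JSONB column
        "created_at": product.get("created"),
        "updated_at": product.get("updated"),
    }

row = to_row({"identifier": "SKU-1", "family": "tshirts",
              "categories": ["summer"], "enabled": True,
              "values": {"ean": [{"locale": None, "scope": None, "data": "123"}]},
              "created": "2025-01-01T00:00:00+00:00",
              "updated": "2025-06-01T00:00:00+00:00"})
print(row["family"], row["categories"])  # tshirts ['summer']
```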

Add Akeneo to your data stack today

SyncPIM handles the Akeneo API complexity. Your pipeline gets clean, flattened product data in PostgreSQL or MongoDB.

Related resources