Creating and Managing Data Products

Data Products

Data products are refined data assets with transformations applied, ready for consumption. They function as SQL tables containing processed data suitable for specific business purposes or analysis.

Data Product Types

  • Source-Aligned Data Products (SADPs): Direct transformations of raw data from data objects

  • Consumer-Aligned Data Products (CADPs): Transformations built on top of other data products, creating layers of refined data

When to Use Data Products

  • Business-ready data: Transform raw data into formats suitable for specific business purposes

  • Consumer needs: Create data tailored to specific analytical or operational requirements

  • Data refinement: Apply business logic, calculations, and aggregations to raw data

  • Consumption layer: Provide clean, structured data for applications and analytics tools

Creating a Data Product

Step 1: Create Data Product

Endpoint: POST /api/data/data_product

{
  "entity": {
    "name": "Customer Analytics Product",
    "entity_type": "data_product",
    "label": "CAP",
    "description": "Processed customer data for analytics dashboard"
  },
  "entity_info": {
    "owner": "[email protected]",
    "contact_ids": ["Data Product contact"],
    "links": ["example.com"]
  },
  "host_mesh_identifier": "mesh-id-here"
}

Step 2: Define Schema

Endpoint: PUT /api/data/data_product/schema?identifier={product_id}

{
  "details": {
    "product_type": "stored",
    "fields": [
      {
        "name": "customer_id",
        "primary": true,
        "optional": false,
        "data_type": {
          "meta": {},
          "column_type": "VARCHAR"
        },
        "classification": 3,
        "sensitivity": 1
      },
      {
        "name": "total_purchases",
        "primary": false,
        "optional": false,
        "data_type": {
          "meta": {},
          "column_type": "DECIMAL"
        },
        "classification": 1
      }
    ]
  }
}

Step 3: Assign to Mesh (if not done during creation)

Endpoint: PATCH /api/data/data_product?identifier={product_id}

{
  "mesh_identifier": "your-mesh-id"
}

Python Functions

def create_data_product(name, description, mesh_id=None, owner_email="[email protected]"):
    """Create a new data product via POST /data/data_product.

    The label sent to the API is the first three characters of ``name``,
    upper-cased. ``host_mesh_identifier`` is only included in the request
    when ``mesh_id`` is truthy.

    Returns the identifier read from ``entity.identifier`` in the response
    JSON when the API answers HTTP 200; otherwise prints the response text
    and returns None.
    """
    entity = {
        "name": name,
        "entity_type": "data_product",
        "label": name[:3].upper(),
        "description": description,
    }
    entity_info = {
        "owner": owner_email,
        "contact_ids": [f"{name} contact"],
        "links": ["example.com"],
    }
    payload = {"entity": entity, "entity_info": entity_info}

    # The mesh can also be assigned later, so it is optional here.
    if mesh_id:
        payload["host_mesh_identifier"] = mesh_id

    resp = requests.post(
        f"{API_URL}/data/data_product",
        headers=get_headers(),
        json=payload,
    )

    # Guard clause: anything other than 200 is reported and treated as failure.
    if resp.status_code != 200:
        print(f"Error creating data product: {resp.text}")
        return None

    new_id = resp.json()["entity"]["identifier"]
    print(f"Data product '{name}' created with ID: {new_id}")
    return new_id

def assign_product_to_mesh(product_id, mesh_id):
    """Attach an existing data product to a mesh via PATCH.

    Returns True when the API answers HTTP 200, False otherwise.
    """
    endpoint = f"{API_URL}/data/data_product?identifier={product_id}"
    headers = get_headers()
    payload = {"mesh_identifier": mesh_id}

    resp = requests.patch(endpoint, headers=headers, json=payload)
    return resp.status_code == 200

def set_product_schema(product_id, fields):
    """Upload the schema (list of field definitions) for a data product.

    The schema is always sent with ``product_type`` set to ``"stored"``.
    Prints a status line and returns True on HTTP 200, False otherwise.
    """
    payload = {"details": {"product_type": "stored", "fields": fields}}

    resp = requests.put(
        f"{API_URL}/data/data_product/schema?identifier={product_id}",
        headers=get_headers(),
        json=payload,
    )

    succeeded = resp.status_code == 200
    if succeeded:
        print(f"Schema set for product {product_id}")
    else:
        print(f"Error setting schema: {resp.text}")
    return succeeded

def create_complete_data_product(name, description, mesh_id, schema_fields):
    """Create a data product and apply its schema in a single call.

    Returns the new product identifier when both steps succeed, or None
    when either the creation or the schema upload fails.
    """
    new_id = create_data_product(name, description, mesh_id)
    if new_id:
        schema_ok = set_product_schema(new_id, schema_fields)
        if schema_ok:
            print(f"Data product setup completed: {new_id}")
            return new_id
        # Creation worked but the schema upload did not.
        print("Failed to set schema")
    return None

Schema Field Structure

def create_field(name, column_type, primary=False, optional=False, classification=1, sensitivity=None):
    """Build one schema field definition as a plain dict.

    The ``sensitivity`` key is only present in the result when a value
    other than None is supplied (so ``0`` is kept).
    """
    data_type = {"meta": {}, "column_type": column_type}
    spec = dict(
        name=name,
        primary=primary,
        optional=optional,
        data_type=data_type,
        classification=classification,
    )

    if sensitivity is None:
        return spec

    spec["sensitivity"] = sensitivity
    return spec

# Example schema: one create_field() call per column, spelled out with keywords
customer_schema = [
    create_field(name="customer_id", column_type="VARCHAR", primary=True, classification=3, sensitivity=1),
    create_field(name="email", column_type="VARCHAR", classification=3, sensitivity=2),
    create_field(name="total_orders", column_type="INTEGER", classification=1),
    create_field(name="last_purchase_date", column_type="DATE", classification=1),
    create_field(name="lifetime_value", column_type="DECIMAL", classification=2),
]

Supported Column Types

  • VARCHAR - Variable character strings

  • INTEGER - 32-bit integers

  • BIGINT - 64-bit integers

  • DECIMAL - Decimal numbers

  • DOUBLE - Double precision floating point

  • BOOLEAN - True/false values

  • DATE - Date values

  • TIMESTAMP - Timestamp values

  • TIMESTAMPTZ - Timestamp with timezone

  • JSON - JSON data

  • ARRAY - Array data

  • UUID - UUID values

Classification and Sensitivity Levels

  • Classification: Categorizes the data product according to the data classification policies set at the Foundation level (for example: Confidential, Public, Secret). Only one value is allowed.

  • Sensitivity: A set of tags defined at the Foundation level that classifies the data in the data product according to different sensitivities or frameworks (for example: PII, Biometric, etc.). More than one value can be selected.

Management Operations

def list_data_products():
    """Fetch the list of all data products.

    Returns the decoded JSON body on HTTP 200, otherwise None.
    """
    url = f"{API_URL}/data/data_product/list"
    resp = requests.get(url, headers=get_headers())
    if resp.status_code == 200:
        return resp.json()
    return None

def get_queryable_products():
    """Fetch the data products that are available for querying.

    Returns the decoded JSON body on HTTP 200, otherwise None.
    """
    url = f"{API_URL}/data/data_product/list/query"
    resp = requests.get(url, headers=get_headers())
    if resp.status_code == 200:
        return resp.json()
    return None

def get_data_product(product_id):
    """Fetch the details of a single data product by identifier.

    Returns the decoded JSON body on HTTP 200, otherwise None.
    """
    url = f"{API_URL}/data/data_product?identifier={product_id}"
    resp = requests.get(url, headers=get_headers())
    if resp.status_code == 200:
        return resp.json()
    return None

def get_product_metadata(product_id):
    """Fetch the metadata of a single data product by identifier.

    Returns the decoded JSON body on HTTP 200, otherwise None.
    """
    url = f"{API_URL}/data/data_product/metadata?identifier={product_id}"
    resp = requests.get(url, headers=get_headers())
    if resp.status_code == 200:
        return resp.json()
    return None

def update_product_metadata(product_id, metadata):
    """Replace the metadata of a data product via PUT.

    ``metadata`` is sent as the JSON request body. Returns True when the
    API answers HTTP 200, False otherwise.
    """
    endpoint = f"{API_URL}/data/data_product/metadata?identifier={product_id}"
    resp = requests.put(endpoint, headers=get_headers(), json=metadata)
    updated = resp.status_code == 200
    return updated

Example Usage

# Define schema for customer analytics product
customer_fields = [
    create_field("customer_id", "VARCHAR", primary=True, classification=3, sensitivity=1),
    create_field("segment", "VARCHAR", classification=2),
    create_field("total_spent", "DECIMAL", classification=2),
    create_field("order_count", "INTEGER", classification=1),
    create_field("avg_order_value", "DECIMAL", classification=2),
    create_field("last_activity", "TIMESTAMP", classification=1)
]

# Create complete data product (returns None if creation or schema upload fails)
product_id = create_complete_data_product(
    name="Customer Segmentation Analytics",
    description="Customer data processed for segmentation and analytics",
    mesh_id="customer-analytics-mesh-id",
    schema_fields=customer_fields
)

# Check if product was created successfully
if product_id:
    # get_data_product() returns None on any non-200 response, so guard
    # before subscripting to avoid a TypeError on a failed lookup.
    product_details = get_data_product(product_id)
    if product_details is not None:
        print("Product created:", product_details["entity"]["name"])
    else:
        print(f"Could not fetch details for product {product_id}")

    # List all queryable products; this call can also return None on failure,
    # and len(None) would raise.
    queryable = get_queryable_products()
    if queryable is not None:
        print(f"Total queryable products: {len(queryable)}")
    else:
        print("Could not list queryable products")

SADP vs CADP Examples

# SADP: a source-aligned product holding cleaned raw transaction data
sadp_fields = [
    create_field(name="transaction_id", column_type="VARCHAR", primary=True),
    create_field(name="customer_id", column_type="VARCHAR", classification=3),
    create_field(name="amount", column_type="DECIMAL"),
    create_field(name="transaction_date", column_type="DATE"),
    create_field(name="product_category", column_type="VARCHAR"),
]

sadp_id = create_complete_data_product(
    "Clean Transaction Data",
    "SADP: Cleaned and validated transaction data",
    "sales-mesh-id",
    sadp_fields,
)

# CADP: a consumer-aligned product aggregating the SADP per customer and month
# (customer_id and report_month are both flagged as primary)
cadp_fields = [
    create_field(name="customer_id", column_type="VARCHAR", primary=True),
    create_field(name="monthly_spend", column_type="DECIMAL"),
    create_field(name="transaction_count", column_type="INTEGER"),
    create_field(name="favorite_category", column_type="VARCHAR"),
    create_field(name="report_month", column_type="DATE", primary=True),
]

cadp_id = create_complete_data_product(
    "Monthly Customer Summary",
    "CADP: Monthly aggregated customer spending patterns",
    "analytics-mesh-id",
    cadp_fields,
)

Efficient Data Product Design

  • Start with the end in mind – understand what consumers need

  • Reuse existing data products where possible

  • Design transformations for performance and maintainability

  • Implement appropriate data quality checks

Last updated