Creating and Managing Data Products
Data Products
Data products are refined data assets with transformations applied, ready for consumption. They function as SQL tables containing processed data suitable for specific business purposes or analysis.
Data Product Types
Source-Aligned Data Products (SADPs): Direct transformations of raw data from data objects
Consumer-Aligned Data Products (CADPs): Transformations built on top of other data products, creating layers of refined data
When to Use Data Products
Business-ready data: Transform raw data into formats suitable for specific business purposes
Consumer needs: Create data tailored to specific analytical or operational requirements
Data refinement: Apply business logic, calculations, and aggregations to raw data
Consumption layer: Provide clean, structured data for applications and analytics tools
Creating a Data Product
Step 1: Create Data Product
Endpoint: POST /api/data/data_product
{
"entity": {
"name": "Customer Analytics Product",
"entity_type": "data_product",
"label": "CAP",
"description": "Processed customer data for analytics dashboard"
},
"entity_info": {
"owner": "[email protected]",
"contact_ids": ["Data Product contact"],
"links": ["example.com"]
},
"host_mesh_identifier": "mesh-id-here"
}
Step 2: Define Schema
Endpoint: PUT /api/data/data_product/schema?identifier={product_id}
{
"details": {
"product_type": "stored",
"fields": [
{
"name": "customer_id",
"primary": true,
"optional": false,
"data_type": {
"meta": {},
"column_type": "VARCHAR"
},
"classification": 3,
"sensitivity": 1
},
{
"name": "total_purchases",
"primary": false,
"optional": false,
"data_type": {
"meta": {},
"column_type": "DECIMAL"
},
"classification": 1
}
]
}
}
Step 3: Assign to Mesh (if not done during creation)
Endpoint: PATCH /api/data/data_product?identifier={product_id}
{
"mesh_identifier": "your-mesh-id"
}
Python Functions
def create_data_product(name, description, mesh_id=None, owner_email="[email protected]"):
    """Create a new data product and return its identifier.

    Relies on ``requests``, ``API_URL`` and ``get_headers`` being available
    in the surrounding module.

    Args:
        name: Name of the data product. Its first three characters,
            upper-cased, are sent as the entity label.
        description: Description of the data product.
        mesh_id: Identifier of the host mesh. When falsy, the
            ``host_mesh_identifier`` key is left out of the request.
        owner_email: Owner address sent in ``entity_info``.
            NOTE(review): the default value looks like an e-mail
            obfuscation placeholder -- confirm the intended address.

    Returns:
        The identifier read from ``entity.identifier`` in the response, or
        ``None`` when the API does not answer with HTTP 200 or the response
        body does not have that shape.
    """
    request_body = {
        "entity": {
            "name": name,
            "entity_type": "data_product",
            "label": name[:3].upper(),
            "description": description,
        },
        "entity_info": {
            "owner": owner_email,
            "contact_ids": [f"{name} contact"],
            "links": ["example.com"],
        },
    }
    # Only attach the product to a mesh at creation time when one was given.
    if mesh_id:
        request_body["host_mesh_identifier"] = mesh_id

    response = requests.post(
        f"{API_URL}/data/data_product",
        headers=get_headers(),
        json=request_body,
    )
    if response.status_code != 200:
        print(f"Error creating data product: {response.text}")
        return None

    try:
        product_id = response.json()["entity"]["identifier"]
    except (ValueError, KeyError, TypeError):
        # A 200 whose body is not JSON or lacks entity.identifier is reported
        # like any other failure instead of escaping as an exception.
        print(f"Error creating data product: {response.text}")
        return None

    print(f"Data product '{name}' created with ID: {product_id}")
    return product_id
def assign_product_to_mesh(product_id, mesh_id):
    """Assign a data product to a mesh.

    Args:
        product_id: Identifier of the data product, sent as the
            ``identifier`` query parameter.
        mesh_id: Identifier of the mesh, sent as ``mesh_identifier`` in the
            JSON body.

    Returns:
        ``True`` when the API answers with HTTP 200, otherwise ``False``.
    """
    response = requests.patch(
        f"{API_URL}/data/data_product",
        # Let requests build the query string so the identifier is URL-encoded.
        params={"identifier": product_id},
        headers=get_headers(),
        json={"mesh_identifier": mesh_id},
    )
    return response.status_code == 200
def set_product_schema(product_id, fields):
    """Set the schema of a data product.

    The schema is always sent with ``product_type`` set to ``"stored"``.

    Args:
        product_id: Identifier of the data product, sent as the
            ``identifier`` query parameter.
        fields: List of field definitions placed under ``details.fields``.

    Returns:
        ``True`` when the API answers with HTTP 200, otherwise ``False``.
    """
    schema = {
        "details": {
            "product_type": "stored",
            "fields": fields,
        }
    }
    response = requests.put(
        f"{API_URL}/data/data_product/schema",
        # Let requests build the query string so the identifier is URL-encoded.
        params={"identifier": product_id},
        headers=get_headers(),
        json=schema,
    )
    if response.status_code != 200:
        print(f"Error setting schema: {response.text}")
        return False

    print(f"Schema set for product {product_id}")
    return True
def create_complete_data_product(name, description, mesh_id, schema_fields):
    """Create a data product and set its schema in one go.

    Args:
        name: Name passed to ``create_data_product``.
        description: Description passed to ``create_data_product``.
        mesh_id: Mesh identifier passed to ``create_data_product``.
        schema_fields: Field definitions passed to ``set_product_schema``.

    Returns:
        The product identifier when both steps succeed, ``None`` when either
        the creation or the schema update fails.
    """
    # Create product
    product_id = create_data_product(name, description, mesh_id)
    if not product_id:
        return None

    # Set schema
    if not set_product_schema(product_id, schema_fields):
        # The product created above is not removed here; only the failure is
        # reported to the caller.
        print("Failed to set schema")
        return None

    print(f"Data product setup completed: {product_id}")
    return product_id


# Schema Field Structure
def create_field(name, column_type, primary=False, optional=False, classification=1, sensitivity=None):
    """Build one schema field definition.

    Args:
        name: Field (column) name.
        column_type: Column type string stored under ``data_type.column_type``.
        primary: Value for the field's ``primary`` flag.
        optional: Value for the field's ``optional`` flag.
        classification: Classification value for the field.
        sensitivity: Sensitivity value for the field. The ``sensitivity`` key
            is only added when this is not ``None``, so ``0`` is kept.

    Returns:
        A new dict describing the field, with a fresh empty ``meta`` dict
        under ``data_type``.
    """
    field = {
        "name": name,
        "primary": primary,
        "optional": optional,
        "data_type": {
            "meta": {},
            "column_type": column_type,
        },
        "classification": classification,
    }
    if sensitivity is not None:
        field["sensitivity"] = sensitivity
    return field
# Example schema fields: one create_field() call per column of the product.
customer_schema = [
    create_field("customer_id", "VARCHAR", primary=True, classification=3, sensitivity=1),
    create_field("email", "VARCHAR", classification=3, sensitivity=2),
    create_field("total_orders", "INTEGER", classification=1),
    create_field("last_purchase_date", "DATE", classification=1),
    create_field("lifetime_value", "DECIMAL", classification=2),
]


# Supported Column Types
VARCHAR - Variable character strings
INTEGER - 32-bit integers
BIGINT - 64-bit integers
DECIMAL - Decimal numbers
DOUBLE - Double precision floating point
BOOLEAN - True/false values
DATE - Date values
TIMESTAMP - Timestamp values
TIMESTAMPTZ - Timestamp with timezone
JSON - JSON data
ARRAY - Array data
UUID - UUID values
Classification and Sensitivity Levels
Classification: Categorizes the data product according to the data classification policies set at Foundation-level (for example: Confidential, Public, Secret). Only one value is allowed.
Sensitivity: A set of tags defined at Foundation-level that classify the data in the data product according to different sensitivities or frameworks (for example: PII, Biometric, ...). More than one value can be selected.
Management Operations
def list_data_products():
    """List all data products.

    Returns:
        The decoded JSON body when the API answers with HTTP 200, otherwise
        ``None``.
    """
    response = requests.get(
        f"{API_URL}/data/data_product/list",
        headers=get_headers(),
    )
    if response.status_code != 200:
        return None
    return response.json()
def get_queryable_products():
    """Get the data products available for querying.

    Returns:
        The decoded JSON body when the API answers with HTTP 200, otherwise
        ``None``.
    """
    response = requests.get(
        f"{API_URL}/data/data_product/list/query",
        headers=get_headers(),
    )
    if response.status_code != 200:
        return None
    return response.json()
def get_data_product(product_id):
    """Get the details of one data product.

    Args:
        product_id: Identifier of the data product, sent as the
            ``identifier`` query parameter.

    Returns:
        The decoded JSON body when the API answers with HTTP 200, otherwise
        ``None``.
    """
    response = requests.get(
        f"{API_URL}/data/data_product",
        # Let requests build the query string so the identifier is URL-encoded.
        params={"identifier": product_id},
        headers=get_headers(),
    )
    if response.status_code != 200:
        return None
    return response.json()
def get_product_metadata(product_id):
    """Get the metadata of one data product.

    Args:
        product_id: Identifier of the data product, sent as the
            ``identifier`` query parameter.

    Returns:
        The decoded JSON body when the API answers with HTTP 200, otherwise
        ``None``.
    """
    response = requests.get(
        f"{API_URL}/data/data_product/metadata",
        # Let requests build the query string so the identifier is URL-encoded.
        params={"identifier": product_id},
        headers=get_headers(),
    )
    if response.status_code != 200:
        return None
    return response.json()
def update_product_metadata(product_id, metadata):
    """Update the metadata of one data product.

    Args:
        product_id: Identifier of the data product, sent as the
            ``identifier`` query parameter.
        metadata: JSON-serializable metadata sent as the request body.

    Returns:
        ``True`` when the API answers with HTTP 200, otherwise ``False``.
    """
    response = requests.put(
        f"{API_URL}/data/data_product/metadata",
        # Let requests build the query string so the identifier is URL-encoded.
        params={"identifier": product_id},
        headers=get_headers(),
        json=metadata,
    )
    return response.status_code == 200


# Example Usage
# Define schema for customer analytics product
customer_fields = [
    create_field("customer_id", "VARCHAR", primary=True, classification=3, sensitivity=1),
    create_field("segment", "VARCHAR", classification=2),
    create_field("total_spent", "DECIMAL", classification=2),
    create_field("order_count", "INTEGER", classification=1),
    create_field("avg_order_value", "DECIMAL", classification=2),
    create_field("last_activity", "TIMESTAMP", classification=1),
]

# Create complete data product
product_id = create_complete_data_product(
    name="Customer Segmentation Analytics",
    description="Customer data processed for segmentation and analytics",
    mesh_id="customer-analytics-mesh-id",
    schema_fields=customer_fields,
)

# Check if product was created successfully
if product_id:
    product_details = get_data_product(product_id)
    # get_data_product() returns None on a non-200 response, so guard before
    # indexing into the result.
    if product_details:
        print("Product created:", product_details["entity"]["name"])

# List all queryable products
queryable = get_queryable_products()
# get_queryable_products() returns None on a non-200 response; len(None)
# would raise TypeError.
if queryable is not None:
    print(f"Total queryable products: {len(queryable)}")


# SADP vs CADP Examples
# SADP: Transform raw transaction data
sadp_fields = [
    create_field("transaction_id", "VARCHAR", primary=True),
    create_field("customer_id", "VARCHAR", classification=3),
    create_field("amount", "DECIMAL"),
    create_field("transaction_date", "DATE"),
    create_field("product_category", "VARCHAR"),
]
sadp_id = create_complete_data_product(
    name="Clean Transaction Data",
    description="SADP: Cleaned and validated transaction data",
    mesh_id="sales-mesh-id",
    schema_fields=sadp_fields,
)

# CADP: Aggregate data from SADP
# Both customer_id and report_month are flagged as primary in this schema.
cadp_fields = [
    create_field("customer_id", "VARCHAR", primary=True),
    create_field("monthly_spend", "DECIMAL"),
    create_field("transaction_count", "INTEGER"),
    create_field("favorite_category", "VARCHAR"),
    create_field("report_month", "DATE", primary=True),
]
cadp_id = create_complete_data_product(
    name="Monthly Customer Summary",
    description="CADP: Monthly aggregated customer spending patterns",
    mesh_id="analytics-mesh-id",
    schema_fields=cadp_fields,
)


# Efficient Data Product Design
Start with the end in mind – understand what consumers need
Reuse existing data products where possible
Design transformations for performance and maintainability
Implement appropriate data quality checks
Last updated