Using the API to implement Data Transformations
Transformations
Transformations convert raw data into usable data products by defining processing steps needed to transform input data (from data objects or other data products) into the structure defined by the data product's schema.
Purpose
Data cleaning: Remove inconsistencies and errors from raw data
Data enrichment: Add calculated fields and business logic
Data integration: Combine data from multiple sources
Schema compliance: Ensure output matches data product schema exactly
Important: The output of your transformations must match the schema defined for the data product exactly.
Creating Transformations
Transformation Builder Structure
Endpoint: PUT /api/data/data_product/compute/builder?identifier={product_id}
{
"config": {
"docker_tag": "0.0.23",
"executor_core_request": "800m",
"executor_core_limit": "1500m",
"executor_instances": 1,
"executor_memory": "5120m",
"driver_core_request": "0.3",
"driver_core_limit": "800m",
"driver_memory": "2048m"
},
"inputs": {
"input_data_object_id": {
"input_type": "data_object",
"identifier": "data-object-id-here",
"preview_limit": 10
}
},
"transformations": [
{
"transform": "cast",
"input": "input_data_object_id",
"output": "casted_data",
"changes": [
{
"column": "customer_id",
"data_type": "integer",
"kwargs": {}
}
]
}
],
"finalisers": {
"input": "casted_data",
"enable_quality": true,
"write_config": {"mode": "overwrite"},
"enable_profiling": true,
"enable_classification": false
},
"preview": false
}

Python Functions
def create_transformation_builder(product_id, data_object_id, transformations):
    """Submit a transformation builder for the given data product.

    Wires a single data-object input through the supplied transformation
    steps and finalises with quality checks and profiling enabled.

    Args:
        product_id: Identifier of the data product to attach the builder to.
        data_object_id: Identifier of the input data object.
        transformations: Ordered list of transformation step dicts; the
            finaliser reads from the last step's output.

    Returns:
        True when the API accepts the builder (HTTP 200), False otherwise.
    """
    input_key = "input_" + data_object_id.replace("-", "_")
    # With no steps, the finaliser reads the raw input directly.
    final_input = transformations[-1]["output"] if transformations else input_key
    compute_config = {
        "docker_tag": "0.0.23",
        "executor_core_request": "800m",
        "executor_core_limit": "1500m",
        "executor_instances": 1,
        "min_executor_instances": 1,
        "max_executor_instances": 1,
        "executor_memory": "5120m",
        "driver_core_request": "0.3",
        "driver_core_limit": "800m",
        "driver_memory": "2048m",
    }
    builder = {
        "config": compute_config,
        "inputs": {
            input_key: {
                "input_type": "data_object",
                "identifier": data_object_id,
                "preview_limit": 10,
            }
        },
        "transformations": transformations,
        "finalisers": {
            "input": final_input,
            "enable_quality": True,
            "write_config": {"mode": "overwrite"},
            "enable_profiling": True,
            "enable_classification": False,
        },
        "preview": False,
    }
    response = requests.put(
        f"{API_URL}/data/data_product/compute/builder?identifier={product_id}",
        headers=get_headers(),
        json=builder,
    )
    if response.status_code != 200:
        print(f"Error creating transformation: {response.text}")
        return False
    print(f"Transformation builder created for product {product_id}")
    return True
def preview_transformation(product_id, data_object_id, transformations):
    """Dry-run a transformation pipeline on a small sample of the input.

    Uses a reduced compute configuration and disables the quality,
    profiling and classification finalisers, with preview mode on.

    Args:
        product_id: Identifier of the data product.
        data_object_id: Identifier of the input data object.
        transformations: Ordered list of transformation step dicts.
    """
    input_key = "input_" + data_object_id.replace("-", "_")
    # With no steps, the finaliser reads the raw input directly.
    final_input = transformations[-1]["output"] if transformations else input_key
    builder = {
        "config": {
            "docker_tag": "0.0.23",
            "executor_instances": 1,
            "executor_memory": "2048m",
            "driver_memory": "1024m",
        },
        "inputs": {
            input_key: {
                "input_type": "data_object",
                "identifier": data_object_id,
                "preview_limit": 10,
            }
        },
        "transformations": transformations,
        "finalisers": {
            "input": final_input,
            "enable_quality": False,
            "write_config": {"mode": "overwrite"},
            "enable_profiling": False,
            "enable_classification": False,
        },
        "preview": True,
    }
    response = requests.put(
        f"{API_URL}/data/data_product/compute/builder?identifier={product_id}",
        headers=get_headers(),
        json=builder,
    )
    return response.status_code == 200

Common Transformations
Cast - Convert Data Types
def cast_transform(input_name, output_name, column_changes):
    """Build a "cast" step converting columns to new data types.

    Args:
        input_name: Name of the upstream dataset.
        output_name: Name under which the casted dataset is exposed.
        column_changes: Mapping of column name -> target data type string.

    Returns:
        Transformation step dict for the builder payload.
    """
    changes = []
    for column_name, target_type in column_changes.items():
        changes.append({"column": column_name, "data_type": target_type, "kwargs": {}})
    return {
        "transform": "cast",
        "input": input_name,
        "output": output_name,
        "changes": changes,
    }
# Example usage
cast_step = cast_transform("raw_data", "typed_data", {
"customer_id": "integer",
"purchase_date": "timestamp",
"amount": "decimal"
})

Select Columns - Choose Specific Columns
def select_columns_transform(input_name, output_name, columns):
    """Build a "select_columns" step keeping only the named columns.

    Args:
        input_name: Name of the upstream dataset.
        output_name: Name under which the projected dataset is exposed.
        columns: List of column names to keep.

    Returns:
        Transformation step dict for the builder payload.
    """
    step = {"transform": "select_columns"}
    step["input"] = input_name
    step["output"] = output_name
    step["columns"] = columns
    return step
# Example usage
select_step = select_columns_transform("typed_data", "selected_data",
    ["customer_id", "amount", "purchase_date"])

Filter - Apply Conditions
def filter_transform(input_name, output_name, condition):
    """Build a "filter_with_condition" step keeping rows matching *condition*.

    Args:
        input_name: Name of the upstream dataset.
        output_name: Name under which the filtered dataset is exposed.
        condition: SQL-like boolean expression, e.g. "amount > 0".

    Returns:
        Transformation step dict for the builder payload.
    """
    return dict(
        transform="filter_with_condition",
        input=input_name,
        output=output_name,
        condition=condition,
    )
# Example usage
filter_step = filter_transform("selected_data", "filtered_data", "amount > 0")

Group By - Aggregate Data
def group_by_transform(input_name, output_name, group_columns, aggregations):
    """Build a "group_by" step aggregating the input dataset.

    Args:
        input_name: Name of the upstream dataset.
        output_name: Name under which the aggregated dataset is exposed.
        group_columns: Columns to group on.
        aggregations: Mapping of output column name ->
            {"agg_func": <func>, "col_name": <source column>}.

    Returns:
        Transformation step dict for the builder payload.
    """
    step = {
        "transform": "group_by",
        "input": input_name,
        "output": output_name,
    }
    step["group_columns"] = group_columns
    step["aggregations"] = aggregations
    return step
# Example usage
group_step = group_by_transform("filtered_data", "aggregated_data",
["customer_id"], {
"total_amount": {
"agg_func": "sum",
"col_name": "amount"
},
"purchase_count": {
"agg_func": "count",
"col_name": "customer_id"
}
})

Join - Combine Data Sources
def join_transform(input_name, output_name, other_input, join_type, conditions, select_columns=None):
    """Build a "join_rename_select" step joining two datasets.

    Args:
        input_name: Left-hand dataset name.
        output_name: Name under which the joined dataset is exposed.
        other_input: Right-hand dataset name.
        join_type: Join kind, e.g. "inner", "left".
        conditions: List of {"left", "operator", "right"} condition dicts.
        select_columns: Optional list of columns to keep after the join;
            when omitted or empty, all columns are selected.

    Returns:
        Transformation step dict for the builder payload.
    """
    transform = {
        "transform": "join_rename_select",
        "input": input_name,
        "output": output_name,
        "other": other_input,
        "join": join_type,
        "conditions": conditions,
        # Fix: previously `select_columns=[]` yielded select_all_columns=False
        # with no "select_columns" key — an invalid payload selecting nothing.
        # An empty selection now falls back to selecting all columns.
        "select_all_columns": not select_columns,
    }
    if select_columns:
        transform["select_columns"] = select_columns
    return transform
# Example usage
join_step = join_transform("customer_data", "enriched_data", "product_data", "inner",
    [{"left": "product_id", "operator": "eq", "right": "id"}])

Expression - Apply SQL-like Expressions
def expression_transform(input_name, output_name, expressions):
    """Build a "select_expression" step applying SQL-like expressions.

    Include "*" in *expressions* to carry all existing columns through
    alongside the derived ones.

    Args:
        input_name: Name of the upstream dataset.
        output_name: Name under which the derived dataset is exposed.
        expressions: List of SQL-like expression strings.

    Returns:
        Transformation step dict for the builder payload.
    """
    step = {"transform": "select_expression", "input": input_name}
    step["output"] = output_name
    step["expressions"] = expressions
    return step
# Example usage
expr_step = expression_transform("base_data", "calculated_data", [
"*", # Include all existing columns
"amount * 1.1 as amount_with_tax",
"CASE WHEN amount > 1000 THEN 'Premium' ELSE 'Standard' END as customer_tier"
])

Complete Example
def create_customer_analytics_transformation(product_id, customer_data_object_id):
    """Build and submit a customer-analytics pipeline for a data product.

    Pipeline: cast raw columns -> filter invalid rows -> derive tax and
    calendar fields -> aggregate per customer -> select final columns.

    Args:
        product_id: Identifier of the target data product.
        customer_data_object_id: Identifier of the raw customer data object.

    Returns:
        True when the builder is accepted by the API, False otherwise.
    """
    input_key = "input_" + customer_data_object_id.replace("-", "_")
    # 1. Coerce raw columns to usable types.
    typed = cast_transform(input_key, "typed_data", {
        "customer_id": "integer",
        "purchase_amount": "decimal",
        "purchase_date": "timestamp",
    })
    # 2. Drop non-positive transactions early to shrink the data volume.
    valid = filter_transform("typed_data", "valid_data", "purchase_amount > 0")
    # 3. Derive the tax-inclusive amount plus calendar fields.
    enriched = expression_transform("valid_data", "enriched_data", [
        "*",
        "purchase_amount * 1.08 as amount_with_tax",
        "YEAR(purchase_date) as purchase_year",
        "MONTH(purchase_date) as purchase_month",
    ])
    # 4. Per-customer spend metrics.
    summary = group_by_transform("enriched_data", "customer_summary", ["customer_id"], {
        "total_spent": {"agg_func": "sum", "col_name": "amount_with_tax"},
        "total_transactions": {"agg_func": "count", "col_name": "customer_id"},
        "avg_transaction": {"agg_func": "avg", "col_name": "amount_with_tax"},
    })
    # 5. Keep only the columns the product schema expects.
    final = select_columns_transform("customer_summary", "final_output", [
        "customer_id", "total_spent", "total_transactions", "avg_transaction",
    ])
    pipeline = [typed, valid, enriched, summary, final]
    return create_transformation_builder(product_id, customer_data_object_id, pipeline)
# Usage
success = create_customer_analytics_transformation(
product_id="your-product-id",
customer_data_object_id="your-data-object-id"
)

Monitoring and Validation
def get_transformation_status(product_id):
    """Return the execution status of a product's transformation job.

    Args:
        product_id: Identifier of the data product.

    Returns:
        The status string on success, "No transformation job found" when
        the product has no compute identifier, "Status unavailable" when
        the compute lookup fails, or None when the product lookup fails.
    """
    product_response = requests.get(
        f"{API_URL}/data/data_product?identifier={product_id}",
        headers=get_headers(),
    )
    if product_response.status_code != 200:
        return None
    # The product record links to its compute job via compute_identifier.
    compute_id = product_response.json().get("compute_identifier")
    if not compute_id:
        return "No transformation job found"
    status_response = requests.get(
        f"{API_URL}/data/compute?identifier={compute_id}",
        headers=get_headers(),
    )
    if status_response.status_code != 200:
        return "Status unavailable"
    return status_response.json()["status"]["status"]
def get_transformation_logs(product_id):
    """Fetch the compute log for a product's transformation job.

    Args:
        product_id: Identifier of the data product.

    Returns:
        The decoded log payload, "No transformation job found" when the
        product has no compute identifier, or None on any HTTP failure.
    """
    product_response = requests.get(
        f"{API_URL}/data/data_product?identifier={product_id}",
        headers=get_headers(),
    )
    if product_response.status_code != 200:
        return None
    # The product record links to its compute job via compute_identifier.
    compute_id = product_response.json().get("compute_identifier")
    if not compute_id:
        return "No transformation job found"
    logs_response = requests.get(
        f"{API_URL}/data/compute/log?identifier={compute_id}",
        headers=get_headers(),
    )
    if logs_response.status_code != 200:
        return None
    return logs_response.json()
def get_quality_validations(product_id):
    """Fetch and print data-quality validation results for a product.

    Args:
        product_id: Identifier of the data product.

    Returns:
        The validations payload on success, None otherwise.
    """
    response = requests.get(
        f"{API_URL}/data/data_product/quality/validations?identifier={product_id}",
        headers=get_headers(),
    )
    if response.status_code != 200:
        return None
    validations = response.json()
    print(f"Success rate: {validations.get('success_percentage', 0)}%")
    print(f"Successful expectations: {validations.get('successful_expectation_count', 0)}")
    print(f"Failed expectations: {validations.get('failed_expectation_count', 0)}")
    return validations
def get_builder_schemas(product_id):
    """Fetch the compute-builder state for *product_id*.

    The builder "state" endpoint reports the schema after each
    transformation step, which is useful for debugging schema mismatches.
    """
    response = requests.get(
        f"{API_URL}/data/data_product/compute/builder/state?identifier={product_id}",
        headers=get_headers()
    )
    return response.json() if response.status_code == 200 else None

Status Values
COMPLETED: Transformation finished successfully
FAILED: Transformation encountered an error
RUNNING: Transformation currently executing
STARTING_UP: Transformation is starting
SCHEDULED: Transformation scheduled for execution
UNSCHEDULED: Job could not be scheduled
Troubleshooting
Failed Transformations
Check compute logs: Use get_transformation_logs() for detailed error information
Verify schema match: Ensure transformation output matches product schema exactly
Check quality validations: Use get_quality_validations() for schema mismatch details
Schema Validation Issues
If transformation status is FAILED but no error appears in the logs, check quality validations
Schema mismatches are the most common cause of failed transformations
Use get_builder_schemas() to see schema evolution through transformation steps
Best Practices
Preview first: Always test with preview=True before running full transformations
Schema alignment: Ensure final output exactly matches data product schema
Error handling: Check both compute logs and quality validations for failures
Resource sizing: Adjust executor instances and memory based on data volume
Step validation: Use builder state to verify schema at each transformation step
Keep transformations simple and modular
Prefer built-in transformation types over custom SQL when possible
For large datasets, filter early in the pipeline to reduce data volume
Last updated