Using the API to implement Data Transformations

Transformations

Transformations convert raw data into usable data products. They define the processing steps that turn input data (from data objects or other data products) into the structure defined by the data product's schema.

Purpose

  • Data cleaning: Remove inconsistencies and errors from raw data

  • Data enrichment: Add calculated fields and business logic

  • Data integration: Combine data from multiple sources

  • Schema compliance: Ensure output matches data product schema exactly

Important: The output of your transformations must match the schema defined for the data product exactly.

Creating Transformations

Transformation Builder Structure

Endpoint: PUT /api/data/data_product/compute/builder?identifier={product_id}

{
  "config": {
    "docker_tag": "0.0.23",
    "executor_core_request": "800m",
    "executor_core_limit": "1500m",
    "executor_instances": 1,
    "executor_memory": "5120m",
    "driver_core_request": "0.3",
    "driver_core_limit": "800m",
    "driver_memory": "2048m"
  },
  "inputs": {
    "input_data_object_id": {
      "input_type": "data_object",
      "identifier": "data-object-id-here",
      "preview_limit": 10
    }
  },
  "transformations": [
    {
      "transform": "cast",
      "input": "input_data_object_id",
      "output": "casted_data",
      "changes": [
        {
          "column": "customer_id",
          "data_type": "integer",
          "kwargs": {}
        }
      ]
    }
  ],
  "finalisers": {
    "input": "casted_data",
    "enable_quality": true,
    "write_config": {"mode": "overwrite"},
    "enable_profiling": true,
    "enable_classification": false
  },
  "preview": false
}

Python Functions

def create_transformation_builder(product_id, data_object_id, transformations):
    """Submit a full (non-preview) transformation builder for a data product.

    Args:
        product_id: Identifier of the target data product.
        data_object_id: Identifier of the source data object.
        transformations: Ordered list of transformation step dicts.

    Returns:
        True when the API accepts the builder (HTTP 200), False otherwise.
    """
    # Input keys must be identifier-like, so dashes become underscores.
    source_key = f"input_{data_object_id.replace('-', '_')}"

    # The finaliser consumes the last step's output, or the raw input
    # when no transformation steps were supplied.
    final_input = transformations[-1]["output"] if transformations else source_key

    payload = {
        "config": {
            "docker_tag": "0.0.23",
            "executor_core_request": "800m",
            "executor_core_limit": "1500m",
            "executor_instances": 1,
            "min_executor_instances": 1,
            "max_executor_instances": 1,
            "executor_memory": "5120m",
            "driver_core_request": "0.3",
            "driver_core_limit": "800m",
            "driver_memory": "2048m",
        },
        "inputs": {
            source_key: {
                "input_type": "data_object",
                "identifier": data_object_id,
                "preview_limit": 10,
            }
        },
        "transformations": transformations,
        "finalisers": {
            "input": final_input,
            "enable_quality": True,
            "write_config": {"mode": "overwrite"},
            "enable_profiling": True,
            "enable_classification": False,
        },
        "preview": False,
    }

    response = requests.put(
        f"{API_URL}/data/data_product/compute/builder?identifier={product_id}",
        headers=get_headers(),
        json=payload,
    )

    # Guard clause: report the failure and bail out early on non-200.
    if response.status_code != 200:
        print(f"Error creating transformation: {response.text}")
        return False

    print(f"Transformation builder created for product {product_id}")
    return True

def preview_transformation(product_id, data_object_id, transformations):
    """Run the transformation pipeline in preview mode (limited rows).

    Quality, profiling and classification finalisers are disabled so the
    preview runs quickly against at most ``preview_limit`` rows per input.

    Returns:
        True when the API accepts the preview request (HTTP 200).
    """
    source_key = f"input_{data_object_id.replace('-', '_')}"
    final_input = transformations[-1]["output"] if transformations else source_key

    payload = {
        # Smaller resource footprint than a full run: preview data is tiny.
        "config": {
            "docker_tag": "0.0.23",
            "executor_instances": 1,
            "executor_memory": "2048m",
            "driver_memory": "1024m",
        },
        "inputs": {
            source_key: {
                "input_type": "data_object",
                "identifier": data_object_id,
                "preview_limit": 10,
            }
        },
        "transformations": transformations,
        "finalisers": {
            "input": final_input,
            "enable_quality": False,
            "write_config": {"mode": "overwrite"},
            "enable_profiling": False,
            "enable_classification": False,
        },
        "preview": True,
    }

    response = requests.put(
        f"{API_URL}/data/data_product/compute/builder?identifier={product_id}",
        headers=get_headers(),
        json=payload,
    )

    return response.status_code == 200

Common Transformations

Cast - Convert Data Types

def cast_transform(input_name, output_name, column_changes):
    """Build a 'cast' transformation step.

    Args:
        input_name: Name of the upstream dataset within the builder.
        output_name: Name under which the casted dataset is exposed.
        column_changes: Mapping of column name -> target data type.

    Returns:
        A dict describing the cast step for the transformations list.
    """
    changes = []
    for column, target_type in column_changes.items():
        changes.append({"column": column, "data_type": target_type, "kwargs": {}})
    return {
        "transform": "cast",
        "input": input_name,
        "output": output_name,
        "changes": changes,
    }

# Example usage: one "cast" step converting three columns of "raw_data"
# and exposing the result downstream as "typed_data".
cast_step = cast_transform("raw_data", "typed_data", {
    "customer_id": "integer",
    "purchase_date": "timestamp",
    "amount": "decimal"
})

Select Columns - Choose Specific Columns

def select_columns_transform(input_name, output_name, columns):
    """Build a 'select_columns' step that keeps only the named columns.

    Args:
        input_name: Name of the upstream dataset within the builder.
        output_name: Name under which the projected dataset is exposed.
        columns: List of column names to retain.
    """
    step = {"transform": "select_columns"}
    step["input"] = input_name
    step["output"] = output_name
    step["columns"] = columns
    return step

# Example usage: keep only the three columns needed downstream.
select_step = select_columns_transform("typed_data", "selected_data", 
                                     ["customer_id", "amount", "purchase_date"])

Filter - Apply Conditions

def filter_transform(input_name, output_name, condition):
    """Build a 'filter_with_condition' step keeping rows matching *condition*.

    Args:
        input_name: Name of the upstream dataset within the builder.
        output_name: Name under which the filtered dataset is exposed.
        condition: SQL-like boolean expression, e.g. "amount > 0".
    """
    return dict(
        transform="filter_with_condition",
        input=input_name,
        output=output_name,
        condition=condition,
    )

# Example usage: drop rows with a non-positive amount.
filter_step = filter_transform("selected_data", "filtered_data", "amount > 0")

Group By - Aggregate Data

def group_by_transform(input_name, output_name, group_columns, aggregations):
    """Build a 'group_by' step applying aggregations per group.

    Args:
        input_name: Name of the upstream dataset within the builder.
        output_name: Name under which the aggregated dataset is exposed.
        group_columns: Columns to group by.
        aggregations: Mapping of output column -> {"agg_func", "col_name"}.
    """
    return dict(
        transform="group_by",
        input=input_name,
        output=output_name,
        group_columns=group_columns,
        aggregations=aggregations,
    )

# Example usage: per-customer totals — sum of "amount" plus a row count.
group_step = group_by_transform("filtered_data", "aggregated_data", 
                               ["customer_id"], {
                                   "total_amount": {
                                       "agg_func": "sum",
                                       "col_name": "amount"
                                   },
                                   "purchase_count": {
                                       "agg_func": "count",
                                       "col_name": "customer_id"
                                   }
                               })

Join - Combine Data Sources

def join_transform(input_name, output_name, other_input, join_type, conditions, select_columns=None):
    """Build a 'join_rename_select' step joining two data sources.

    Args:
        input_name: Name of the left-hand dataset within the builder.
        output_name: Name under which the joined dataset is exposed.
        other_input: Name of the right-hand dataset to join against.
        join_type: Join kind, e.g. "inner" or "left".
        conditions: List of {"left", "operator", "right"} condition dicts.
        select_columns: Columns to keep after the join; None selects all.

    Returns:
        A dict describing the join step for the transformations list.
    """
    transform = {
        "transform": "join_rename_select",
        "input": input_name,
        "output": output_name,
        "other": other_input,
        "join": join_type,
        "conditions": conditions,
        "select_all_columns": select_columns is None,
    }

    # `is not None` rather than truthiness: an explicit empty list must
    # still be forwarded (the original dropped it while already setting
    # select_all_columns to False, yielding an inconsistent payload).
    if select_columns is not None:
        transform["select_columns"] = select_columns

    return transform

# Example usage: inner-join on product_id = id, keeping all columns.
join_step = join_transform("customer_data", "enriched_data", "product_data", "inner",
                          [{"left": "product_id", "operator": "eq", "right": "id"}])

Expression - Apply SQL-like Expressions

def expression_transform(input_name, output_name, expressions):
    """Build a 'select_expression' step applying SQL-like column expressions.

    Args:
        input_name: Name of the upstream dataset within the builder.
        output_name: Name under which the derived dataset is exposed.
        expressions: SQL expressions; "*" carries existing columns through.
    """
    step = {"transform": "select_expression"}
    step.update(input=input_name, output=output_name, expressions=expressions)
    return step

# Example usage: carry all columns through and derive two new ones.
expr_step = expression_transform("base_data", "calculated_data", [
    "*",  # Include all existing columns
    "amount * 1.1 as amount_with_tax",
    "CASE WHEN amount > 1000 THEN 'Premium' ELSE 'Standard' END as customer_tier"
])

Complete Example

def create_customer_analytics_transformation(product_id, customer_data_object_id):
    """Build and submit the full customer-analytics pipeline.

    Pipeline: cast types -> drop invalid rows -> derive tax/date columns
    -> aggregate per customer -> project the final schema.

    Returns:
        True when the builder was accepted by the API, False otherwise.
    """
    source_key = f"input_{customer_data_object_id.replace('-', '_')}"

    # 1. Cast data types
    typed = cast_transform(source_key, "typed_data", {
        "customer_id": "integer",
        "purchase_amount": "decimal",
        "purchase_date": "timestamp"
    })

    # 2. Filter valid transactions
    valid = filter_transform("typed_data", "valid_data", "purchase_amount > 0")

    # 3. Add calculated fields
    enriched = expression_transform("valid_data", "enriched_data", [
        "*",
        "purchase_amount * 1.08 as amount_with_tax",
        "YEAR(purchase_date) as purchase_year",
        "MONTH(purchase_date) as purchase_month"
    ])

    # 4. Group by customer and calculate metrics
    summary = group_by_transform("enriched_data", "customer_summary", ["customer_id"], {
        "total_spent": {"agg_func": "sum", "col_name": "amount_with_tax"},
        "total_transactions": {"agg_func": "count", "col_name": "customer_id"},
        "avg_transaction": {"agg_func": "avg", "col_name": "amount_with_tax"},
    })

    # 5. Select final columns
    final = select_columns_transform("customer_summary", "final_output", [
        "customer_id", "total_spent", "total_transactions", "avg_transaction"
    ])

    pipeline = [typed, valid, enriched, summary, final]
    return create_transformation_builder(product_id, customer_data_object_id, pipeline)

# Usage: substitute real identifiers; returns True when the builder
# was accepted by the API.
success = create_customer_analytics_transformation(
    product_id="your-product-id",
    customer_data_object_id="your-data-object-id"
)

Monitoring and Validation

def get_transformation_status(product_id):
    """Return the status of a product's transformation job.

    Returns:
        None when the product lookup fails; a human-readable message when
        no job exists or the status is unavailable; otherwise the raw
        status string reported by the compute API.
    """
    # The product record carries the compute_identifier of its job.
    product_resp = requests.get(
        f"{API_URL}/data/data_product?identifier={product_id}",
        headers=get_headers()
    )
    if product_resp.status_code != 200:
        return None

    compute_id = product_resp.json().get("compute_identifier")
    if not compute_id:
        return "No transformation job found"

    status_resp = requests.get(
        f"{API_URL}/data/compute?identifier={compute_id}",
        headers=get_headers()
    )
    if status_resp.status_code != 200:
        return "Status unavailable"

    # NOTE(review): assumes the compute payload nests the value under
    # status.status — confirm against the API response schema.
    return status_resp.json()["status"]["status"]

def get_transformation_logs(product_id):
    """Fetch the compute log for a product's transformation job.

    Returns:
        None when the product lookup or log fetch fails; a message when
        no job exists; otherwise the parsed JSON log payload.
    """
    product_resp = requests.get(
        f"{API_URL}/data/data_product?identifier={product_id}",
        headers=get_headers()
    )
    if product_resp.status_code != 200:
        return None

    compute_id = product_resp.json().get("compute_identifier")
    if not compute_id:
        return "No transformation job found"

    log_resp = requests.get(
        f"{API_URL}/data/compute/log?identifier={compute_id}",
        headers=get_headers()
    )
    if log_resp.status_code != 200:
        return None
    return log_resp.json()

def get_quality_validations(product_id):
    """Fetch and summarise data-quality validation results for a product.

    Returns:
        The parsed validation payload on success, None otherwise.
        Also prints a short human-readable summary to stdout.
    """
    response = requests.get(
        f"{API_URL}/data/data_product/quality/validations?identifier={product_id}",
        headers=get_headers()
    )
    if response.status_code != 200:
        return None

    validations = response.json()
    # Defaults of 0 keep the summary printable even on sparse payloads.
    print(f"Success rate: {validations.get('success_percentage', 0)}%")
    print(f"Successful expectations: {validations.get('successful_expectation_count', 0)}")
    print(f"Failed expectations: {validations.get('failed_expectation_count', 0)}")
    return validations

def get_builder_schemas(product_id):
    """Return the per-step schema state of a product's builder, or None.

    Useful for verifying how the schema evolves through each
    transformation step before running the full pipeline.
    """
    response = requests.get(
        f"{API_URL}/data/data_product/compute/builder/state?identifier={product_id}",
        headers=get_headers()
    )
    if response.status_code != 200:
        return None
    return response.json()

Status Values

  • COMPLETED: Transformation finished successfully

  • FAILED: Transformation encountered an error

  • RUNNING: Transformation currently executing

  • STARTING_UP: Transformation is starting

  • SCHEDULED: Transformation scheduled for execution

  • UNSCHEDULED: Job could not be scheduled

Troubleshooting

Failed Transformations

  1. Check compute logs: Use get_transformation_logs() for detailed error information

  2. Verify schema match: Ensure transformation output matches product schema exactly

  3. Check quality validations: Use get_quality_validations() for schema mismatch details

Schema Validation Issues

  • If transformation status is FAILED but no error in logs, check quality validations

  • Schema mismatches are the most common cause of failed transformations

  • Use get_builder_schemas() to see schema evolution through transformation steps

Best Practices

  • Preview first: Always test with preview=True before running full transformations

  • Schema alignment: Ensure final output exactly matches data product schema

  • Error handling: Check both compute logs and quality validations for failures

  • Resource sizing: Adjust executor instances and memory based on data volume

  • Step validation: Use builder state to verify schema at each transformation step

  • Keep transformations simple and modular

  • Prefer built-in transformation types over custom SQL when possible

  • For large datasets, filter early in the pipeline to reduce data volume

Last updated