Monitoring the Execution of Transformation Jobs

Defining a Job through the Data Product Builder

On Foundation, each data product contains a data transformation job (a compute job) that is executed via the Foundation Data Product Builder. The Data Product Builder is backed by Apache Spark and provides a set of tools that allow the user to create a data product job with just a few lines of code, even with no prior Apache Spark experience.

The Foundation Data Product Builder offers two ways to submit a job: by defining a data product builder JSON object that includes the definition of the job (input data objects/data products, list of transformations, finalisers), or by supplying a custom script file written in Python.

For any compute job of a data product, a compute identifier (UUID) is created and can be retrieved via the data product API GET /api/data/data_product. This compute_identifier can be used to monitor the status of the compute job.
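As a minimal sketch, the compute identifier could be retrieved with a plain HTTP client. The base URL, the authentication header, the identifier query parameter, and the compute_identifier field in the response are assumptions for illustration only.

import requests

BASE_URL = "https://foundation.example.com"   # assumed deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

# Retrieve the data product; the identifier query parameter and the
# compute_identifier field in the response body are assumptions.
resp = requests.get(
    f"{BASE_URL}/api/data/data_product",
    params={"identifier": "26e5351b-dab2-4e3d-979b-6e7936d5143b"},
    headers=HEADERS,
)
resp.raise_for_status()
compute_identifier = resp.json()["compute_identifier"]
print(compute_identifier)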

Low-code pipeline builder

An example JSON object to use with the Foundation Data Product Builder is shown below. If an invalid JSON object is supplied, an error message is returned in the API response.

API: PUT /api/data/data_product/compute/builder

{
  "config": {
    "docker_tag": "0.1.10",
    "executor_core_request": "800m",
    "executor_core_limit": "1500m",
    "executor_instances": 1,
    "min_executor_instances": 1,
    "max_executor_instances": 1,
    "executor_memory": "5120m",
    "driver_core_request": "0.3",
    "driver_core_limit": "800m",
    "driver_memory": "2048m"
  },
  "inputs": {
    "input_26e5351b_dab2_4e3d_979b_6e7936d5143b": {
      "input_type": "data_object",
      "identifier": "26e5351b-dab2-4e3d-979b-6e7936d5143b",
      "preview_limit": 10
    }
  },
  "transformations": [
    {
      "transform": "cast",
      "input": "input_26e5351b_dab2_4e3d_979b_6e7936d5143b",
      "output": "shipments_casted",
      "changes": [
        {
          "column": "shipment_id",
          "data_type": "integer",
          "kwargs": {}
        },
        {
          "column": "route_id",
          "data_type": "integer",
          "kwargs": {}
        },
        {
          "column": "carrier_id",
          "data_type": "integer",
          "kwargs": {}
        },
        {
          "column": "shipment_date",
          "data_type": "timestamp",
          "kwargs": {}
        },
        {
          "column": "expected_delivery_date",
          "data_type": "timestamp",
          "kwargs": {}
        },
        {
          "column": "actual_delivery_date",
          "data_type": "timestamp",
          "kwargs": {}
        },
        {
          "column": "weight",
          "data_type": "double",
          "kwargs": {}
        },
        {
          "column": "volume",
          "data_type": "double",
          "kwargs": {}
        },
        {
          "column": "priority",
          "data_type": "integer",
          "kwargs": {}
        },
        {
          "column": "cost",
          "data_type": "double",
          "kwargs": {}
        }
      ]
    },
    {
      "transform": "select_columns",
      "input": "shipments_casted",
      "output": "shipments_final",
      "columns": [
        "shipment_id",
        "route_id",
        "carrier_id",
        "shipment_date",
        "expected_delivery_date",
        "actual_delivery_date",
        "origin",
        "destination",
        "weight",
        "volume",
        "status",
        "delay_reason",
        "priority",
        "cost"
      ]
    }
  ],
  "finalisers": {
    "input": "shipments_final",
    "enable_quality": true,
    "write_config": {
      "mode": "overwrite"
    },
    "enable_profiling": true,
    "enable_classification": false
  },
  "preview": false
}
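As a hedged sketch, the JSON object above could be submitted with a plain HTTP client. The base URL, the authentication header, and how the target data product is referenced (here a data_product query parameter) are assumptions for illustration.

import json
import requests

BASE_URL = "https://foundation.example.com"   # assumed deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

# Load the builder definition shown above from a local file.
with open("builder.json") as f:
    builder = json.load(f)

# Submit the builder definition; the data_product query parameter is an assumption.
resp = requests.put(
    f"{BASE_URL}/api/data/data_product/compute/builder",
    params={"data_product": "<data-product-identifier>"},
    headers=HEADERS,
    json=builder,
)
resp.raise_for_status()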

Custom script pipeline builder

An example custom script file to use with the Foundation Data Product Builder is shown below. The file must include the config section, and the filename must have the .spark extension. If the file format (content) is invalid, an error message is returned in the API response.

API: PUT /api/data/data_product/compute/file

[config]
  docker_tag=0.0.28
  executor_core_request=800m
  executor_core_limit=1500m
  executor_instances=1
  min_executor_instances=1
  max_executor_instances=1
  executor_memory=8192m
  driver_core_request=0.3
  driver_core_limit=800m
  driver_memory=3072m
---
import logging
import random
import uuid
...

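A minimal sketch of uploading such a .spark file follows; the multipart field name, the base URL, the authentication header, and the data_product query parameter are assumptions for illustration.

import requests

BASE_URL = "https://foundation.example.com"   # assumed deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

# Upload the custom script; the "file" multipart field name and the
# data_product query parameter are assumptions.
with open("pipeline.spark", "rb") as f:
    resp = requests.put(
        f"{BASE_URL}/api/data/data_product/compute/file",
        params={"data_product": "<data-product-identifier>"},
        headers=HEADERS,
        files={"file": ("pipeline.spark", f)},
    )
resp.raise_for_status()
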
Monitoring the compute jobs

The first step in monitoring a compute job is to retrieve the status of the job (STARTING, PENDING, COMPLETED, FAILED) using GET /api/data/compute. Error statuses also include an error message to help debug the job. The response object is:

{
  "identifier": "43006eea-6ce0-4285-971a-a840d6ffbd89",
  "urn": "urn:meshx:backend:data:root:compute:43006eea-6ce0-4285-971a-a840d6ffbd89",
  "name": "dataproduct-accidents_f521fw",
  "is_system": false,
  "status": {
    "run_type": "single",
    "container": {
      "reason": "Completed",
      "message": null
    },
    "status": "COMPLETED",
    "attempts": 1,
    "submission_attempts": 1,
    "start": "2025-08-06T01:35:30Z",
    "finish": "2025-08-06T01:35:30Z",
    "error": null
  }
}
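A simple polling loop against GET /api/data/compute could look like the following sketch. The base URL, authentication header, and identifier query parameter are assumptions; the status field path matches the response shown above.

import time
import requests

BASE_URL = "https://foundation.example.com"   # assumed deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

def wait_for_compute(compute_identifier, poll_seconds=30):
    """Poll the compute job until it reaches a terminal status."""
    while True:
        resp = requests.get(
            f"{BASE_URL}/api/data/compute",
            params={"identifier": compute_identifier},  # assumed parameter name
            headers=HEADERS,
        )
        resp.raise_for_status()
        body = resp.json()
        status = body["status"]["status"]
        if status in ("COMPLETED", "FAILED"):
            return body
        time.sleep(poll_seconds)

result = wait_for_compute("43006eea-6ce0-4285-971a-a840d6ffbd89")
print(result["status"]["status"], result["status"]["error"])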

Before making any changes, it is highly advised to access the logs of the job for further debugging, using the GET /api/data/compute/log API.
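For illustration, the logs could be fetched as follows; the identifier query parameter and the plain-text response format are assumptions.

import requests

BASE_URL = "https://foundation.example.com"   # assumed deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

resp = requests.get(
    f"{BASE_URL}/api/data/compute/log",
    params={"identifier": "43006eea-6ce0-4285-971a-a840d6ffbd89"},  # assumed parameter name
    headers=HEADERS,
)
resp.raise_for_status()
print(resp.text)  # assumes the logs are returned as plain text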

Common Issues

  1. OOMKilled, which indicates that the job requires more memory on the driver and/or the executor.

  2. Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources, which indicates that the cluster does not have resources available at the moment. The compute job is not failing; it is waiting for resources to be allocated. A common solution is to kill the current job and submit it again with fewer resources, if possible.

  3. ImagePullBackOff, which indicates that the image version specified in docker_tag does not exist.

To see the history of all runs for this compute job, use GET /api/data/compute/history.

{
  "jobs": [
    {
      "created": "2025-08-04T12:24:10.244141Z",
      "updated": "2025-08-04T12:24:10.244141Z",
      "name": "dp-efb239421319403aa86183e30ccfc6ff",
      "reason": "reassign",
      "status": "COMPLETED",
      "suffix": "1mhv2"
    },
    {
      "created": "2025-08-04T12:19:51.297152Z",
      "updated": "2025-08-04T12:19:51.297152Z",
      "name": "dp-efb239421319403aa86183e30ccfc6ff",
      "reason": "reassign",
      "status": "FAILED",
      "suffix": "4ov3e"
    }
  ]
}
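A short sketch of inspecting the run history follows. The base URL, authentication header, and identifier query parameter are assumptions; the jobs list matches the response shown above.

import requests

BASE_URL = "https://foundation.example.com"   # assumed deployment URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

resp = requests.get(
    f"{BASE_URL}/api/data/compute/history",
    params={"identifier": "43006eea-6ce0-4285-971a-a840d6ffbd89"},  # assumed parameter name
    headers=HEADERS,
)
resp.raise_for_status()
failed = [job for job in resp.json()["jobs"] if job["status"] == "FAILED"]
print(f"{len(failed)} failed run(s)")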
