Adding and Managing Data Objects

Data Objects

Data objects store raw data ingested into Foundation. They function as SQL tables that hold data exactly as it arrives from source systems, before any transformation is applied.

When to Use Data Objects

  • Raw data preservation: Store original data for audit, compliance, or historical purposes

  • Multiple product feeds: Make raw data available to multiple data products

  • Decoupled processing: Separate data ingestion from transformation processes

  • Source integrity: Maintain data exactly as received from external systems

Creating a Data Object

Step 1: Create Data Object

Endpoint: POST /api/data/data_object

{
  "entity": {
    "name": "Customer Transactions",
    "entity_type": "data_object",
    "label": "CTX",
    "description": "Raw customer transaction data from payment system"
  },
  "entity_info": {
    "owner": "[email protected]",
    "contact_ids": ["Data Object contact"],
    "links": ["example.com"]
  }
}
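
A successful creation call returns the new entity, including its generated identifier; the helper functions later in this section read it from response.json()["entity"]["identifier"]. A trimmed illustration of the expected response shape (field values are placeholders):

{
  "entity": {
    "identifier": "<generated-data-object-id>",
    "name": "Customer Transactions",
    "entity_type": "data_object"
  }
}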

Step 2: Link to Data Source

Endpoint: POST /api/data/link/data_source/data_object

Parameters:

  • identifier: Data source identifier

  • child_identifier: Data object identifier

Step 3: Configure Data Object

Endpoint: PUT /api/data/data_object/config?identifier={data_object_id}

{
  "configuration": {
    "data_object_type": "csv",
    "path": "/transactions/daily_transactions.csv",
    "has_header": true,
    "delimiter": ",",
    "quote_char": null,
    "escape_char": null,
    "multi_line": null
  }
}

Python Functions

import requests

# API_URL and get_headers() are assumed to be defined in your setup code
# (the Foundation API base URL and authentication headers, respectively).

def create_data_object(name, description, owner_email="[email protected]"):
    """Create a new data object"""
    response = requests.post(
        f"{API_URL}/data/data_object",
        headers=get_headers(),
        json={
            "entity": {
                "name": name,
                "entity_type": "data_object",
                "label": name[:3].upper(),
                "description": description
            },
            "entity_info": {
                "owner": owner_email,
                "contact_ids": [f"{name} contact"],
                "links": ["example.com"]
            }
        }
    )
    
    if response.status_code == 200:
        data_object_id = response.json()["entity"]["identifier"]
        print(f"Data object '{name}' created with ID: {data_object_id}")
        return data_object_id
    else:
        print(f"Error creating data object: {response.text}")
        return None

def link_data_object_to_source(data_source_id, data_object_id):
    """Link data object to data source"""
    response = requests.post(
        f"{API_URL}/data/link/data_source/data_object",
        headers=get_headers(),
        params={
            "identifier": data_source_id,
            "child_identifier": data_object_id
        }
    )
    
    return response.status_code == 200

def configure_csv_data_object(data_object_id, path, has_header=True, delimiter=","):
    """Configure CSV data object"""
    response = requests.put(
        f"{API_URL}/data/data_object/config?identifier={data_object_id}",
        headers=get_headers(),
        json={
            "configuration": {
                "data_object_type": "csv",
                "path": path,
                "has_header": has_header,
                "delimiter": delimiter,
                "quote_char": None,
                "escape_char": None,
                "multi_line": None
            }
        }
    )
    
    return response.status_code == 200

def create_complete_data_object(name, description, data_source_id, file_path):
    """Create and configure a complete data object"""
    # Create data object
    data_object_id = create_data_object(name, description)
    if not data_object_id:
        return None
    
    # Link to data source
    if not link_data_object_to_source(data_source_id, data_object_id):
        print("Failed to link data object to source")
        return None
    
    # Configure for CSV
    if not configure_csv_data_object(data_object_id, file_path):
        print("Failed to configure data object")
        return None
    
    print(f"Data object setup completed: {data_object_id}")
    return data_object_id

Configuration Options

Supported Data Object Types

  • "csv" - Comma-separated values

  • "json" - JSON format

  • "parquet" - Parquet format

  • "avro" - Avro format

  • "jdbc" - Database tables

CSV Configuration Fields

  • path: File location in the data source

  • has_header: Whether the first row contains column names

  • delimiter: Value separator (e.g. ",", ";", or "|")

  • quote_char: Character used to quote values

  • escape_char: Character used to escape special characters

  • multi_line: Whether records can span multiple lines
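
The optional fields can be set explicitly in the same call. A minimal sketch for a pipe-delimited file with quoted values, reusing the Step 3 endpoint (the specific quote and escape characters are illustrative):

def configure_pipe_delimited_csv(data_object_id, path):
    """Sketch: configure a pipe-delimited CSV with quoted values"""
    response = requests.put(
        f"{API_URL}/data/data_object/config?identifier={data_object_id}",
        headers=get_headers(),
        json={
            "configuration": {
                "data_object_type": "csv",
                "path": path,
                "has_header": True,
                "delimiter": "|",
                "quote_char": "\"",
                "escape_char": "\\",
                "multi_line": False
            }
        }
    )
    return response.status_code == 200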

Monitoring Data Object Status

def check_data_object_status(data_object_id):
    """Check data object ingestion status"""
    # Get data object details
    response = requests.get(
        f"{API_URL}/data/data_object?identifier={data_object_id}",
        headers=get_headers()
    )
    
    if response.status_code != 200:
        return None
    
    data = response.json()
    compute_id = data.get("compute_identifier")
    
    if not compute_id:
        return "No compute job found"
    
    # Check compute status
    compute_response = requests.get(
        f"{API_URL}/data/compute?identifier={compute_id}",
        headers=get_headers()
    )
    
    if compute_response.status_code == 200:
        status = compute_response.json()["status"]["status"]
        return status
    
    return "Status unavailable"

def get_data_object_logs(data_object_id):
    """Get detailed logs for troubleshooting"""
    # Get compute identifier first
    response = requests.get(
        f"{API_URL}/data/data_object?identifier={data_object_id}",
        headers=get_headers()
    )
    
    if response.status_code != 200:
        return None
    
    compute_id = response.json().get("compute_identifier")
    if not compute_id:
        return "No compute job found"
    
    # Get logs
    logs_response = requests.get(
        f"{API_URL}/data/compute/log?identifier={compute_id}",
        headers=get_headers()
    )
    
    if logs_response.status_code == 200:
        return logs_response.json()
    
    return None

Status Values

  • COMPLETED: Ingestion finished successfully

  • FAILED: Ingestion encountered an error

  • RUNNING: Ingestion currently executing

  • STARTING_UP: Ingestion is starting

  • SCHEDULED: Ingestion scheduled for execution

  • UNSCHEDULED: Job could not be scheduled
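
SCHEDULED, STARTING_UP, and RUNNING are transient, so callers typically poll until a terminal status appears. A minimal sketch built on check_data_object_status above (the polling interval and timeout are arbitrary choices):

import time

def wait_for_ingestion(data_object_id, poll_seconds=30, timeout_seconds=1800):
    """Poll until ingestion reaches a terminal status or the timeout expires."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = check_data_object_status(data_object_id)
        if status in ("COMPLETED", "FAILED", "UNSCHEDULED"):
            return status
        time.sleep(poll_seconds)
    return "TIMEOUT"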

Example Usage

# Complete workflow example
data_object_id = create_complete_data_object(
    name="Daily Sales Data",
    description="Raw daily sales transactions from payment gateway",
    data_source_id="your-s3-data-source-id",
    file_path="/sales/daily_sales.csv"
)

# Monitor progress
if data_object_id:
    status = check_data_object_status(data_object_id)
    print(f"Ingestion status: {status}")
    
    if status == "FAILED":
        logs = get_data_object_logs(data_object_id)
        print("Error logs:", logs)
