Adding and Managing Data Objects
Data Objects
Data objects store raw data ingested into Foundation without any transformation. They function as SQL tables that hold the initial data exactly as it arrives from source systems, before any transformation is applied.
When to Use Data Objects
Raw data preservation: Store original data for audit, compliance, or historical purposes
Multiple product feeds: Make raw data available to multiple data products
Decoupled processing: Separate data ingestion from transformation processes
Source integrity: Maintain data exactly as received from external systems
Creating a Data Object
Step 1: Create Data Object
Endpoint: POST /api/data/data_object
{
  "entity": {
    "name": "Customer Transactions",
    "entity_type": "data_object",
    "label": "CTX",
    "description": "Raw customer transaction data from payment system"
  },
  "entity_info": {
    "owner": "[email protected]",
    "contact_ids": ["Data Object contact"],
    "links": ["example.com"]
  }
}
Step 2: Link to Data Source
Endpoint: POST /api/data/link/data_source/data_object
Parameters:
identifier: Data source identifier
child_identifier: Data object identifier
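Unlike steps 1 and 3, this call sends no JSON body; both identifiers are passed as query parameters, as in the sketch below (the identifier values are placeholders):
# Sketch: both identifiers go in the query string, not in a JSON body.
response = requests.post(
    f"{API_URL}/data/link/data_source/data_object",
    headers=get_headers(),
    params={
        "identifier": "your-data-source-id",       # data source identifier
        "child_identifier": "your-data-object-id"  # data object identifier
    }
)
print(response.status_code)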
Step 3: Configure Data Object
Endpoint: PUT /api/data/data_object/config?identifier={data_object_id}
{
  "configuration": {
    "data_object_type": "csv",
    "path": "/transactions/daily_transactions.csv",
    "has_header": true,
    "delimiter": ",",
    "quote_char": null,
    "escape_char": null,
    "multi_line": null
  }
}
Python Functions
import requests

# API_URL and get_headers() are assumed to be defined elsewhere in your setup:
# the Foundation API base URL and a helper that returns authentication headers.

def create_data_object(name, description, owner_email="[email protected]"):
    """Create a new data object"""
    response = requests.post(
        f"{API_URL}/data/data_object",
        headers=get_headers(),
        json={
            "entity": {
                "name": name,
                "entity_type": "data_object",
                "label": name[:3].upper(),
                "description": description
            },
            "entity_info": {
                "owner": owner_email,
                "contact_ids": [f"{name} contact"],
                "links": ["example.com"]
            }
        }
    )

    if response.status_code == 200:
        data_object_id = response.json()["entity"]["identifier"]
        print(f"Data object '{name}' created with ID: {data_object_id}")
        return data_object_id
    else:
        print(f"Error creating data object: {response.text}")
        return None
def link_data_object_to_source(data_source_id, data_object_id):
    """Link data object to data source"""
    response = requests.post(
        f"{API_URL}/data/link/data_source/data_object",
        headers=get_headers(),
        params={
            "identifier": data_source_id,
            "child_identifier": data_object_id
        }
    )
    return response.status_code == 200
def configure_csv_data_object(data_object_id, path, has_header=True, delimiter=","):
    """Configure CSV data object"""
    response = requests.put(
        f"{API_URL}/data/data_object/config?identifier={data_object_id}",
        headers=get_headers(),
        json={
            "configuration": {
                "data_object_type": "csv",
                "path": path,
                "has_header": has_header,
                "delimiter": delimiter,
                "quote_char": None,
                "escape_char": None,
                "multi_line": None
            }
        }
    )
    return response.status_code == 200
def create_complete_data_object(name, description, data_source_id, file_path):
    """Create and configure a complete data object"""
    # Create data object
    data_object_id = create_data_object(name, description)
    if not data_object_id:
        return None

    # Link to data source
    if not link_data_object_to_source(data_source_id, data_object_id):
        print("Failed to link data object to source")
        return None

    # Configure for CSV
    if not configure_csv_data_object(data_object_id, file_path):
        print("Failed to configure data object")
        return None

    print(f"Data object setup completed: {data_object_id}")
    return data_object_id
Configuration Options
Supported Data Object Types
"csv"- Comma-separated values"json"- JSON format"parquet"- Parquet format"avro"- Avro format"jdbc"- Database tables
CSV Configuration Fields
path: File location in the data source
has_header: First row contains column names
delimiter: Value separator (",", ";", "|")
quote_char: Character for quoting values
escape_char: Character for escaping special characters
multi_line: Records can span multiple lines
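For files that do not match the defaults, the same PUT call can set these fields explicitly. The sketch below uses illustrative values only (a hypothetical path, semicolon delimiter, double-quote quoting, backslash escaping, multi-line records):
# Sketch: configure a semicolon-delimited CSV with quoted, multi-line records.
# Field names match the list above; the concrete values are examples.
response = requests.put(
    f"{API_URL}/data/data_object/config?identifier={data_object_id}",
    headers=get_headers(),
    json={
        "configuration": {
            "data_object_type": "csv",
            "path": "/exports/quoted_records.csv",  # hypothetical path
            "has_header": True,
            "delimiter": ";",
            "quote_char": "\"",
            "escape_char": "\\",
            "multi_line": True
        }
    }
)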
Monitoring Data Object Status
def check_data_object_status(data_object_id):
    """Check data object ingestion status"""
    # Get data object details
    response = requests.get(
        f"{API_URL}/data/data_object?identifier={data_object_id}",
        headers=get_headers()
    )
    if response.status_code != 200:
        return None

    data = response.json()
    compute_id = data.get("compute_identifier")
    if not compute_id:
        return "No compute job found"

    # Check compute status
    compute_response = requests.get(
        f"{API_URL}/data/compute?identifier={compute_id}",
        headers=get_headers()
    )
    if compute_response.status_code == 200:
        status = compute_response.json()["status"]["status"]
        return status
    return "Status unavailable"
def get_data_object_logs(data_object_id):
    """Get detailed logs for troubleshooting"""
    # Get compute identifier first
    response = requests.get(
        f"{API_URL}/data/data_object?identifier={data_object_id}",
        headers=get_headers()
    )
    if response.status_code != 200:
        return None

    compute_id = response.json().get("compute_identifier")
    if not compute_id:
        return "No compute job found"

    # Get logs
    logs_response = requests.get(
        f"{API_URL}/data/compute/log?identifier={compute_id}",
        headers=get_headers()
    )
    if logs_response.status_code == 200:
        return logs_response.json()
    return None
Status Values
COMPLETED: Ingestion finished successfully
FAILED: Ingestion encountered an error
RUNNING: Ingestion currently executing
STARTING_UP: Ingestion is starting
SCHEDULED: Ingestion scheduled for execution
UNSCHEDULED: Job could not be scheduled
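Because RUNNING, STARTING_UP, and SCHEDULED are transient, a small polling loop around check_data_object_status can wait for a terminal state. This is a minimal sketch; the timeout, poll interval, and TIMED_OUT sentinel are choices made here, not platform behavior.
import time

def wait_for_ingestion(data_object_id, timeout_seconds=600, poll_interval=15):
    """Sketch: poll ingestion status until it reaches a terminal state."""
    terminal_states = {"COMPLETED", "FAILED", "UNSCHEDULED"}
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = check_data_object_status(data_object_id)
        print(f"Current status: {status}")
        if status in terminal_states:
            return status
        time.sleep(poll_interval)
    return "TIMED_OUT"  # sentinel used only by this sketch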
Example Usage
# Complete workflow example
data_object_id = create_complete_data_object(
    name="Daily Sales Data",
    description="Raw daily sales transactions from payment gateway",
    data_source_id="your-s3-data-source-id",
    file_path="/sales/daily_sales.csv"
)

# Monitor progress
if data_object_id:
    status = check_data_object_status(data_object_id)
    print(f"Ingestion status: {status}")

    if status == "FAILED":
        logs = get_data_object_logs(data_object_id)
        print("Error logs:", logs)