Compute Engine Plugin
The Compute Engine Plugin is the primary driver of automation in the tidylake ecosystem. It bridges the gap between your metadata definitions and your actual storage and compute infrastructure.
While the core library handles the "logic" of your DAG, this plugin handles the "physics" of reading, writing, and maintaining your data lakehouse.
Plugin Configuration
Adapter Implementation
To use this plugin, you must implement the ComputeEnginePlugin abstract class. This adapter serves as the translator between tidylake and your specific stack (e.g., Spark, Snowflake, or Pandas + Iceberg).
Context Configuration
Register your implementation in the context file so the framework can discover it at runtime:
tidylake:
  ...
  plugins: # (1)!
    type: object
    description: Plugin Configuration
    properties:
      compute_engine: # (2)!
        type: object
        description: Configuration for the compute engine plugin
        properties:
          plugin_path: # (3)!
            type: string
            description: Path to the python file that defines the plugin. Takes precedence over plugin_module
          plugin_module: # (4)!
            type: string
            description: Path to the python module that contains the plugin
          plugin_class_name: # (5)!
            type: string
            description: Name of the plugin class inside the file or module
          # (6)!
1. Add a `plugins` section
2. Create the `compute_engine` property
3. Set either `plugin_path` or `plugin_module`
4. Set either `plugin_path` or `plugin_module`
5. Set your `plugin_class_name`
6. You can define additional custom properties for your custom implementation
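Filled in, a context file might look like the following (the file path, class name, and `warehouse_path` value are illustrative; `warehouse_path` is an example of a custom property that your adapter reads from its config):

```yaml
tidylake:
  plugins:
    compute_engine:
      plugin_path: plugins/pandas_iceberg_plugin.py
      plugin_class_name: PandasIcebergComputeEnginePlugin
      warehouse_path: ./warehouse # custom property consumed by the adapter
```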
See Real-World Implementations
Check the pandas pyiceberg demo or the pyspark demo for complete, production-ready adapter code.
Initialization
Your adapter must implement an __init__ function. This is where you set up global state, such as attaching to an existing Spark session, authenticating with cloud providers (AWS/Azure/GCP), or initializing local catalog connections.
class PandasIcebergComputeEnginePlugin(ComputeEnginePlugin):
    def __init__(self, compute_engine_config: dict = None):
        super().__init__()
        compute_engine_config = compute_engine_config or {}  # guard against a missing config
        warehouse_path = compute_engine_config.get("warehouse_path")  # (1)!
        self.namespace = compute_engine_config.get("namespace", "default")
        os.makedirs(warehouse_path, exist_ok=True)  # (2)!
        self.catalog = load_catalog(
            "default",
            **{
                "type": "sql",
                "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                "warehouse": f"file://{warehouse_path}",
            },
        )
        self.catalog.create_namespace_if_not_exists(self.namespace)
- Load config values from the context file
- Init required sessions or temporary files for the process
Complete Example
This is an excerpt from the pandas pyiceberg demo.
Data Serialization & Deserialization
The adapter's primary job is to handle how data products are retrieved and persisted. You must implement the read_dataset and write_dataset methods.
- Reading: Logic to load a table from a metastore (like Hive or Glue), query a warehouse (BigQuery, Snowflake), or read a plain file from an object store (S3, ADLS, GCS).
- Writing: Logic to serialize data to a specific format (Parquet, Delta, Iceberg) and update the corresponding catalog table via your framework's serialization API or SQL statements.
def read_dataset(self, name: str):
    return self.catalog.load_table(f"{self.namespace}.{name}").scan().to_pandas()  # (1)!

def write_dataset(self, name: str, df):
    table = self.catalog.load_table(f"{self.namespace}.{name}")
    df_arrow = pa.Table.from_pandas(df)
    table.overwrite(df_arrow)
- Reuse existing sessions from the initialization
Complete Example
This is an excerpt from the pandas pyiceberg demo.
Other Methods
Check the plugin implementation for other optional methods that can be added to the adapter, such as display_dataset, which improves how a dataset is visualized during interactive mode or with the peek CLI command.
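As a sketch of what such a method might do, the helper below is hypothetical (not part of the plugin API): inside the adapter, a display_dataset implementation would typically reuse read_dataset and print a compact preview.

```python
import pandas as pd

def format_preview(df: pd.DataFrame, max_rows: int = 10) -> str:
    """Render a compact, human-readable preview of a dataset."""
    preview = df.head(max_rows)
    return f"{len(df)} rows x {len(df.columns)} columns\n{preview.to_string(index=False)}"
```

A `display_dataset(name)` method could then simply `print(format_preview(self.read_dataset(name)))`.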
Schema Automation
Tidylake promotes a methodology that separates the lifecycle of data from the lifecycle of schemas. We recommend defining your "Data Contract" (the schema) in the manifest first. This enables several "Shift-Left" data engineering practices:
- Schema Enforcement: Prevent pipeline bugs by enforcing strict typing on write.
- Safe Development: Use synthetic data generation in early development or when real-world data access is restricted.
- Evolution Management: Use the CLI to track and apply changes to your catalog tables.
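For illustration, a "Data Contract" for the bronze_customers product might carry a JSON-Schema-style definition like this (the exact manifest layout may differ; the `properties`/`required` body is the shape the schema translation methods consume, and the field names are taken from the demo):

```yaml
schema:
  type: object
  properties:
    customer__id:
      type: string
    customer__name:
      type: string
    customer__active:
      type: boolean
  required:
    - customer__id
```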
Schema translation
To automate your catalog, the adapter must know how to translate tidylake manifest types (based on JSON Schema) into your engine's native types (e.g., PyArrow or Spark Types).
You must implement:
- get_schema_from_catalog: Retrieve the current native schema as a dictionary.
- manifest_schema_to_engine_schema: Convert a manifest definition into your catalog’s API format.
- engine_schema_to_manifest_schema: Convert a physical catalog schema back into tidylake’s manifest format.
def get_schema_from_catalog(self, name: str):
    table = self.catalog.load_table(f"{self.namespace}.{name}")  # (1)!
    arrow_schema = table.schema().as_arrow()  # pyiceberg Schema -> pyarrow Schema
    return self.engine_schema_to_manifest_schema(arrow_schema)

def manifest_schema_to_engine_schema(self, manifest_schema: dict):
    fields = []
    for name, prop in manifest_schema.get("properties", {}).items():
        jtype = prop.get("type", "string")
        pa_type = JSONSCHEMA_PYARROW_MAPPING.get(jtype, pa.string())  # (2)!
        # nullable = True if not required
        nullable = name not in manifest_schema.get("required", [])
        fields.append(pa.field(name, pa_type, nullable=nullable))
    return pa.schema(fields)

def engine_schema_to_manifest_schema(self, catalog_schema: pa.Schema):
    return {
        "type": "object",
        "properties": {
            field.name: {"type": PYARROW_JSONSCHEMA_MAPPING.get(str(field.type), "string")}
            for field in catalog_schema  # a pyarrow Schema iterates over its fields
        },
    }
- Uses the catalog's native API
- This is just a dict mapping. Check it out in the full example.
Complete Example
This is an excerpt from the pandas pyiceberg demo.
Schema Manipulation
Once translation is handled, you implement the methods that actually touch the infrastructure: check_catalog_exists, create_table, and the alter-table suite (alter_table_add_column, alter_table_drop_column, and alter_table_alter_column).
def check_catalog_exists(self, name: str):
    return self.catalog.table_exists(f"{self.namespace}.{name}")

def create_table(self, name: str, manifest_schema: dict):
    catalog_schema = self.manifest_schema_to_engine_schema(manifest_schema)
    self.catalog.create_table(
        f"{self.namespace}.{name}",
        schema=catalog_schema,
    )

def alter_table_add_column(self, table_name: str, column_name: str, column_type: str):
    table = self.catalog.load_table(f"{self.namespace}.{table_name}")
    with table.update_schema() as update:
        update.add_column(
            column_name,
            JSONSCHEMA_PYICEBERG_MAPPING.get(column_type, StringType()),
        )

def alter_table_drop_column(self, table_name: str, column_name: str):
    table = self.catalog.load_table(f"{self.namespace}.{table_name}")
    with table.update_schema() as update:
        update.delete_column(column_name)

def alter_table_alter_column(self, table_name: str, column_name: str, column_type: str):
    table = self.catalog.load_table(f"{self.namespace}.{table_name}")
    with table.update_schema() as update:
        update.update_column(
            column_name,
            JSONSCHEMA_PYICEBERG_MAPPING.get(column_type, StringType()),
        )
Managing the Schema Lifecycle via CLI
With your adapter in place, you can use the CLI to maintain your data lakehouse without hand-writing DDL statements.
Step 1: Detect Discrepancies
Running tidylake schema diff compares your manifest files against the actual physical tables in your catalog.
Run it yourself by following the example
These commands are run against the pandas pyiceberg demo.
$ tidylake schema diff
bronze_customers
┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ column ┃ defined_schema ┃ catalog_schema ┃ status ┃
┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━┩
│ customer__city │ string │ <missing> │ ADD │
│ customer__active │ boolean │ <missing> │ ADD │
│ customer__id │ string │ <missing> │ ADD │
│ customer__name │ string │ <missing> │ ADD │
└──────────────────┴────────────────┴────────────────┴────────┘
silver_customers
┏━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ column ┃ defined_schema ┃ catalog_schema ┃ status ┃
┡━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━┩
│ customer_active │ boolean │ <missing> │ ADD │
│ customer_city │ string │ <missing> │ ADD │
│ customer_name │ string │ <missing> │ ADD │
│ customer_id │ string │ <missing> │ ADD │
└─────────────────┴────────────────┴────────────────┴────────┘
Step 2: Apply Changes
Use the tidylake schema update command to align the catalog with your manifests. We recommend a "Dry Run" first, followed by a --commit once you have verified the changes.
$ tidylake schema update --data-product bronze_customers
⚡️ Updating/Creating schema for data product: bronze_customers
Table bronze_customers does not exist. A new table will be created.
Dry run mode, not creating table.
$ tidylake schema update --data-product bronze_customers --commit
⚡️ Updating/Creating schema for data product: bronze_customers
Table bronze_customers does not exist. A new table will be created.
Now that the table has been created, we can run tidylake schema diff again to verify the changes in our catalog:
$ tidylake schema diff
bronze_customers
┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ column ┃ defined_schema ┃ catalog_schema ┃ status ┃
┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━┩
│ customer__id │ string │ string │ OK │
│ customer__active │ boolean │ boolean │ OK │
│ customer__city │ string │ string │ OK │
│ customer__name │ string │ string │ OK │
└──────────────────┴────────────────┴────────────────┴────────┘
silver_customers
┏━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ column ┃ defined_schema ┃ catalog_schema ┃ status ┃
┡━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━┩
│ customer_active │ boolean │ <missing> │ ADD │
│ customer_city │ string │ <missing> │ ADD │
│ customer_name │ string │ <missing> │ ADD │
│ customer_id │ string │ <missing> │ ADD │
└─────────────────┴────────────────┴────────────────┴────────┘
If you later modify a manifest file, running tidylake schema diff will detect the drift; running tidylake schema update --commit will apply the necessary commands automatically.
$ tidylake schema diff --data-product bronze_customers
bronze_customers
┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ column ┃ defined_schema ┃ catalog_schema ┃ status ┃
┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━┩
│ customer__active │ boolean │ boolean │ OK │
│ customer__city │ <missing> │ string │ DROP │
│ customer__email │ string │ <missing> │ ADD │
│ customer__id │ string │ string │ OK │
│ customer__name │ string │ string │ OK │
└──────────────────┴────────────────┴────────────────┴────────┘
$ tidylake schema update --data-product bronze_customers --commit
⚡️ Updating/Creating schema for data product: bronze_customers
Schema changes for table bronze_customers:
ADD customer__email string
DROP customer__city None
$ tidylake schema diff --data-product bronze_customers
bronze_customers
┏━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━┓
┃ column ┃ defined_schema ┃ catalog_schema ┃ status ┃
┡━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━┩
│ customer__email │ string │ string │ OK │
│ customer__active │ boolean │ boolean │ OK │
│ customer__id │ string │ string │ OK │
│ customer__name │ string │ string │ OK │
└──────────────────┴────────────────┴────────────────┴────────┘
Now you are ready to run the script files and start working with your data.
Working with synthetic data
WIP