Skip to content

ComputeEnginePlugin

Bases: ABC

Source code in src/tidylake/plugins/compute_engine.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
class ComputeEnginePlugin(ABC):
    """Abstract base class for compute engine plugins.

    A concrete plugin (e.g. Spark, Pandas, DuckDB) implements dataset I/O,
    schema conversion between the manifest format and the engine's native
    format, and DDL operations (create/alter table) against the engine's
    catalog. Dataframes are always passed in the engine's native type.
    """

    @abstractmethod
    def read_dataset(self, name: str):
        """
        Read dataset from the compute engine catalog.
        The implementation should return a dataframe in the native format of the
        compute engine (e.g. Spark DataFrame, Pandas DataFrame, etc.).

        Args:
            name (str): The name of the data product in the project.

        Returns:
            DataFrame: The dataset as a dataframe in the native format of the compute engine.
        """
        pass

    def display_dataset(self, df):
        """
        Display dataset in the appropriate format based on the execution environment.

        If the execution mode is interactive (e.g. Jupyter notebook), use rich display.
        Otherwise, print the dataframe in a simple format. Method can be overridden
        by compute engine plugins to provide custom display logic.

        Args:
            df: The dataset as a dataframe in the native format of the compute engine.
        """
        if execution_mode == EXECUTION_MODE_INTERACTIVE:
            # `display` is injected by the IPython/Jupyter runtime; only
            # reachable in interactive mode, hence the F821 suppression.
            display(df)  # type: ignore  # noqa: F821
        else:
            # TODO: update to rich table display
            print(df)

    @abstractmethod
    def read_synthetic_dataset(self, manifest_schema: dict):
        """
        Generate synthetic dataset based on the manifest schema and return it as a dataframe
        in the native format of the compute engine.
        The implementation should return a dataframe in the native format of the compute engine
        (e.g. Spark DataFrame, Pandas DataFrame, etc.).

        Args:
            manifest_schema (dict): The schema defined in the manifest file.
        Returns:
            DataFrame: The synthetic dataset as a dataframe in the native format of the compute engine.
        """
        pass

    @abstractmethod
    def write_dataset(self, name: str, df):
        """
        Write dataset to the compute engine catalog.
        The implementation should handle writing the dataframe to the compute
        engine catalog in the appropriate format.

        Args:
            name (str): The name of the data product in the project.
            df: The dataset as a dataframe in the native format of the compute engine.
        """
        pass

    @abstractmethod
    def check_catalog_exists(self, name: str) -> bool:
        """
        Check if dataset exists in the compute engine catalog.
        The implementation should check if the dataset with the given name exists
        in the compute engine catalog.

        Args:
            name (str): The name of the data product in the project.
        Returns:
            bool: True if the dataset exists in the compute engine catalog, False otherwise.
        """
        pass

    @abstractmethod
    def get_schema_from_catalog(self, name: str) -> dict:
        """
        Get the schema of the dataset from the compute engine catalog.
        The implementation should retrieve the schema of the dataset with the given name
        from the compute engine catalog.

        Args:
            name (str): The name of the data product in the project.
        Returns:
            dict: The schema of the dataset.
        """
        pass

    @abstractmethod
    def manifest_schema_to_engine_schema(self, manifest_schema: dict):
        """
        Convert a manifest schema to the compute engine's native schema format.

        Args:
            manifest_schema (dict): The schema defined in the manifest file.
        Returns:
            dict: The schema in the compute engine's native format.
        """
        pass

    @abstractmethod
    def engine_schema_to_manifest_schema(self, catalog_schema: dict) -> dict:
        """
        Convert a compute engine's native schema to a manifest schema.

        Args:
            catalog_schema (dict): The schema in the compute engine's native format.
        Returns:
            dict: The schema in the manifest format.
        """
        pass

    @abstractmethod
    def validate_dataset_schema(self, manifest_schema: dict, df) -> bool:
        """
        Optional function to validate the manifest schema against a dataset
        during the execution of the data product script.

        Args:
            manifest_schema (dict): The schema defined in the manifest file.
            df: The dataset as a dataframe in the native format of the compute engine.
        Returns:
            bool: True if the dataset matches the manifest schema, False otherwise.
        """
        pass

    @abstractmethod
    def create_table(self, name: str, manifest_schema: dict):
        """
        Create a new table in the compute engine catalog.

        Args:
            name (str): The name of the data product in the project.
            manifest_schema (dict): The schema defined in the manifest file.
        """
        pass

    @abstractmethod
    def alter_table_add_column(self, table_name: str, column_name: str, column_type: str):
        """
        Add a new column to an existing table in the compute engine catalog.

        Args:
            table_name (str): The name of the table in the catalog.
            column_name (str): The name of the column to add.
            column_type (str): The type of the column to add.
        """
        pass

    @abstractmethod
    def alter_table_drop_column(self, table_name: str, column_name: str):
        """
        Drop a column from an existing table in the compute engine catalog.

        Args:
            table_name (str): The name of the table in the catalog.
            column_name (str): The name of the column to drop.
        """
        pass

    @abstractmethod
    def alter_table_alter_column(self, table_name: str, column_name: str, column_type: str):
        """
        Alter the type of a column in an existing table in the compute engine catalog.

        Args:
            table_name (str): The name of the table in the catalog.
            column_name (str): The name of the column to alter.
            column_type (str): The new type of the column.
        """
        pass

    @staticmethod
    def compute_changeset(manifest_schema: dict, catalog_schema: dict):
        """
        Compute the column-level diff between a manifest schema and a catalog schema.

        Compares the ``properties`` maps of both JSON-schema-style dicts.
        Either argument may be None or empty, which is treated as "no columns".

        Args:
            manifest_schema (dict): The schema defined in the manifest file.
            catalog_schema (dict): The schema currently in the engine catalog.
        Returns:
            list: (op, column_name, column_type) tuples where op is "ADD",
            "DROP" (type is None), or "ALTER" (type is the manifest type).
        """
        manifest_props = manifest_schema.get("properties", {}) if manifest_schema else {}
        catalog_props = catalog_schema.get("properties", {}) if catalog_schema else {}

        manifest_keys = set(manifest_props.keys())
        catalog_keys = set(catalog_props.keys())

        changeset = []
        for key in manifest_keys | catalog_keys:
            if key not in catalog_keys:
                # Column exists only in defined schema → ADD
                manifest_type = manifest_props[key].get("type")
                changeset.append(("ADD", key, manifest_type))
            elif key not in manifest_keys:
                # Column exists only in catalog schema → DROP
                changeset.append(("DROP", key, None))
            else:
                manifest_type = manifest_props[key].get("type")
                cat_type = catalog_props[key].get("type")
                if manifest_type != cat_type:
                    # Type mismatch → ALTER (use the manifest type for the SQL statement)
                    changeset.append(
                        (
                            "ALTER",
                            key,
                            manifest_type,
                        )
                    )
        return changeset

    def update_or_create_schema(self, name: str, manifest_schema: dict, commit: bool = False):
        """
        Create the table if missing, otherwise apply schema changes to match the manifest.

        In dry-run mode (commit=False, the default) the planned actions are
        printed but no DDL is executed.

        Args:
            name (str): The name of the data product in the project.
            manifest_schema (dict): The schema defined in the manifest file.
            commit (bool): If True, actually create the table / apply the changes.
        """
        # Check if table exists in catalog
        if not self.check_catalog_exists(name):
            print(f"Table {name} does not exist. A new table will be created.")

            if not commit:
                print("Dry run mode, not creating table.")
                return

            self.create_table(name, manifest_schema)

        # Table exists, check for schema changes
        else:
            # Compute schema changes reusing the same diff strategy used by the CLI
            catalog_schema = self.get_schema_from_catalog(name)

            changeset = self.compute_changeset(manifest_schema, catalog_schema)

            print(f"Schema changes for table {name}:")
            for change in changeset:
                print(f"{change[0]:6} {change[1]:20} {change[2]}")

            if not commit:
                print("Dry run mode, not applying changes.")
                return

            # Apply schema changes to the table in the catalog
            for op, col_name, col_type in changeset:
                if op == "ADD":
                    self.alter_table_add_column(name, col_name, col_type)
                elif op == "DROP":
                    self.alter_table_drop_column(name, col_name)
                elif op == "ALTER":
                    self.alter_table_alter_column(name, col_name, col_type)

    def generate_synthetic_data_from_manifest(self, manifest_schema: dict) -> dict:
        """
        Generate column-oriented synthetic data from a manifest schema.

        Produces one fixed sample value per row for each property, based on its
        JSON-schema type; unknown types yield None. Row count comes from
        get_use_synthetic_data_sample_size().

        Args:
            manifest_schema (dict): The schema defined in the manifest file.
        Returns:
            dict: Mapping of property name to a list of sample values.
        """
        synthetic_data = {}

        # Sample size is invariant across properties — fetch it once.
        n = get_use_synthetic_data_sample_size()

        for prop_name, prop_definition in manifest_schema.get("properties", {}).items():
            jtype = prop_definition.get("type", "string")

            if jtype == "string":
                data = [f"sample_{prop_name}" for _ in range(n)]
            elif jtype == "integer":
                data = [123 for _ in range(n)]
            elif jtype == "number":
                data = [123.45 for _ in range(n)]
            elif jtype == "boolean":
                data = [True for _ in range(n)]
            else:
                data = [None for _ in range(n)]

            synthetic_data[prop_name] = data

        return synthetic_data

read_dataset abstractmethod

read_dataset(name: str)

Read dataset from the compute engine catalog. The implementation should return a dataframe in the native format of the compute engine (e.g. Spark DataFrame, Pandas DataFrame, etc.).

Parameters:

Name Type Description Default
name str

The name of the data product in the project.

required

Returns:

Name Type Description
DataFrame

The dataset as a dataframe in the native format of the compute engine.

Source code in src/tidylake/plugins/compute_engine.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@abstractmethod
def read_dataset(self, name: str):
    """
    Read dataset from the compute engine catalog.
    The implementation should return a dataframe in the native format of the
    compute engine (e.g. Spark DataFrame, Pandas DataFrame, etc.).

    Args:
        name (str): The name of the data product in the project.

    Returns:
        DataFrame: The dataset as a dataframe in the native format of the compute engine.
    """
    pass

display_dataset

display_dataset(df)

Display dataset in the appropriate format based on the execution environment.

If the execution mode is interactive (e.g. Jupyter notebook), use rich display. Otherwise, print the dataframe in a simple format. Method can be overridden by compute engine plugins to provide custom display logic.

Parameters:

Name Type Description Default
df

The dataset as a dataframe in the native format of the compute engine.

required
Source code in src/tidylake/plugins/compute_engine.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def display_dataset(self, df):
    """
    Display dataset in the appropriate format based on the execution environment.

    If the execution mode is interactive (e.g. Jupyter notebook), use rich display.
    Otherwise, print the dataframe in a simple format. Method can be overridden
    by compute engine plugins to provide custom display logic.

    Args:
        df: The dataset as a dataframe in the native format of the compute engine.
    """
    if execution_mode == EXECUTION_MODE_INTERACTIVE:
        display(df)  # type: ignore  # noqa: F821
    else:
        # TODO: update to rich table display
        print(df)

read_synthetic_dataset abstractmethod

read_synthetic_dataset(manifest_schema: dict)

Generate synthetic dataset based on the manifest schema and return it as a dataframe in the native format of the compute engine. The implementation should return a dataframe in the native format of the compute engine (e.g. Spark DataFrame, Pandas DataFrame, etc.).

Parameters:

Name Type Description Default
manifest_schema dict

The schema defined in the manifest file.

required

Returns: DataFrame: The synthetic dataset as a dataframe in the native format of the compute engine.

Source code in src/tidylake/plugins/compute_engine.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@abstractmethod
def read_synthetic_dataset(self, manifest_schema: dict):
    """
    Generate synthetic dataset based on the manifest schema and return it as a dataframe
    in the native format of the compute engine.
    The implementation should return a dataframe in the native format of the compute engine
    (e.g. Spark DataFrame, Pandas DataFrame, etc.).

    Args:
        manifest_schema (dict): The schema defined in the manifest file.
    Returns:
        DataFrame: The synthetic dataset as a dataframe in the native format of the compute engine.
    """
    pass

write_dataset abstractmethod

write_dataset(name: str, df)

Write dataset to the compute engine catalog. The implementation should handle writing the dataframe to the compute engine catalog in the appropriate format.

Parameters:

Name Type Description Default
name str

The name of the data product in the project.

required
df

The dataset as a dataframe in the native format of the compute engine.

required
Source code in src/tidylake/plugins/compute_engine.py
58
59
60
61
62
63
64
65
66
67
68
69
@abstractmethod
def write_dataset(self, name: str, df):
    """
    Write dataset to the compute engine catalog.
    The implementation should handle writing the dataframe to the compute
    engine catalog in the appropriate format.

    Args:
        name (str): The name of the data product in the project.
        df: The dataset as a dataframe in the native format of the compute engine.
    """
    pass

check_catalog_exists abstractmethod

check_catalog_exists(name: str)

Check if dataset exists in the compute engine catalog. The implementation should check if the dataset with the given name exists in the compute engine catalog.

Parameters:

Name Type Description Default
name str

The name of the data product in the project.

required

Returns: bool: True if the dataset exists in the compute engine catalog, False otherwise.

Source code in src/tidylake/plugins/compute_engine.py
71
72
73
74
75
76
77
78
79
80
81
82
83
@abstractmethod
def check_catalog_exists(self, name: str):
    """
    Check if dataset exists in the compute engine catalog.
    The implementation should check if the dataset with the given name exists
    in the compute engine catalog.

    Args:
        name (str): The name of the data product in the project.
    Returns:
        bool: True if the dataset exists in the compute engine catalog, False otherwise.
    """
    pass

get_schema_from_catalog abstractmethod

get_schema_from_catalog(name: str)

Get the schema of the dataset from the compute engine catalog. The implementation should retrieve the schema of the dataset with the given name from the compute engine catalog.

Parameters:

Name Type Description Default
name str

The name of the data product in the project.

required

Returns: dict: The schema of the dataset.

Source code in src/tidylake/plugins/compute_engine.py
85
86
87
88
89
90
91
92
93
94
95
96
97
@abstractmethod
def get_schema_from_catalog(self, name: str):
    """
    Get the schema of the dataset from the compute engine catalog.
    The implementation should retrieve the schema of the dataset with the given name
    from the compute engine catalog.

    Args:
        name (str): The name of the data product in the project.
    Returns:
        dict: The schema of the dataset.
    """
    pass

manifest_schema_to_engine_schema abstractmethod

manifest_schema_to_engine_schema(manifest_schema: str)

Convert a manifest schema to the compute engine's native schema format.

Parameters:

Name Type Description Default
manifest_schema str

The schema defined in the manifest file.

required

Returns: dict: The schema in the compute engine's native format.

Source code in src/tidylake/plugins/compute_engine.py
 99
100
101
102
103
104
105
106
107
108
109
@abstractmethod
def manifest_schema_to_engine_schema(self, manifest_schema: str):
    """
    Convert a manifest schema to the compute engine's native schema format.

    Args:
        manifest_schema (str): The schema defined in the manifest file.
    Returns:
        dict: The schema in the compute engine's native format.
    """
    pass

engine_schema_to_manifest_schema abstractmethod

engine_schema_to_manifest_schema(catalog_schema: dict)

Convert a compute engine's native schema to a manifest schema.

Parameters:

Name Type Description Default
catalog_schema dict

The schema in the compute engine's native format.

required

Returns: dict: The schema in the manifest format.

Source code in src/tidylake/plugins/compute_engine.py
111
112
113
114
115
116
117
118
119
120
121
@abstractmethod
def engine_schema_to_manifest_schema(self, catalog_schema: dict):
    """
    Convert a compute engine's native schema to a manifest schema.

    Args:
        catalog_schema (dict): The schema in the compute engine's native format.
    Returns:
        dict: The schema in the manifest format.
    """
    pass

validate_dataset_schema abstractmethod

validate_dataset_schema(manifest_schema: dict, df) -> bool

Optional function to validate the manifest schema against a dataset during the execution of the data product script.

Parameters:

Name Type Description Default
manifest_schema dict

The schema defined in the manifest file.

required
df

The dataset as a dataframe in the native format of the compute engine.

required

Returns: bool: True if the dataset matches the manifest schema, False otherwise.

Source code in src/tidylake/plugins/compute_engine.py
123
124
125
126
127
128
129
130
131
132
133
134
135
@abstractmethod
def validate_dataset_schema(self, manifest_schema: dict, df) -> bool:
    """
    Optional function to validate the manifest schema against a dataset
    during the execution of the data product script.

    Args:
        manifest_schema (dict): The schema defined in the manifest file.
        df: The dataset as a dataframe in the native format of the compute engine.
    Returns:
        bool: True if the dataset matches the manifest schema, False otherwise.
    """
    pass

create_table abstractmethod

create_table(name: str, manifest_schema: str)

Create a new table in the compute engine catalog.

Parameters:

Name Type Description Default
name str

The name of the data product in the project.

required
manifest_schema str

The schema defined in the manifest file.

required
Source code in src/tidylake/plugins/compute_engine.py
137
138
139
140
141
142
143
144
145
146
@abstractmethod
def create_table(self, name: str, manifest_schema: str):
    """
    Create a new table in the compute engine catalog.

    Args:
        name (str): The name of the data product in the project.
        manifest_schema (str): The schema defined in the manifest file.
    """
    pass

alter_table_add_column abstractmethod

alter_table_add_column(
    table_name: str, column_name: str, column_type: str
)

Add a new column to an existing table in the compute engine catalog.

Parameters:

Name Type Description Default
table_name str

The name of the table in the catalog.

required
column_name str

The name of the column to add.

required
column_type str

The type of the column to add.

required
Source code in src/tidylake/plugins/compute_engine.py
148
149
150
151
152
153
154
155
156
157
158
@abstractmethod
def alter_table_add_column(self, table_name: str, column_name: str, column_type: str):
    """
    Add a new column to an existing table in the compute engine catalog.

    Args:
        table_name (str): The name of the table in the catalog.
        column_name (str): The name of the column to add.
        column_type (str): The type of the column to add.
    """
    pass

alter_table_drop_column abstractmethod

alter_table_drop_column(table_name: str, column_name: str)

Drop a column from an existing table in the compute engine catalog. Args: table_name (str): The name of the table in the catalog. column_name (str): The name of the column to drop.

Source code in src/tidylake/plugins/compute_engine.py
160
161
162
163
164
165
166
167
168
@abstractmethod
def alter_table_drop_column(self, table_name: str, column_name: str):
    """
    Drop a column from an existing table in the compute engine catalog.
    Args:
        table_name (str): The name of the table in the catalog.
        column_name (str): The name of the column to drop.
    """
    pass

alter_table_alter_column abstractmethod

alter_table_alter_column(
    table_name: str, column_name: str, column_type: str
)

Alter the type of a column in an existing table in the compute engine catalog. Args: table_name (str): The name of the table in the catalog. column_name (str): The name of the column to alter. column_type (str): The new type of the column.

Source code in src/tidylake/plugins/compute_engine.py
170
171
172
173
174
175
176
177
178
179
@abstractmethod
def alter_table_alter_column(self, table_name: str, column_name: str, column_type: str):
    """
    Alter the type of a column in an existing table in the compute engine catalog.
    Args:
        table_name (str): The name of the table in the catalog.
        column_name (str): The name of the column to alter.
        column_type (str): The new type of the column.
    """
    pass