Skip to content

TidyLakeContext

Source code in src/tidylake/core/context.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
class TidyLakeContext:
    """
    Orchestrates a collection of data products: loads YAML configuration,
    wires in a compute engine plugin, resolves the dependency graph, and
    runs, inspects, or visualizes the resulting pipeline.
    """

    @classmethod
    def get_config(cls, config_file: str) -> dict:
        """
        Load the configuration from a YAML file.

        Args:
            config_file (str): The path to the configuration file.

        Returns:
            dict: The loaded configuration.

        Raises:
            FileNotFoundError: If the configuration file does not exist.
        """

        config_file_path = Path(config_file).resolve()
        if not config_file_path.is_file():
            raise FileNotFoundError(f"Configuration file {config_file_path!r} not found.")

        # YAML config files are expected to be UTF-8; don't depend on the
        # platform default encoding.
        with open(config_file_path, encoding="utf-8") as f:
            context_config = yaml.safe_load(f)

        # TODO: Validate context config data
        return context_config

    @classmethod
    def load_plugin(cls, plugin_name: str, plugin_config: dict, config_dir: Path | None = None) -> object:
        """
        Load a plugin dynamically from an importable module or a file path.

        Args:
            plugin_name (str): Identifier for the plugin; used in error
                messages and as the module name when loading from a file path.
            plugin_config (dict): The configuration for the plugin. Must
                contain 'plugin_class_name' plus either 'plugin_module' or
                'plugin_path'.
            config_dir (Path, optional): Directory to resolve relative plugin
                paths from.

        Returns:
            object: An instance of the plugin class, constructed with
                ``plugin_config``.

        Raises:
            ValueError: If the configuration is missing required keys.
            ImportError: If the plugin module cannot be located at the given path.
        """

        plugin_class_name = plugin_config.get("plugin_class_name")

        if not plugin_class_name:
            raise ValueError(f"{plugin_name} configuration must include 'plugin_class_name'.")

        # If the plugin is specified as an importable module
        if plugin_config.get("plugin_module"):
            module = importlib.import_module(plugin_config["plugin_module"])

            return getattr(module, plugin_class_name)(plugin_config)

        # If the plugin is specified as a file path
        elif plugin_config.get("plugin_path"):
            plugin_path = plugin_config["plugin_path"]
            # Relative paths are resolved against the config directory when one
            # is provided, so configs can ship plugins alongside themselves.
            if config_dir and not Path(plugin_path).is_absolute():
                file_path = (config_dir / plugin_path).resolve()
            else:
                file_path = Path(plugin_path).resolve()

            spec = importlib.util.spec_from_file_location(plugin_name, file_path)

            # spec.loader can be None for namespace packages / odd paths;
            # guard both before exec_module.
            if spec is None or spec.loader is None:
                raise ImportError(
                    f"Could not find {plugin_name} plugin module at {file_path!r}. "
                    "Please check the path and try again."
                )

            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)

            return getattr(mod, plugin_class_name)(plugin_config)

        else:
            # Name the keys this method actually checks.
            raise ValueError(
                f"{plugin_name} configuration must include either "
                "'plugin_module' or 'plugin_path'."
            )

    def __init__(
        self,
        name: str,
        compute_engine_config: dict | None = None,
        catalog_config: dict | None = None,
        config_dir: Path | None = None,
    ):
        """
        Create a context.

        Args:
            name (str): Name of the context.
            compute_engine_config (dict, optional): Plugin configuration for
                the compute engine (passed to :meth:`load_plugin`).
            catalog_config (dict, optional): Catalog configuration. Currently
                accepted for interface compatibility but not used here.
            config_dir (Path, optional): Directory to resolve relative plugin
                paths from.
        """

        self.name = name
        self.data_products: dict[str, DataProduct] = {}
        self._dependencies: dict[str, tuple[str, ...]] = {}
        self.compute_engine: ComputeEnginePlugin | None = None

        if compute_engine_config:
            self.compute_engine = self.load_plugin("compute_engine", compute_engine_config, config_dir)

    def add_data_product(self, data_product: DataProduct) -> None:
        """
        Add a data product to the context instance.

        Args:
            data_product (DataProduct): The data product to add.
        """

        if self.compute_engine:
            data_product.set_compute_engine(self.compute_engine)

        self.data_products[data_product.name] = data_product
        # Snapshot inputs as an immutable tuple so later mutation of the
        # product's input list cannot silently change the graph.
        self._dependencies[data_product.name] = tuple(data_product.inputs)

    def get_data_product(self, name: str) -> DataProduct:
        """
        Get a specific data product from the current context instance.

        Args:
            name (str): The name of the data product to retrieve.

        Returns:
            DataProduct: The requested data product instance.

        Raises:
            ValueError: If no data product with that name exists.
        """

        if name in self.data_products:
            return self.data_products[name]
        else:
            raise ValueError(f"Data product '{name}' not found in the context instance.")

    def get_graph_sequence(self) -> list[str]:
        """
        Return data product names in topological (execution) order.

        This also checks for topological problems (dependency cycles).

        Returns:
            list[str]: A list of data product names in the order they should
                be executed. Empty if a dependency cycle is detected.
        """

        graph = TopologicalSorter()
        for name, deps in self._dependencies.items():
            graph.add(name, *deps)

        try:
            # External inputs appear in the graph but are not defined in this
            # context, so filter to runnable products only.
            return [s for s in graph.static_order() if s in self.data_products]
        except CycleError as e:
            print("❌ Cycle detected in the process dependencies!")
            print("Loop involving:", e.args[1])
            # Return an empty sequence rather than None so callers that
            # iterate or index the result (run, visualize) do not crash.
            return []

    def get_dependencies(self) -> dict[str, tuple[str, ...]]:
        """
        Return a shallow copy of the dependency map (data product -> required inputs).

        Returns:
            dict[str, tuple[str, ...]]: Mapping of each data product to its declared inputs.
        """

        return dict(self._dependencies)

    def run_data_product(self, name: str):
        """
        Run a specific data product by name.

        Args:
            name (str): The name of the data product to run.
        """

        if name in self.data_products:
            print(f"⚡️ Running data product: {name}")
            self.data_products[name].run()
        else:
            print(f"Data product '{name}' is not defined")

    def run(self, name: str, continue_run: bool = False) -> None:
        """
        Run the process, given the initial conditions.

        Args:
            name (str): The name of the data product to start from.
            continue_run (bool): If True, run the named data product and every
                data product after it in the execution order; otherwise run
                only the named data product.
        """

        g = self.get_graph_sequence()

        # get the index of the data product to continue from
        data_product_idx = g.index(name) if name in g else 0

        # single product unless continue_run asks for the rest of the sequence
        data_products = (
            g[data_product_idx : data_product_idx + 1] if name and not continue_run else g[data_product_idx:]
        )

        # run data products
        for s in data_products:
            self.run_data_product(s)

    def visualize(self, mermaid: bool = False, textual: bool = False) -> None:
        """
        Visualize the run in different formats.

        Args:
            mermaid (bool): If True, generate a Mermaid diagram.
            textual (bool): If True, open a Textual TUI to inspect the graph.
        """

        def build_mermaid_lines(*, include_labels: bool = True, include_classdefs: bool = True) -> list[str]:
            # Build a Mermaid flowchart: solid arrows between defined products,
            # dotted arrows from external (undeclared) inputs.
            def sanitize(name: str) -> str:
                # Mermaid node ids must be alphanumeric/underscore.
                return re.sub(r"[^0-9A-Za-z_]", "_", name)

            def node_label(name: str) -> str:
                return name.replace("_", " ").title() if include_labels else name

            lines: list[str] = ["flowchart TD"]

            defined_data_products = {name: sanitize(name) for name in self.get_graph_sequence()}
            external_nodes: dict[str, str] = {}

            if include_labels:
                for original_name, node_id in defined_data_products.items():
                    lines.append(f'    {node_id}["{node_label(original_name)}"]')

            for name in self.get_graph_sequence():
                data_product = self.data_products[name]
                for input_name in data_product.inputs:
                    if input_name in defined_data_products:
                        lines.append(
                            f"    {defined_data_products[input_name]} --> {defined_data_products[name]}"
                        )
                    else:
                        ext_id = f"ext_{sanitize(input_name)}"
                        if ext_id not in external_nodes:
                            external_nodes[ext_id] = node_label(input_name)
                            if include_labels:
                                lines.append(f'    {ext_id}["{node_label(input_name)}"]')
                        lines.append(f"    {ext_id} -.-> {defined_data_products[name]}")

            if include_labels and external_nodes and include_classdefs:
                lines.append("    classDef external fill:#f5f5f5,stroke:#999,stroke-width:1px;")
                for ext_id in external_nodes:
                    lines.append(f"    {ext_id}:::external")

            return lines

        if textual:
            # Imported lazily so the Textual dependency is only needed when used.
            from tidylake.visualization.textual_viewer import run_textual_viewer

            run_textual_viewer(self)
            return

        if mermaid:
            lines = build_mermaid_lines(include_labels=True, include_classdefs=True)
            print("```mermaid")
            for line in lines:
                print(line)
            print("```")

        else:
            for i, name in enumerate(self.get_graph_sequence(), 1):
                print(f"{i:02d}. {name}")

    def peek_data_product(self, name: str) -> None:
        """
        Peek into a specific data product by name.

        Args:
            name (str): The name of the data product to peek into.
        """

        if name in self.data_products:
            print(f"⚡️ Peeking into data product: {name}")
            self.data_products[name].peek()
        else:
            print(f"Data product '{name}' is not defined")

    def schema_diff(self, name: str | None = None) -> dict | None:
        """
        Return schema differences for the selected data products.

        Args:
            name (str, optional): The name of a specific data product to check.
                If None, check all data products. Defaults to None.

        Returns:
            dict | None: Mapping of data product name to its schemas, or None
                if no compute engine is configured.
        """

        if not self.compute_engine:
            print("No compute engine configured, cannot check schema.")
            return None

        data_products_to_check = [self.get_data_product(name)] if name else self.data_products.values()

        return {
            data_product.name: data_product.get_schemas()
            for data_product in data_products_to_check
            if data_product.compute_engine
        }

    def schema_update(self, name: str | None = None, commit: bool = False) -> None:
        """
        Update or create the schema in the compute engine catalog for the selected data products.

        Args:
            name (str, optional): The name of a specific data product to update.
                If None, update all data products. Defaults to None.
            commit (bool): If True, perform the update/create operation.
                If False, only print what would be done.
        """

        if not self.compute_engine:
            print("No compute engine configured, cannot check schema.")
            return

        data_products_to_update = [self.get_data_product(name)] if name else self.data_products.values()

        for data_product in data_products_to_update:
            # Only products that have both an engine and a declared schema
            # can be materialized in the catalog.
            if data_product.compute_engine and data_product.schema:
                print(f"⚡️ Updating/Creating schema for data product: {data_product.name}")
                data_product.update_or_create_schema(commit=commit)
get_config classmethod

get_config(config_file: str) -> dict

Load the configuration from a YAML file.

Parameters:

Name Type Description Default
config_file str

The path to the configuration file.

required

Returns:

Name Type Description
dict dict

The loaded configuration.

Source code in src/tidylake/core/context.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@classmethod
def get_config(cls, config_file: str) -> dict:
    """
    Load the configuration from a YAML file.

    Args:
        config_file (str): The path to the configuration file.

    Returns:
        dict: The loaded configuration.
    """

    config_file_path = Path(config_file).resolve()
    if not config_file_path.is_file():
        raise FileNotFoundError(f"Configuration file {config_file_path!r} not found.")

    with open(config_file_path) as f:
        context_config = yaml.safe_load(f)

    # TODO: Validate context config data
    return context_config

load_plugin classmethod

load_plugin(
    plugin_name: str,
    plugin_config: dict,
    config_dir: Path = None,
) -> None

Load a plugin dynamically.

Parameters:

Name Type Description Default
plugin_name str

The name used to identify the plugin; also used as the module name when loading from a file path.

required
plugin_config dict

The configuration for the plugin.

required
config_dir Path

Directory to resolve relative plugin paths from.

None
Source code in src/tidylake/core/context.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@classmethod
def load_plugin(cls, plugin_name: str, plugin_config: dict, config_dir: Path = None) -> None:
    """
    Load a  plugin dynamically.

    Args:
        plugin_config (dict): The configuration for the plugin.
        config_dir (Path): Directory to resolve relative plugin paths from.
    """

    pluging_class_name = plugin_config.get("plugin_class_name")

    if not pluging_class_name:
        raise ValueError(f"{plugin_name} configuration must include 'plugin_class_name'.")

    # If the plugin is specified as a module
    if plugin_config.get("plugin_module"):
        module = importlib.import_module(plugin_config.get("plugin_module"))

        return getattr(module, pluging_class_name)(plugin_config)

    # If the plugin is specified as a file path
    elif plugin_config.get("plugin_path"):
        plugin_path = plugin_config.get("plugin_path")
        if config_dir and not Path(plugin_path).is_absolute():
            file_path = (config_dir / plugin_path).resolve()
        else:
            file_path = Path(plugin_path).resolve()

        spec = importlib.util.spec_from_file_location(plugin_name, file_path)

        if spec is None:
            raise ImportError(
                f"Could not find {plugin_name} plugin module at {file_path!r}. \
                              Please check the path and try again."
            )

        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)

        return getattr(mod, pluging_class_name)(plugin_config)

    else:
        raise ValueError(
            "Compute engine configuration must include either'compute_engine_plugin_module'\
                          or 'compute_engine_plugin_path'."
        )

add_data_product

add_data_product(data_product: DataProduct) -> None

Add a data product to the context instance.

Parameters:

Name Type Description Default
data_product DataProduct

The data product to add.

required
Source code in src/tidylake/core/context.py
102
103
104
105
106
107
108
109
110
111
112
113
114
def add_data_product(self, data_product: DataProduct) -> None:
    """
    Add a data product to the context instance.

    Args:
        data_product (DataProduct): The data product to add.
    """

    if self.compute_engine:
        data_product.set_compute_engine(self.compute_engine)

    self.data_products[data_product.name] = data_product
    self._dependencies[data_product.name] = tuple(data_product.inputs)

get_data_product

get_data_product(name: str) -> DataProduct

Get a specific data product from the current context instance.

Parameters:

Name Type Description Default
name str

The name of the data product to retrieve.

required

Returns:

Name Type Description
DataProduct DataProduct

The requested data product instance.

Source code in src/tidylake/core/context.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def get_data_product(self, name: str) -> DataProduct:
    """
    Get a specific data product from the current context instance.

    Args:
        name (str): The name of the data product to retrieve.

    Returns:
        DataProduct: The requested data product instance.
    """

    if name in self.data_products:
        return self.data_products[name]
    else:
        raise ValueError(f"Data product '{name}' not found in the context instance.")

get_graph_sequence

get_graph_sequence() -> list[str]

This also checks for topological problems

Returns:

Type Description
list[str]

list[str]: A list of data product names in the order they should be executed.

Source code in src/tidylake/core/context.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def get_graph_sequence(self) -> list[str]:
    """
    This also checks for topological problems

    Returns:
        list[str]: A list of data product names in the order they should be executed.
    """

    try:
        graph = TopologicalSorter()
        for name, deps in self._dependencies.items():
            graph.add(name, *deps)

        return [s for s in graph.static_order() if s in self.data_products]
    except CycleError as e:
        print("❌ Cycle detected in the process dependencies!")
        print("Loop involving:", e.args[1])

get_dependencies

get_dependencies() -> dict[str, tuple[str, ...]]

Return a shallow copy of the dependency map (data product -> required inputs).

Returns:

Type Description
dict[str, tuple[str, ...]]

dict[str, tuple[str, ...]]: Mapping of each data product to its declared inputs.

Source code in src/tidylake/core/context.py
150
151
152
153
154
155
156
157
158
def get_dependencies(self) -> dict[str, tuple[str, ...]]:
    """
    Return a shallow copy of the dependency map (data product -> required inputs).

    Returns:
        dict[str, tuple[str, ...]]: Mapping of each data product to its declared inputs.
    """

    return dict(self._dependencies)

run_data_product

run_data_product(name: str)

Run a specific data product by name.

Parameters:

Name Type Description Default
name str

The name of the data product to run.

required
Source code in src/tidylake/core/context.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def run_data_product(self, name: str):
    """
    Run a specific data product by name.

    Args:
        name (str): The name of the data product to run.

    """

    if name in self.data_products:
        print(f"⚡️ Running data product: {name}")
        self.data_products[name].run()
    else:
        print(f"Data product '{name}' is not defined")

run

run(name: str, continue_run: bool = False) -> None

Run the process, given the initial conditions

Parameters:

Name Type Description Default
name str

The name of the data product to start from.

required
continue_run bool

If True, run the named data product and every data product after it in the execution order; otherwise run only the named data product.

False
Source code in src/tidylake/core/context.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def run(self, name: str, continue_run: bool = False) -> None:
    """
    Run the process, given the initial conditions

    Args:
        name (str): The name of the data product to start from.
        continue_run (bool): If True, continue from the last data product,
            otherwise run a single data product.
    """

    g = self.get_graph_sequence()

    # get the index of the data product to continue from
    data_product_idx = g.index(name) if name in g else 0

    # if continue_run, we will run from the last data product else run a single data product
    data_products = (
        g[data_product_idx : data_product_idx + 1] if name and not continue_run else g[data_product_idx:]
    )

    # run data products
    for s in data_products:
        self.run_data_product(s)

visualize

visualize(
    mermaid: bool = False, textual: bool = False
) -> None

Visualize the run in different formats.

Parameters:

Name Type Description Default
mermaid bool

If True, generate a Mermaid diagram.

False
textual bool

If True, open a Textual TUI to inspect the graph.

False
Source code in src/tidylake/core/context.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def visualize(self, mermaid: bool = False, textual: bool = False) -> None:
    """
    Visualize the run in different formats.

    Args:
        mermaid (bool): If True, generate a Mermaid diagram.
        textual (bool): If True, open a Textual TUI to inspect the graph.
    """

    def build_mermaid_lines(*, include_labels: bool = True, include_classdefs: bool = True) -> list[str]:
        def sanitize(name: str) -> str:
            return re.sub(r"[^0-9A-Za-z_]", "_", name)

        def node_label(name: str) -> str:
            return name.replace("_", " ").title() if include_labels else name

        lines: list[str] = ["flowchart TD"]

        defined_data_products = {name: sanitize(name) for name in self.get_graph_sequence()}
        external_nodes: dict[str, str] = {}

        if include_labels:
            for original_name, node_id in defined_data_products.items():
                lines.append(f'    {node_id}["{node_label(original_name)}"]')

        for name in self.get_graph_sequence():
            data_product = self.data_products[name]
            for input_name in data_product.inputs:
                if input_name in defined_data_products:
                    lines.append(
                        f"    {defined_data_products[input_name]} --> {defined_data_products[name]}"
                    )
                else:
                    ext_id = f"ext_{sanitize(input_name)}"
                    if ext_id not in external_nodes:
                        external_nodes[ext_id] = node_label(input_name)
                        if include_labels:
                            lines.append(f'    {ext_id}["{node_label(input_name)}"]')
                    lines.append(f"    {ext_id} -.-> {defined_data_products[name]}")

        if include_labels and external_nodes and include_classdefs:
            lines.append("    classDef external fill:#f5f5f5,stroke:#999,stroke-width:1px;")
            for ext_id in external_nodes:
                lines.append(f"    {ext_id}:::external")

        return lines

    if textual:
        from tidylake.visualization.textual_viewer import run_textual_viewer

        run_textual_viewer(self)
        return

    if mermaid:
        lines = build_mermaid_lines(include_labels=True, include_classdefs=True)
        print("```mermaid")
        for line in lines:
            print(line)
        print("```")

    else:
        for i, name in enumerate(self.get_graph_sequence(), 1):
            print(f"{i:02d}. {name}")

peek_data_product

peek_data_product(name: str) -> None

Peek into a specific data product by name.

Parameters:

Name Type Description Default
name str

The name of the data product to peek into.

required
Source code in src/tidylake/core/context.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
def peek_data_product(self, name: str) -> None:
    """
    Peek into a specific data product by name.

    Args:
        name (str): The name of the data product to peek into.

    """

    if name in self.data_products:
        print(f"⚡️ Peeking into data product: {name}")
        self.data_products[name].peek()
    else:
        print(f"Data product '{name}' is not defined")

schema_diff

schema_diff(name: str = None) -> None

Return schema differences for the selected data products.

Parameters:

Name Type Description Default
name str

The name of a specific data product to check. If None, check all data products. Defaults to None.

None
Source code in src/tidylake/core/context.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def schema_diff(self, name: str = None) -> None:
    """
    Return schema differences for the selected data products.

    Args:
        name (str, optional): The name of a specific data product to check.
            If None, check all data products. Defaults to None.
    """

    if not self.compute_engine:
        print("No compute engine configured, cannot check schema.")
        return

    data_products_to_check = [self.get_data_product(name)] if name else self.data_products.values()

    return {
        data_product.name: data_product.get_schemas()
        for data_product in data_products_to_check
        if data_product.compute_engine
    }

schema_update

schema_update(
    name: str = None, commit: bool = False
) -> None

Update or create the schema in the compute engine catalog for the selected data products.

Parameters:

Name Type Description Default
name str

The name of a specific data product to update. If None, update all data products. Defaults to None.

None
commit bool

If True, perform the update/create operation. If False, only print what would be done.

False
Source code in src/tidylake/core/context.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def schema_update(self, name: str = None, commit: bool = False) -> None:
    """
    Update or create the schema in the compute engine catalog for the selected data products.

    Args:
        name (str, optional): The name of a specific data product to update.
            If None, update all data products. Defaults to None.
        commit (bool): If True, perform the update/create operation.
            If False, only print what would be done.
    """

    if not self.compute_engine:
        print("No compute engine configured, cannot check schema.")
        return

    data_products_to_update = [self.get_data_product(name)] if name else self.data_products.values()

    for data_product in data_products_to_update:
        if data_product.compute_engine and data_product.schema:
            print(f"⚡️ Updating/Creating schema for data product: {data_product.name}")
            data_product.update_or_create_schema(commit=commit)