Skip to content

DataProduct

The Data Product class is one of the building blocks of tidylake.

This class is meant to be instantiated from the manifest files when loading the framework; objects are then retrieved in the scripts using the available helper functions.

Source code in src/tidylake/core/data_product.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class DataProduct:
    """
    The Data Product class is one of the building blocks of tidylake.

    This class is meant to be instantiated from the manifest files when loading the framework;
    objects are then retrieved in the scripts using the available helper functions.
    """

    name: str
    schema: dict = None
    # Not populated anywhere in this class; kept for API compatibility.
    raw_inputs: list[str]
    # Input names discovered by statically parsing the data product script.
    inputs: list[str]
    # Dotted module path of the script, resolved relative to ``config_dir``.
    script: str = None
    # Writer registered through ``set_sink``; when set, ``run`` calls it instead of the engine.
    sink: Callable = None
    # TODO: how can we type this as it depends on the compute engine or can be even custom...
    output_data = None
    compute_engine: ComputeEnginePlugin = None

    @classmethod
    def get_config(cls, config_file: str) -> dict:
        """
        Load a data product configuration from a YAML file.

        Args:
            config_file (str): Path to the YAML configuration file.

        Returns:
            dict: Whatever ``yaml.safe_load`` parses from the file (not validated yet).

        Raises:
            FileNotFoundError: If ``config_file`` does not point to an existing file.
        """
        config_file_path = Path(config_file).resolve()
        if not config_file_path.is_file():
            raise FileNotFoundError(f"Configuration file {config_file_path!r} not found.")

        # YAML is defined over Unicode; don't depend on the platform's locale encoding.
        with open(config_file_path, encoding="utf-8") as data_product_config_file:
            data_product_config = yaml.safe_load(data_product_config_file)

        # TODO: Validate data_product config
        return data_product_config

    def __init__(
        self,
        name: str,
        schema: dict,
        script: str | None = None,
        config_dir: Path | None = None,
    ):
        """
        Create a data product and discover its inputs from its script, if present.

        Args:
            name (str): Name of the data product.
            schema (dict): Schema the output data is validated against.
            script (str, optional): Dotted module path of the script, relative to
                ``config_dir``. Defaults to ``name``.
            config_dir (Path, optional): Directory the script path is resolved
                against. Defaults to ``Path(".")``.
        """
        self.name = name
        self.schema = schema
        self.script = script if script is not None else name
        self.config_dir = config_dir or Path(".")

        self.raw_inputs = []
        self.inputs = []

        # "pkg.module" -> "<config_dir>/pkg/module.py"
        script_path = self.config_dir / f"{self.script.replace('.', '/')}.py"

        # The script is optional: no warning is emitted when it does not exist.
        if script_path.is_file():
            try:
                # Python source files are UTF-8 by default (PEP 3120).
                code = script_path.read_text(encoding="utf-8")
            except FileNotFoundError:
                # The file disappeared between the is_file() check and the read.
                print(f"WARNING: Data product script {self.script} not found. Inputs will not be available.")
            else:
                self.inputs = parse_script_inputs(code)

    def set_compute_engine(self, compute_engine: ComputeEnginePlugin):
        """
        Set the compute engine for the data product.

        Args:
            compute_engine (ComputeEnginePlugin): The compute engine plugin to use.
        """
        self.compute_engine = compute_engine

    def read_input(self, name: str):
        """
        Read an input dataset through the compute engine.

        When synthetic data is enabled, a synthetic dataset is generated instead of
        reading ``name``. In interactive mode the result is also displayed.

        Args:
            name (str): Name of the dataset to read.

        Returns:
            The dataset object returned by the compute engine.

        Raises:
            ValueError: If no compute engine is set.
        """
        if not self.compute_engine:
            raise ValueError("Compute engine not set for the data product.")

        if get_use_synthetic_data():  # noqa: SIM108
            # NOTE(review): this uses this product's own schema, not the schema of
            # input ``name`` -- confirm this is intended.
            result = self.compute_engine.read_synthetic_dataset(self.schema)
        else:
            result = self.compute_engine.read_dataset(name)

        # The engine is guaranteed to be set at this point (checked above).
        if execution_mode == EXECUTION_MODE_INTERACTIVE:
            self.compute_engine.display_dataset(result)

        return result

    def add_input(self, name: str | None = None, raw: bool = False) -> Callable[[Callable], Callable]:
        """
        Decorator to add an input to the data product within the script.

        Args:
            name (str, optional): The name of the input. If not provided, the function
                name will be used.
            raw (bool, optional): If True, the input is considered a raw input.
                Defaults to False. NOTE(review): currently unused by this method.

        Returns:
            Callable: A decorator that wraps the function to add it as an input;
                calling the function returns the DataFrame or data structure, which
                is also displayed in interactive mode.

        Raises:
            ValueError: At decoration time, if the resolved input name is not a string.
        """

        def decorator(fn: Callable):
            input_name = name or fn.__name__
            if not isinstance(input_name, str):
                raise ValueError("Input name must be a string.")

            @functools.wraps(fn)
            def wrapped(*args, **kwargs):
                # invoke the function to get the DataFrame or data structure
                result = fn(*args, **kwargs)

                if execution_mode == EXECUTION_MODE_INTERACTIVE:
                    if self.compute_engine:
                        self.compute_engine.display_dataset(result)
                    else:
                        display(result)  # pyright: ignore[reportUndefinedVariable] # noqa: F821

                return result

            return wrapped

        return decorator

    def peek(self) -> None:
        """
        Display this data product's own dataset through the compute engine.

        Raises:
            ValueError: If no compute engine is set.
        """
        if not self.compute_engine:
            raise ValueError("Compute engine not set for the data product.")

        df = self.read_input(self.name)
        # read_input already displays the dataset in interactive mode; don't show it twice.
        if execution_mode != EXECUTION_MODE_INTERACTIVE:
            self.compute_engine.display_dataset(df)

    def set_sink(
        self,
    ) -> Callable[[Callable], Callable]:
        """
        Decorator that registers the decorated function as this product's sink.

        ``run`` calls the registered sink with no arguments. Calling the decorated
        name directly only prints a warning and returns None.

        Returns:
            Callable: The decorator.
        """

        def decorator(fn: Callable):
            self.sink = fn

            @functools.wraps(fn)
            def wrapped(*args, **kwargs):
                print("WARN: Calling sink from data product modules has no effect")

            return wrapped

        return decorator

    def set_output_data(self, data):
        """
        Validate ``data`` against ``self.schema`` and store it as the output data.

        Args:
            data: Dataset object understood by the compute engine.

        Raises:
            ValueError: If no compute engine is set. Validation failures are
                reported by the engine's ``validate_dataset_schema``.
        """
        if not self.compute_engine:
            raise ValueError("Compute engine not set for the data product.")

        self.compute_engine.validate_dataset_schema(self.schema, data)
        self.output_data = data

    def get_schemas(self):
        """
        Return the defined schema alongside the catalog schema, if any.

        Returns:
            dict: ``{"defined_schema": ..., "catalog_schema": ...}``. The catalog
                schema is ``{}`` when there is no engine, no catalog entry, or an
                empty catalog schema.
        """
        catalog_schema = None

        if self.compute_engine and self.compute_engine.check_catalog_exists(self.name):
            catalog_schema = self.compute_engine.get_schema_from_catalog(self.name)

        return {
            "defined_schema": self.schema,
            "catalog_schema": catalog_schema or {},
        }

    def update_or_create_schema(self, commit: bool = False) -> None:
        """
        Ask the compute engine to create or update this product's schema.

        Args:
            commit (bool, optional): Forwarded to the engine. Defaults to False.

        Raises:
            ValueError: If no compute engine is set or the schema is empty/None.
        """
        if self.compute_engine and self.schema:
            self.compute_engine.update_or_create_schema(self.name, self.schema, commit=commit)
        else:
            raise ValueError(
                "Compute engine or schema not set for the data product. Cannot update or create schema."
            )

    def run(self):
        """
        Execute the data product script and persist its output.

        In interactive mode this only prints a warning. Otherwise ``config_dir`` is
        put at the front of ``sys.path`` and ``self.script`` is imported. The script
        is presumably expected to call ``set_sink``/``set_output_data``. Then the
        sink is called if set; otherwise ``output_data`` is written through the
        compute engine.

        Raises:
            ValueError: If neither a sink nor a compute engine is set, or if a
                compute engine is set but no output data was provided.
        """
        if execution_mode == EXECUTION_MODE_INTERACTIVE:
            print("WARN: Calling run from data product modules has no effect")
            return

        # Ensure the directory containing the modules is importable with top priority,
        # without stacking duplicate entries on repeated runs.
        project_root = str(self.config_dir.resolve())
        if not sys.path or sys.path[0] != project_root:
            sys.path.insert(0, project_root)
        # NOTE(review): import_module caches modules, so a second run() in the same
        # process will not re-execute the script.
        importlib.import_module(self.script)

        if self.sink:
            self.sink()
        elif self.compute_engine:
            if self.output_data is None:
                raise ValueError(
                    "No output data set for the data product. "
                    "Please set output_data when using a compute engine."
                )

            self.compute_engine.write_dataset(
                self.name,
                self.output_data,
            )
        else:
            raise ValueError("No compute engine or sink set for the data product.")

set_compute_engine

set_compute_engine(compute_engine: ComputeEnginePlugin)

Set the compute engine for the data product.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `compute_engine` | `ComputeEnginePlugin` | The compute engine plugin to use. | required |
Source code in src/tidylake/core/data_product.py
73
74
75
76
77
78
79
80
def set_compute_engine(self, compute_engine: ComputeEnginePlugin):
    """
    Attach the compute engine plugin this data product should use.

    Args:
        compute_engine (ComputeEnginePlugin): Plugin stored on the instance as
            ``compute_engine``; the previous value, if any, is replaced.
    """
    engine = compute_engine
    self.compute_engine = engine

add_input

add_input(
    name: str | None = None, raw: bool = False
) -> Callable[[Callable], Callable]

Decorator to add an input to the data product within the script.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the input. If not provided, the function name will be used. | `None` |
| `raw` | `bool` | If True, the input is considered a raw input. Defaults to False. | `False` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Callable` | `Callable[[Callable], Callable]` | A decorator that wraps the function to add it as an input; calling the function returns the DataFrame or data structure. |

Source code in src/tidylake/core/data_product.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def add_input(self, name: str | None = None, raw: bool = False) -> Callable[[Callable], Callable]:
    """
    Build a decorator that marks a script function as an input of the data product.

    Args:
        name (str, optional): Input name; when not given (or falsy), the decorated
            function's ``__name__`` is used instead.
        raw (bool, optional): Flag for raw inputs. Defaults to False.
            NOTE(review): the implementation does not currently use it.

    Returns:
        Callable: A decorator. The wrapped function returns whatever the decorated
            function returns (e.g. a DataFrame); in interactive mode that result is
            also displayed.

    Raises:
        ValueError: At decoration time, if the resolved input name is not a string.
    """

    def decorator(fn: Callable):
        resolved_name = name if name else fn.__name__
        if not isinstance(resolved_name, str):
            raise ValueError("Input name must be a string.")

        def _show(dataset):
            # Prefer the engine's renderer; otherwise fall back to the ambient
            # display() (not defined in this module -- presumably IPython's).
            engine = self.compute_engine
            if not engine:
                display(dataset)  # pyright: ignore[reportUndefinedVariable] # noqa: F821
            else:
                engine.display_dataset(dataset)

        @functools.wraps(fn)
        def wrapped(*args, **kwargs):
            dataset = fn(*args, **kwargs)
            if execution_mode == EXECUTION_MODE_INTERACTIVE:
                _show(dataset)
            return dataset

        return wrapped

    return decorator