Delta

toolbox_pyspark.delta 🔗

Summary

The delta module is for various processes related to Delta Lake tables, including optimising tables, merging tables, retrieving table history, and transferring between locations.

load_table 🔗

load_table(
    name: str, path: str, spark_session: SparkSession
) -> DeltaTable

Summary

Load a DeltaTable from a path.

Details

Under the hood, this function simply calls the .forPath() method.

Parameters:

Name Type Description Default
name str

The name of the DeltaTable.

required
path str

The path where the DeltaTable is found.

required
spark_session SparkSession

The SparkSession to use for loading the DeltaTable.

required

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

Returns:

Type Description
DeltaTable

The loaded DeltaTable.

See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def load_table(
    name: str,
    path: str,
    spark_session: SparkSession,
) -> DeltaTable:
    """
    !!! note "Summary"
        Load a [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) from a path.

    ???+ abstract "Details"
        Under the hood, this function simply calls the [`.forPath()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.forPath) method.

    Params:
        name (str):
            The name of the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).
        path (str):
            The path where the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) is found.
        spark_session (SparkSession):
            The SparkSession to use for loading the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (DeltaTable):
            The loaded [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).

    ??? tip "See also"
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.forPath()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.forPath)
    """
    return DeltaTable.forPath(
        sparkSession=spark_session,
        path=f"{path}{'/' if not path.endswith('/') else ''}{name}",
    )
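
Example

A minimal usage sketch. The session configuration, table name (sales), and path (./data) are illustrative; the table must already exist as a Delta table at that location.

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

from toolbox_pyspark.delta import load_table

# Build a Delta-enabled SparkSession (only needed once per application).
builder = (
    SparkSession.builder.appName("delta-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Load an existing Delta table named 'sales' stored under './data'.
# Internally this resolves to DeltaTable.forPath(spark, "./data/sales").
sales = load_table(name="sales", path="./data", spark_session=spark)
sales.toDF().show(5)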

count_rows 🔗

count_rows(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> int

Summary

Count the number of rows on a given DeltaTable.

Details

Under the hood, this function will convert the DeltaTable to a Spark DataFrame to then execute the .count() method.

Parameters:

Name Type Description Default
table Union[str, DeltaTable]

The table to check.
If it is a DeltaTable, then it will immediately use that.
If it is a str, then it will use that as the name of the table from where to load the DeltaTable from.

required
path Optional[str]

If table is a str, then path is mandatory, and is used as the path location for where to find the DeltaTable to load from.
Defaults to None.

None
spark_session Optional[SparkSession]

If table is str, then spark_session is mandatory. This is the SparkSession to use for loading the DeltaTable.
Defaults to None.

None

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

AssertionError

If table is a str, then path and spark_session cannot be None.

Returns:

Type Description
int

The number of rows on table.

See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def count_rows(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> int:
    """
    !!! note "Summary"
        Count the number of rows on a given [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).

    ???+ abstract "Details"
        Under the hood, this function will convert the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) to a Spark [`DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html) to then execute the [`.count()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.count.html) method.

    Params:
        table (Union[str, DeltaTable]):
            The table to check.<br>
            If it is a [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable), then it will immediately use that.<br>
            If it is a `#!py str`, then it will use that as the name of the table from where to load the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) from.
        path (Optional[str], optional):
            If `table` is a `#!py str`, then `path` is mandatory, and is used as the `path` location for where to find the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) to load from.<br>
            Defaults to `#!py None`.
        spark_session (Optional[SparkSession], optional):
            If `table` is `#!py str`, then `spark_session` is mandatory. This is the [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html) to use for loading the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).<br>
            Defaults to `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AssertionError:
            If `table` is a `str`, then `path` and `spark_session` cannot be `None`.

    Returns:
        (int):
            The number of rows on `table`.

    ??? tip "See also"
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.toDF()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.toDF)
        - [`pyspark.sql.DataFrame.count()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.count.html)
    """
    if is_type(table, str):
        assert path is not None, "If `table` is a `str`, then `path` cannot be `None`."
        assert (
            spark_session is not None
        ), "If `table` is a `str`, then `spark_session` cannot be `None`."
        table = load_table(name=table, path=path, spark_session=spark_session)
    return table.toDF().count()
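
Example

A usage sketch, reusing the Delta-enabled spark session and the illustrative sales table from the load_table example above.

from toolbox_pyspark.delta import count_rows, load_table

# Pass a DeltaTable object directly...
delta_tbl = load_table(name="sales", path="./data", spark_session=spark)
n_rows = count_rows(table=delta_tbl)

# ...or pass the table name and path and let the function load it internally.
assert n_rows == count_rows(table="sales", path="./data", spark_session=spark)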

get_history 🔗

get_history(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> psDataFrame

Summary

Retrieve the transaction history for a given DeltaTable.

Details

Under the hood, this function will simply call the .history() method.

Parameters:

Name Type Description Default
table Union[str, DeltaTable]

The table to check.
If it is a DeltaTable, then it will immediately use that.
If it is a str, then it will use that as the name of the table from where to load the DeltaTable from.

required
path Optional[str]

If table is a str, then path is mandatory, and is used as the path location for where to find the DeltaTable to load from.
Defaults to None.

None
spark_session Optional[SparkSession]

If table is str, then spark_session is mandatory. This is the SparkSession to use for loading the DeltaTable.
Defaults to None.

None

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

AssertionError

If table is a str, then path and spark_session cannot be None.

Returns:

Type Description
DataFrame

The transaction history for a given DeltaTable.

See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def get_history(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> psDataFrame:
    """
    !!! note "Summary"
        Retrieve the transaction history for a given [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).

    ???+ abstract "Details"
        Under the hood, this function will simply call the [`.history()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.history) method.

    Params:
        table (Union[str, DeltaTable]):
            The table to check.<br>
            If it is a [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable), then it will immediately use that.<br>
            If it is a `#!py str`, then it will use that as the name of the table from where to load the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) from.
        path (Optional[str], optional):
            If `table` is a `#!py str`, then `path` is mandatory, and is used as the `path` location for where to find the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) to load from.<br>
            Defaults to `#!py None`.
        spark_session (Optional[SparkSession], optional):
            If `table` is `#!py str`, then `spark_session` is mandatory. This is the [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html) to use for loading the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).<br>
            Defaults to `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AssertionError:
            If `table` is a `str`, then `path` and `spark_session` cannot be `None`.

    Returns:
        (psDataFrame):
            The transaction history for a given [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).

    ??? tip "See also"
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.history()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.history)
    """
    if is_type(table, str):
        assert path is not None, "If `table` is a `str`, then `path` cannot be `None`."
        assert (
            spark_session is not None
        ), "If `table` is a `str`, then `spark_session` cannot be `None`."
        table = load_table(name=table, path=path, spark_session=spark_session)
    return table.history()
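
Example

A usage sketch, again assuming the illustrative sales table from above. The columns selected here (version, timestamp, operation) are standard columns of the Delta history DataFrame.

from toolbox_pyspark.delta import get_history

history = get_history(table="sales", path="./data", spark_session=spark)

# Most recent transactions first: the version, when it ran, and what the operation was.
history.select("version", "timestamp", "operation").show(truncate=False)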

is_partitioned 🔗

is_partitioned(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> bool

Summary

Check whether a given DeltaTable is partitioned.

Details

Under the hood, this function will retrieve the table details and check the partitionColumns attribute to determine if the table is partitioned.

Parameters:

Name Type Description Default
table Union[str, DeltaTable]

The table to check.
If it is a DeltaTable, then it will immediately use that.
If it is a str, then it will use that as the name of the table from where to load the DeltaTable from.

required
path Optional[str]

If table is a str, then path is mandatory, and is used as the path location for where to find the DeltaTable to load from.
Defaults to None.

None
spark_session Optional[SparkSession]

If table is str, then spark_session is mandatory. This is the SparkSession to use for loading the DeltaTable.
Defaults to None.

None

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

AssertionError

If table is a str, then path and spark_session cannot be None.

Returns:

Type Description
bool

True if the table is partitioned, False otherwise.

See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def is_partitioned(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> bool:
    """
    !!! note "Summary"
        Check whether a given [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) is partitioned.

    ???+ abstract "Details"
        Under the hood, this function will retrieve the table details and check the `partitionColumns` attribute to determine if the table is partitioned.

    Params:
        table (Union[str, DeltaTable]):
            The table to check.<br>
            If it is a [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable), then it will immediately use that.<br>
            If it is a `#!py str`, then it will use that as the name of the table from where to load the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) from.
        path (Optional[str], optional):
            If `table` is a `#!py str`, then `path` is mandatory, and is used as the `path` location for where to find the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) to load from.<br>
            Defaults to `#!py None`.
        spark_session (Optional[SparkSession], optional):
            If `table` is `#!py str`, then `spark_session` is mandatory. This is the [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html) to use for loading the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).<br>
            Defaults to `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AssertionError:
            If `table` is a `str`, then `path` and `spark_session` cannot be `None`.

    Returns:
        (bool):
            `#!py True` if the table is partitioned, `#!py False` otherwise.

    ??? tip "See also"
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.detail()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.detail)
    """
    if is_type(table, str):
        assert path is not None, "If `table` is a `str`, then `path` cannot be `None`."
        assert (
            spark_session is not None
        ), "If `table` is a `str`, then `spark_session` cannot be `None`."
        table = load_table(
            name=table,
            path=path,
            spark_session=spark_session,
        )
    return len(table.detail().select("partitionColumns").collect()[0][0]) > 0
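
Example

A usage sketch, assuming the illustrative sales table from above.

from toolbox_pyspark.delta import is_partitioned

if is_partitioned(table="sales", path="./data", spark_session=spark):
    print("'sales' is partitioned")
else:
    print("'sales' is not partitioned")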

get_partition_columns 🔗

get_partition_columns(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> Optional[str_list]

Summary

Retrieve the partition columns for a given DeltaTable.

Details

Under the hood, this function will retrieve the table details and return the partitionColumns attribute if the table is partitioned.

Parameters:

Name Type Description Default
table Union[str, DeltaTable]

The table to check.
If it is a DeltaTable, then it will immediately use that.
If it is a str, then it will use that as the name of the table from where to load the DeltaTable from.

required
path Optional[str]

If table is a str, then path is mandatory, and is used as the path location for where to find the DeltaTable to load from.
Defaults to None.

None
spark_session Optional[SparkSession]

If table is str, then spark_session is mandatory. This is the SparkSession to use for loading the DeltaTable.
Defaults to None.

None

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

AssertionError

If table is a str, then path and spark_session cannot be None.

Returns:

Type Description
Optional[str_list]

The list of partition columns if the table is partitioned, None otherwise.

See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def get_partition_columns(
    table: Union[str, DeltaTable],
    path: Optional[str] = None,
    spark_session: Optional[SparkSession] = None,
) -> Optional[str_list]:
    """
    !!! note "Summary"
        Retrieve the partition columns for a given [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).

    ???+ abstract "Details"
        Under the hood, this function will retrieve the table details and return the `partitionColumns` attribute if the table is partitioned.

    Params:
        table (Union[str, DeltaTable]):
            The table to check.<br>
            If it is a [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable), then it will immediately use that.<br>
            If it is a `#!py str`, then it will use that as the name of the table from where to load the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) from.
        path (Optional[str], optional):
            If `table` is a `#!py str`, then `path` is mandatory, and is used as the `path` location for where to find the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) to load from.<br>
            Defaults to `#!py None`.
        spark_session (Optional[SparkSession], optional):
            If `table` is `#!py str`, then `spark_session` is mandatory. This is the [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html) to use for loading the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable).<br>
            Defaults to `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AssertionError:
            If `table` is a `str`, then `path` and `spark_session` cannot be `None`.

    Returns:
        (Optional[str_list]):
            The list of partition columns if the table is partitioned, `#!py None` otherwise.

    ??? tip "See also"
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.detail()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.detail)
    """
    if is_type(table, str):
        assert path is not None, "If `table` is a `str`, then `path` cannot be `None`."
        assert (
            spark_session is not None
        ), "If `table` is a `str`, then `spark_session` cannot be `None`."
        table = load_table(name=table, path=path, spark_session=spark_session)
    if is_partitioned(table):
        return table.detail().select("partitionColumns").collect()[0][0]
    else:
        return None
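
Example

A usage sketch, assuming the illustrative sales table from above; the column names shown in the comment are illustrative.

from toolbox_pyspark.delta import get_partition_columns

partition_cols = get_partition_columns(table="sales", path="./data", spark_session=spark)

# Returns a list like ['SYSTEM', 'WHSEID'] for a partitioned table, or None otherwise.
print(partition_cols)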

optimise_table 🔗

optimise_table(
    table_name: str,
    table_path: str,
    spark_session: SparkSession,
    partition_cols: Optional[str_collection] = None,
    inspect: bool = False,
    return_result: bool = True,
    method: Literal["api", "sql"] = "api",
    conditional_where_clause: Optional[str] = None,
) -> Optional[psDataFrame]

Summary

Run the OPTIMIZE command over a DeltaTable to ensure that it is structurally efficient.

Details

There are fundamentally two ways in which this optimisation can be achieved: by SQL or by API. Under the hood, both methods are implemented the same way, over the DeltaTable object; the choice simply allows flexibility between a Python API method and a SQL method.

Parameters:

Name Type Description Default
table_name str

The name of the table to be optimised. Must be a valid delta table, and must exist in the write_path location.

required
table_path str

The location for where the delta table is located.

required
spark_session SparkSession

The SparkSession to use for loading the table.

required
partition_cols Optional[Union[str, List[str]]]

The columns to be partitioned/clustered by.

  • If type list, then these elements will be added to the OPTIMIZE delta.`{table_path}/{table_name}` command, like this: OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1, col2).
  • If type str, then will be coerced to list of 1 elements long, like: [partition_cols], then appended to the command, like: OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1).
  • If None, then nothing will be added to the OPTIMIZE delta.`{table_path}/{table_name}` command.

Default: None.

None
inspect bool

For debugging. If True, then the OPTIMIZE command will be printed to the terminal.
Default: False.

False
return_result bool

For efficient handling of elements. If True, then the table created by the OPTIMIZE command will be returned from the function. Noting that this table will give the statistics of what/how the delta table is optimised.
Default: True.

True
method Literal['api', 'sql']

The method to use for the execution, either by api or sql.
Using api is preferred.
Default: "api".

'api'
conditional_where_clause Optional[str]

An optional conditional parameter to add to the command.
Any records matching this condition will be optimised; those not matching will not be optimised.
This is particularly useful for partitioned tables when you don't want to use ZORDER optimisation, or when you have huge tables.
Default: None.

None

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

Returns:

Type Description
Union[DataFrame, None]

Either None or the statistics/details from the optimised delta table.

Notes
Important notes
  • For partition_cols:
    • If it is type list, then the OPTIMIZE delta.`{table_path}/{table_name}` command will be extended to include each element in the partition_cols list. Like this: OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1, col2).
    • If partition_cols is a type str, then it will be coerced to a list of 1 elements, and then appended like mentioned above.
    • If partition_cols is None, then nothing will be added to the OPTIMIZE delta.`{table_path}/{table_name}` command.
  • For conditional_where_clause:
    • It must be a str.
    • It must be in the format: {column} {conditional} {value}.
    • For example: editdatetime >= '2023-09-01'
    • This will then be coerced in to the format: WHERE {where}.
    • And then appended to the overall SQL command like this: OPTIMIZE delta.`{table_path}/{table_name}` WHERE {where}.
The sql process

When method=="sql" then this process will:

  1. Take the table given by the param table_name.
  2. Build the SQL command using the values in the parameters partition_cols and conditional_where_clause.
  3. Will execute the OPTIMIZE delta.`{table_path}/{table_name}` WHERE {where} ZORDER BY {zorder} command over the new table.
  4. Optionally return the results.
The api process

When method=="api" then this process will:

  1. Take the table given by the param table_name.
  2. Build the partition columns when the partition_cols is not None.
  3. Load the DeltaOptimizeBuilder by using the syntax: table = DeltaTable.forPath(spark_session, f"{table_path}/{table_name}").optimize().
  4. Optionally add a where clause using the .where when conditional_where_clause is not None.
  5. Conditionally execute .executeZOrderBy when partition_cols is not None, or .executeCompaction otherwise.
References

For more information, please see:

See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def optimise_table(
    table_name: str,
    table_path: str,
    spark_session: SparkSession,
    partition_cols: Optional[str_collection] = None,
    inspect: bool = False,
    return_result: bool = True,
    method: Literal["api", "sql"] = "api",
    conditional_where_clause: Optional[str] = None,
) -> Optional[psDataFrame]:
    """
    !!! note "Summary"
        Run the `OPTIMIZE` command over a [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) to ensure that it is structurally efficient.

    ???+ abstract "Details"
        There are fundamentally two ways in which this optimisation can be achieved: by SQL or by API. Under the hood, both methods are implemented the same way, over the [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable) object; the choice simply allows flexibility between a Python API method and a SQL method.

    Params:
        table_name (str):
            The name of the table to be optimised. Must be a valid `delta` table, and must exist in the `write_path` location.
        table_path (str):
            The location for where the `delta` table is located.<br>
        spark_session (SparkSession):
            The SparkSession to use for loading the table.
        partition_cols (Optional[Union[str, List[str]]], optional):
            The columns to be partitioned/clustered by.

            - If type `#!py list`, then these elements will be added to the ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` `` command, like this: ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1, col2)``.
            - If type `#!py str`, then will be coerced to list of 1 elements long, like: `[partition_cols]`, then appended to the command, like: ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1)``.
            - If `#!py None`, then nothing will be added to the ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` `` command.

            Default: `#!py None`.
        inspect (bool, optional):
            For debugging.
            If `#!py True`, then the `OPTIMIZE` command will be printed to the terminal.<br>
            Default: `#!py False`.
        return_result (bool, optional):
            For efficient handling of elements.
            If `#!py True`, then the table created by the `OPTIMIZE` command will be returned from the function.
            Noting that this table will give the statistics of what/how the `delta` table is optimised.<br>
            Default: `#!py True`.
        method (Literal["api", "sql"], optional):
            The method to use for the execution, either by `api` or `sql`.<br>
            Using `api` is preferred.<br>
            Default: `#!py "api"`.
        conditional_where_clause (Optional[str], optional):
            An optional conditional parameter to add to the command.<br>
            Any records matching this condition will be optimised; those not matching will not be optimised.<br>
            This is particularly useful for partitioned tables when you don't want to use ZORDER optimisation, or when you have huge tables.<br>
            Default: `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (Union[psDataFrame, None]):
            Either `#!py None` or the statistics/details from the optimised delta table.

    ???+ info "Notes"
        ???+ info "Important notes"
            - For `partition_cols`:
                - If it is type `#!py list`, then the ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` `` command will be extended to include each element in the `partition_cols` `#!py list`. Like this: ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1, col2)``.
                - If `partition_cols` is a type `#!py str`, then it will be coerced to a list of 1 elements, and then appended like mentioned above.
                - If `partition_cols` is `#!py None`, then nothing will be added to the ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` `` command.
            - For `conditional_where_clause`:
                - It must be a `#!py str`.
                - It must be in the format: `#!sql {column} {conditional} {value}`.
                - For example: `#!sql editdatetime >= '2023-09-01'`
                - This will then be coerced in to the format: `#!sql WHERE {where}`.
                - And then appended to the overall SQL command like this: ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` WHERE {where}``.
        ???+ info "The `sql` process"
            When `#!py method=="sql"` then this process will:

            1. Take the table given by the param `table_name`.
            1. Build the SQL command using the values in the parameters `partition_cols` and `conditional_where_clause`.
            1. Will execute the ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` WHERE {where} ZORDER BY {zorder}`` command over the new table.
            1. Optionally return the results.
        ???+ info "The `api` process"
            When `#!py method=="api"` then this process will:

            1. Take the table given by the param `table_name`.
            1. Build the partition columns when the `partition_cols` is not `#!py None`.
            1. Load the [`DeltaOptimizeBuilder`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaOptimizeBuilder) by using the syntax: `#!py table = DeltaTable.forPath(spark_session, f"{table_path}/{table_name}").optimize()`.
            1. Optionally add a where clause using the [`.where`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaOptimizeBuilder.where) when `conditional_where_clause` is not `#!py None`.
            1. Conditionally execute [`.executeZOrderBy`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaOptimizeBuilder.executeZOrderBy) when `partition_cols` is not `#!py None`, or [`.executeCompaction`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaOptimizeBuilder.executeCompaction) otherwise.

    ??? question "References"
        For more information, please see:

        - https://docs.azuredatabricks.net/_static/notebooks/delta/optimize-python.html
        - https://medium.com/@debusinha2009/cheatsheet-on-understanding-zorder-and-optimize-for-your-delta-tables-1556282221d3
        - https://www.cloudiqtech.com/partition-optimize-and-zorder-delta-tables-in-azure-databricks/
        - https://docs.databricks.com/delta/optimizations/file-mgmt.html
        - https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-optimize.html
        - https://stackoverflow.com/questions/65320949/parquet-vs-delta-format-in-azure-data-lake-gen-2-store?_sm_au_=iVV4WjsV0q7WQktrJfsTkK7RqJB10
        - https://www.i-programmer.info/news/197-data-mining/12582-databricks-delta-adds-faster-parquet-import.html#:~:text=Databricks%20says%20Delta%20is%2010,data%20management%2C%20and%20query%20serving.

    ??? tip "See also"
        - [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
        - [`SparkSession.sql()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html)
        - [pyspark.sql.DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.forPath()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.forPath)
        - [`DeltaTable.optimize()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.optimize)
        - [`DeltaTable.executeZOrderBy()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.executeZOrderBy)
        - [`DeltaTable.executeCompaction()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.executeCompaction)
    """
    if method == "api":
        return _optimise_table_api(
            table_name=table_name,
            table_path=table_path,
            spark_session=spark_session,
            partition_cols=partition_cols,
            inspect=inspect,
            return_result=return_result,
            conditional_where_clause=conditional_where_clause,
        )
    elif method == "sql":
        return _optimise_table_sql(
            table_name=table_name,
            table_path=table_path,
            spark_session=spark_session,
            partition_cols=partition_cols,
            inspect=inspect,
            return_result=return_result,
            conditional_where_clause=conditional_where_clause,
        )
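
Example

Two usage sketches, assuming the illustrative sales table from above; the column names and the where-clause value are also illustrative.

from toolbox_pyspark.delta import optimise_table

# 1. Plain compaction over a subset of records, via the API method.
optimise_table(
    table_name="sales",
    table_path="./data",
    spark_session=spark,
    method="api",
    conditional_where_clause="editdatetime >= '2023-09-01'",
    return_result=False,
)

# 2. ZORDER by two columns, via the SQL method, printing the generated command.
stats = optimise_table(
    table_name="sales",
    table_path="./data",
    spark_session=spark,
    partition_cols=["SYSTEM", "WHSEID"],
    method="sql",
    inspect=True,
)
stats.show(truncate=False)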

retry_optimise_table 🔗

retry_optimise_table(
    table_name: str,
    table_path: str,
    spark_session: SparkSession,
    partition_cols: Optional[str_collection] = None,
    inspect: bool = False,
    return_result: bool = True,
    method: Literal["api", "sql"] = "api",
    conditional_where_clause: Optional[str] = None,
    retry_exceptions: Union[
        type[Exception],
        list[Type[Exception]],
        tuple[Type[Exception], ...],
    ] = Exception,
    retry_attempts: int = 10,
) -> Optional[psDataFrame]

Summary

Retry the execution of optimise_table a number of times when a given error exception is met.

Details

Particularly useful for when you are trying to run this optimisation over a cluster, and when parallelisation is causing multiple processes to occur over the same DeltaTable at the same time.

For more info on the Retry process, see: stamina.retry().

Parameters:

Name Type Description Default
table_name str

The name of the table to be optimised. Must be a valid delta table, and must exist in the write_path location.

required
table_path str

The location for where the delta table is located.

required
spark_session SparkSession

The SparkSession to use for loading the table.

required
partition_cols Optional[Union[str, List[str]]]

The columns to be partitioned/clustered by.

  • If type list, then these elements will be added to the OPTIMIZE delta.`{table_path}/{table_name}` command, like this: OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1, col2).
  • If type str, then will be coerced to list of 1 elements long, like: [partition_cols], then appended to the command, like: OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1).
  • If None, then nothing will be added to the OPTIMIZE delta.`{table_path}/{table_name}` command.

Default: None.

None
inspect bool

For debugging. If True, then the OPTIMIZE command will be printed to the terminal.
Default: False.

False
return_result bool

For efficient handling of elements. If True, then the table created by the OPTIMIZE command will be returned from the function. Noting that this table will give the statistics of what/how the delta table is optimised.
Default: True.

True
method Literal['api', 'sql']

The method to use for the execution, either by api or sql.
Using api is preferred.
Default: "api".

'api'
conditional_where_clause Optional[str]

An optional conditional parameter to add to the command.
Any records matching this condition will be optimised; those not matching will not be optimised.
This is particularly useful for partitioned tables when you don't want to use ZORDER optimisation, or when you have huge tables.
Default: None.

None
retry_exceptions Union[Type[Exception], List[Type[Exception]], Tuple[Type[Exception], ...]]

A given single or collection of expected exceptions for which to catch and retry for.
Defaults to Exception.

Exception
retry_attempts int

The number of retries to attempt. If the underlying process is still failing after this number of attempts, then throw a hard error and alert the user.
Defaults to 10.

10

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

Returns:

Type Description
Union[DataFrame, None]

Either None or the statistics/details from the optimised delta table.

See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def retry_optimise_table(
    table_name: str,
    table_path: str,
    spark_session: SparkSession,
    partition_cols: Optional[str_collection] = None,
    inspect: bool = False,
    return_result: bool = True,
    method: Literal["api", "sql"] = "api",
    conditional_where_clause: Optional[str] = None,
    retry_exceptions: Union[
        type[Exception],
        list[Type[Exception]],
        tuple[Type[Exception], ...],
    ] = Exception,
    retry_attempts: int = 10,
) -> Optional[psDataFrame]:
    """
    !!! note "Summary"
        Retry the execution of [`optimise_table`][toolbox_pyspark.delta.optimise_table] a number of times when a given error exception is met.

    ???+ abstract "Details"

        Particularly useful for when you are trying to run this optimisation over a cluster, and when parallelisation is causing multiple processes to occur over the same DeltaTable at the same time.

        For more info on the Retry process, see: [`stamina.retry()`](https://stamina.hynek.me/en/stable/).

    Params:
        table_name (str):
            The name of the table to be optimised. Must be a valid `delta` table, and must exist in the `write_path` location.

        table_path (str):
            The location for where the `delta` table is located.

        spark_session (SparkSession):
            The SparkSession to use for loading the table.

        partition_cols (Optional[Union[str, List[str]]], optional):
            The columns to be partitioned/clustered by.

            - If type `#!py list`, then these elements will be added to the ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` `` command, like this: ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1, col2)``.
            - If type `#!py str`, then will be coerced to list of 1 elements long, like: `[partition_cols]`, then appended to the command, like: ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` ZORDER BY (col1)``.
            - If `#!py None`, then nothing will be added to the ``#!sql OPTIMIZE delta.`{table_path}/{table_name}` `` command.

            Default: `#!py None`.

        inspect (bool, optional):
            For debugging.
            If `#!py True`, then the `OPTIMIZE` command will be printed to the terminal.<br>
            Default: `#!py False`.

        return_result (bool, optional):
            For efficient handling of elements.
            If `#!py True`, then the table created by the `OPTIMIZE` command will be returned from the function.
            Noting that this table will give the statistics of what/how the `delta` table is optimised.<br>
            Default: `#!py True`.

        method (Literal["api", "sql"], optional):
            The method to use for the execution, either by `api` or `sql`.<br>
            Using `api` is preferred.<br>
            Default: `#!py "api"`.

        conditional_where_clause (Optional[str], optional):
            An optional conditional parameter to add to the command.<br>
            Any records matching this condition will be optimised; those not matching will not be optimised.<br>
            This is particularly useful for partitioned tables when you don't want to use ZORDER optimisation, or when you have huge tables.<br>
            Default: `#!py None`.

        retry_exceptions (Union[ Type[Exception], List[Type[Exception]], Tuple[Type[Exception], ...], ], optional):
            A given single or collection of expected exceptions for which to catch and retry for.<br>
            Defaults to `#!py Exception`.

        retry_attempts (int, optional):
            The number of retries to attempt. If the underlying process is still failing after this number of attempts, then throw a hard error and alert the user.<br>
            Defaults to `#!py 10`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (Union[psDataFrame, None]):
            Either `#!py None` or the statistics/details from the optimised delta table.

    ??? tip "See also"
        - [`stamina.retry()`](https://stamina.hynek.me/en/stable/)
        - [`optimise_table()`][toolbox_pyspark.delta.optimise_table]
        - [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
        - [`SparkSession.sql()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html)
        - [pyspark.sql.DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.forPath()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.forPath)
        - [`DeltaTable.optimize()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.optimize)
        - [`DeltaTable.executeZOrderBy()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.executeZOrderBy)
        - [`DeltaTable.executeCompaction()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.executeCompaction)
    """

    @retry(
        on=((*retry_exceptions,) if isinstance(retry_exceptions, list) else retry_exceptions),
        attempts=retry_attempts,
    )
    @typechecked
    def _retry_optimise_table(
        table_name: str,
        table_path: str,
        spark_session: SparkSession,
        partition_cols: Optional[str_collection] = None,
        inspect: bool = False,
        return_result: bool = True,
        method: Literal["api", "sql"] = "api",
        conditional_where_clause: Optional[str] = None,
    ) -> Optional[psDataFrame]:
        return optimise_table(
            table_name=table_name,
            table_path=table_path,
            spark_session=spark_session,
            partition_cols=partition_cols,
            inspect=inspect,
            return_result=return_result,
            method=method,
            conditional_where_clause=conditional_where_clause,
        )

    return _retry_optimise_table(
        table_name=table_name,
        table_path=table_path,
        spark_session=spark_session,
        partition_cols=partition_cols,
        inspect=inspect,
        return_result=return_result,
        method=method,
        conditional_where_clause=conditional_where_clause,
    )
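
Example

A usage sketch, assuming the illustrative sales table from above. ConcurrentAppendException is one of the concurrency errors exposed by the delta-spark package; substitute whichever exceptions your workload actually hits.

from delta.exceptions import ConcurrentAppendException

from toolbox_pyspark.delta import retry_optimise_table

# Retry the optimisation up to 5 times if another writer touches the table mid-run.
retry_optimise_table(
    table_name="sales",
    table_path="./data",
    spark_session=spark,
    partition_cols="editdate",
    method="api",
    retry_exceptions=[ConcurrentAppendException],
    retry_attempts=5,
    return_result=False,
)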

merge_spark_to_delta 🔗

merge_spark_to_delta(
    from_table: psDataFrame,
    to_table_name: str,
    to_table_path: str,
    matching_keys: Optional[str_collection] = None,
    from_keys: Optional[str_collection] = None,
    to_keys: Optional[str_collection] = None,
    partition_keys: Optional[str_dict] = None,
    editdate_col_name: Optional[str] = "editdate",
    delete_unmatched_rows: Optional[bool] = False,
    enable_automatic_schema_evolution: Optional[
        bool
    ] = False,
    return_merge_metrics: Optional[bool] = False,
) -> Union[bool, psDataFrame]

Summary

Take one PySpark DataFrame from_table, and merge it with another DeltaTable at location: to_table_path/to_table_name.

Parameters:

Name Type Description Default
from_table DataFrame

The PySpark table. Data will be merged FROM here.

required
to_table_name str

The name of the Delta table. Data will be merged TO here.

required
to_table_path str

The location where the target Delta table can be found.

required
matching_keys Optional[Union[str, List[str], Tuple[str, ...], Set[str]]]

The list of matching columns between both the Spark table and the Delta table.
If this is parsed in as a str type, then it will be coerced to a list like: [matching_keys].
If this is not provided, then BOTH the from_keys and the to_keys must be provided.
Defaults to None.

None
from_keys Optional[Union[str, List[str], Tuple[str, ...], Set[str]]]

The list of keys on the from_table to use in the join.
If this is parsed in as a str type, then it will be coerced to a list like: [from_keys].
Only necessary when matching_keys is None. When provided, the length must be the same as the to_keys.
Defaults to None.

None
to_keys Optional[Union[str, List[str], Tuple[str, ...], Set[str]]]

The list of keys on the to_table to use in the join.
If this is parsed in as a str type, then it will be coerced to a list like: [to_keys].
Only necessary when matching_keys is None. When provided, the length must be the same as the from_keys.
Defaults to None.

None
partition_keys Optional[Dict[str, str]]

The keys and values that the to_table is partitioned by.
This is to improve Concurrency Control (https://docs.delta.io/latest/concurrency-control.html) while performing the merges.
If provided, it will enhance the internal join_keys variable to add new clauses for each column and value provided, to ensure it is explicit and direct.
If provided, it must be a dict, where the keys are the columns and the values are the specific partition to use.
For example, if partition_keys is {'SYSTEM':'OWAU','WHSEID':'BNE04'}, then the join_keys will be enhanced with ... and TRG.SYSTEM='OWAU' and TRG.WHSEID='BNE04', so that the merge only executes against the OWAU value of the SYSTEM partition, and likewise the BNE04 value of WHSEID.
Defaults to None.

None
editdate_col_name Optional[str]

The column to use for the editdate field, in case any table uses a different name for this field.
If not provided (as in, the value None is parsed to this parameter), then this function will not implement any conditional logic during the .whenMatchedUpdateAll() method.
Defaults to "editdate".

'editdate'
delete_unmatched_rows Optional[bool]

Whether or not to DELETE rows on the target table which are existing on the target but missing from the source tables.
This should be used if you want to clean the target table and delete any rows which have already been deleted from the source table.
If True, then this function will implement the .whenNotMatchedBySourceDelete() method, with no conditionals.
Defaults to False.

False
enable_automatic_schema_evolution Optional[bool]

Optional parameter for whether or not to automatically update the downstream delta table schema.
Defaults to False.

False
return_merge_metrics Optional[bool]

Set to True if you want to return the Merge metrics from this function.
If False, it will only return the value: True.
Defaults to False.

False

Returns:

Type Description
Union[bool, DataFrame]

Will return either:

  • If return_merge_metrics is True: Will return the Merge metrics, which is calculated by:
    1. Extracting the history from DeltaTable (at the to_table_path location),
    2. Coercing that history object to a pyspark DataFrame,
    3. Filtering to only extract the MERGE operations,
    4. Limiting to the top 1 lines, which is the most recent info.
  • If return_merge_metrics is False: The value True is returned when the function runs successfully.

If an error is thrown, then obviously it will not reach this far. Unfortunately, the DeltaTable Merge process does not return any data or statistics from its execution, so we need to use the DeltaTable history to fetch the metrics. For more info, see: Show key metrics after running .merge(...)....execute()

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

AttributeError
  • If any of matching_keys do not exist in the Spark table
  • If any of matching_keys do not exist in the Delta table
  • If any of from_keys do not exist in the Spark table
  • If any of to_keys do not exist in the Delta table
AssertionError
  • If matching_keys is None AND from_keys is None
  • If matching_keys is None AND to_keys is None
  • If length of from_keys does not match the length of to_keys
Notes

The main objective of this function is to:

  1. For any records existing in Spark but missing in Delta, then INSERT those records from Spark to Delta. Using the .whenNotMatchedInsertAll() method.
  2. For any records existing in both Spark and Delta, check if they have been updated in Spark and if so then UPDATE those matching records in the Delta. Using the .whenMatchedUpdateAll() method.
  3. Conditionally, check whether or not to actually apply #2 above by comparing the editdate_col_name field between the two tables.

Note:

  1. The from_keys and the to_keys will logically be the same values MOST of the time.
    • Very rarely will they ever be different; however, they are added here as separate parameters to facilitate this future functionality.
  2. If from_keys and to_keys are type list, then their length must be the same.
  3. Conditional logic is applied during the .whenMatchedUpdateAll() method to avoid re-updating data in the Delta location which has not actually been updated in the Spark table.
  4. There is an additional ifnull() conditional check added to the .whenMatchedUpdateAll() method for converting any values in the target table to timestamp(0) when their value is actually null.
    • The history to this check is that when these data were originally added to BigDaS, the column EditDate did not exist.
    • Therefore, when they were first inserted, all the values in EditDate were null.
    • As time progressed, the records have slowly been updating, and therefore the EditDate values have been changing.
    • Due to nuances and semantics around how Spark handles null values, whenever this previous check was run including columns with values null, it would inevitably return null.
    • As such, these rows were not identified as able to be matched, therefore the optimiser skipped them.
    • However, we actually did want them to be matched; because the rows had actually been updated on the source table.
    • Therefore, we add this ifnull() check to capture this edge case, and then push through and update the record on the target table.
  5. The parameter enable_automatic_schema_evolution was added because it is possible for the upstream tables to be adding new columns as they evolve. Therefore, it is necessary for this function to handle schema evolution automatically.
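Example

A usage sketch. updates_df stands in for any PySpark DataFrame holding new or changed rows, and the key, partition, and column names are illustrative.

from toolbox_pyspark.delta import merge_spark_to_delta

metrics = merge_spark_to_delta(
    from_table=updates_df,
    to_table_name="sales",
    to_table_path="./data",
    matching_keys=["SYSTEM", "WHSEID", "ORDERKEY"],
    partition_keys={"SYSTEM": "OWAU", "WHSEID": "BNE04"},
    editdate_col_name="editdate",
    delete_unmatched_rows=False,
    enable_automatic_schema_evolution=True,
    return_merge_metrics=True,
)

# With return_merge_metrics=True, the most recent MERGE entry from the table history is returned.
metrics.show(truncate=False)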
References
See also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def merge_spark_to_delta(
    from_table: psDataFrame,
    to_table_name: str,
    to_table_path: str,
    matching_keys: Optional[str_collection] = None,
    from_keys: Optional[str_collection] = None,
    to_keys: Optional[str_collection] = None,
    partition_keys: Optional[str_dict] = None,
    editdate_col_name: Optional[str] = "editdate",
    delete_unmatched_rows: Optional[bool] = False,
    enable_automatic_schema_evolution: Optional[bool] = False,
    return_merge_metrics: Optional[bool] = False,
) -> Union[bool, psDataFrame]:
    """
    !!! note "Summary"
        Take one PySpark DataFrame `from_table`, and merge it with another DeltaTable at location: `to_table_path`/`to_table_name`.

    Params:
        from_table (psDataFrame):
            The PySpark table. Data will be merged FROM here.
        to_table_name (str):
            The name of the Delta table. Data will be merged TO here.
        to_table_path (str):
            The location where the target Delta table can be found.
        matching_keys (Optional[Union[str, List[str], Tuple[str, ...], Set[str]]], optional):
            The list of matching columns between both the Spark table and the Delta table.<br>
            If this is parsed in as a `#!py str` type, then it will be coerced to a list like: `[matching_keys]`.<br>
            If this is not provided, then BOTH the `from_keys` and the `to_keys` must be provided.<br>
            Defaults to `#!py None`.
        from_keys (Optional[Union[str, List[str], Tuple[str, ...], Set[str]]], optional):
            The list of keys on the `from_table` to use in the join.<br>
            If this is parsed in as a `#!py str` type, then it will be coerced to a list like: `[from_keys]`.<br>
            Only necessary when `matching_keys` is `#!py None`. When provided, the length must be the same as the `to_keys`.<br>
            Defaults to `#!py None`.
        to_keys (Optional[Union[str, List[str], Tuple[str, ...], Set[str]]], optional):
            The list of keys on the `to_table` to use in the join.<br>
            If this is parsed in as a `#!py str` type, then it will be coerced to a list like: `[to_keys]`.<br>
            Only necessary when `matching_keys` is `#!py None`. When provided, the length must be the same as the `from_keys`.<br>
            Defaults to `#!py None`.
        partition_keys (Optional[Dict[str, str]], optional):
            The keys and values that the `to_table` is partitioned by.<br>
            This is to improve [Concurrency Control](https://docs.delta.io/latest/concurrency-control.html) while performing the merges.<br>
            If provided, it will enhance the internal `join_keys` variable to add new clauses for each column and value provided, to ensure it is explicit and direct.<br>
            If provided, it must be a `#!py dict`, where the keys are the columns and the values are the specific partition to use.<br>
            For example, if `partition_keys` is `{'SYSTEM':'OWAU','WHSEID':'BNE04'}`, then `join_keys` will be enhanced with `... and TRG.SYSTEM='OWAU' and TRG.WHSEID='BNE04'`, so the merge will only touch the `OWAU` partition of `SYSTEM` and the `BNE04` partition of `WHSEID`.
            Defaults to `#!py None`.
        editdate_col_name (Optional[str], optional):
            The column to use for the `editdate` field, in case any table uses a different name for this field.<br>
            If not provided (as in, the value `#!py None` is parsed to this parameter), then this function will not implement any conditional logic during the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method.<br>
            Defaults to `#!py "editdate"`.
        delete_unmatched_rows (Optional[bool], optional):
            Whether or not to **DELETE** rows on the _target_ table which exist on the _target_ but are missing from the _source_ table.<br>
            This should be used if you want to clean the target table and delete any rows which have already been deleted from the source table.<br>
            If `#!py True`, then this function will call the [`.whenNotMatchedBySourceDelete()`](https://docs.delta.io/latest/api/python/spark/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedBySourceDelete) method, with no conditionals.<br>
            Defaults to `#!py False`.
        enable_automatic_schema_evolution (Optional[bool], optional):
            Optional parameter for whether or not to automatically update the downstream `delta` table schema.<br>
            As documented extensively elsewhere:

            - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
            - https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html
            - https://www.databricks.com/blog/2020/05/19/schema-evolution-in-merge-operations-and-operational-metrics-in-delta-lake.html
            - https://towardsdatascience.com/delta-lake-automatic-schema-evolution-11d32bd1aa99

            Defaults to `#!py False`.
        return_merge_metrics (Optional[bool], optional):
            Set to `#!py True` if you want to return the Merge metrics from this function.<br>
            If `#!py False`, it will only return the value: `#!py True`.<br>
            Defaults to `#!py False`.

    Returns:
        (Union[bool, psDataFrame]):
            Will return either:

            - If `return_merge_metrics` is `#!py True`: Will return the Merge metrics, which is calculated by:
                1. Extracting the history from DeltaTable (at the `to_table_path` location),
                1. Coercing that history object to a `pyspark` DataFrame,
                1. Filtering to only extract the `#!sql MERGE` operations,
                1. Limiting to the top `#!py 1` lines, which is the most recent info.
            - If `return_merge_metrics` is `#!py False`: The value `#!py True` is returned when the function runs successfully.

            If an error is thrown, then this function will not reach this point.
            Unfortunately, the DeltaTable merge process does not return any data or statistics from its execution; therefore, we need to use the DeltaTable history to fetch the metrics. For more info, see: [Show key metrics after running `.merge(...)....execute()`](https://github.com/delta-io/delta/issues/1361)

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AttributeError:
            - If any of `matching_keys` do not exist in the Spark table
            - If any of `matching_keys` do not exist in the Delta table
            - If any of `from_keys` do not exist in the Spark table
            - If any of `to_keys` do not exist in the Delta table
        AssertionError:
            - If `matching_keys` is None AND `from_keys` is None
            - If `matching_keys` is None AND `to_keys` is None
            - If length of `from_keys` does not match the length of `to_keys`

    ???+ info "Notes"
        The main objective of this function is to:

        1. For any records _existing_ in Spark but _missing_ in Delta, then INSERT those records from Spark to Delta. Using the [`.whenNotMatchedInsertAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedInsertAll) method.
        1. For any records _existing_ in both Spark and Delta, check if they have been _updated_ in Spark and if so then UPDATE those matching records in the Delta. Using the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method.
        1. Conditionally, check whether or not to actually apply #2 above by comparing the `editdate_col_name` field between the two tables.

        Note:

        1. The `from_keys` and the `to_keys` will logically be the same values MOST of the time.
            - Very rarely will they ever be different; however, they are added here as separate parameters to facilitate this future functionality.
        1. If `from_keys` and `to_keys` are type `#!py list`, then their length must be the same.
        1. Conditional logic is applied during the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method to avoid re-updating data in the Delta location which has not actually been updated in the Spark table.
        1. There is an additional `#!sql ifnull()` conditional check added to the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method for converting any values in the _target_ table to `#!py timestamp(0)` when their value is actually `#!sql null`.
            - The history to this check is that when these data were originally added to BigDaS, the column `EditDate` did not exist.
            - Therefore, when they were first inserted, all the values in `EditDate` were `#!sql null`.
            - As time progressed, the records have slowly been updating, and therefore the `EditDate` values have been changing.
            - Due to nuances and semantics around how Spark handles `null` values, whenever this previous check was run including columns with values `#!sql null`, it would inevitably return `#!sql null`.
            - As such, these rows were not identified as able to be matched, therefore the optimiser skipped them.
            - However, we actually did want them to be matched; because the rows had actually been updated on the _source_ table.
            - Therefore, we add this `#!sql ifnull()` check to capture this edge case, and then push through and update the record on the _target_ table.
        1. The parameter `enable_automatic_schema_evolution` was added because it is possible for the upstream tables to be adding new columns as they evolve. Therefore, it is necessary for this function to handle schema evolution automatically.

    ???+ question "References"
        - https://docs.databricks.com/delta/delta-update.html#language-python
        - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge&language-python
        - https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder
        - https://spark.apache.org/docs/3.0.0-preview/sql-ref-null-semantics.html
        - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge

    ???+ tip "See also"
        - [`load_table()`][toolbox_pyspark.delta.load_table]
        - [`assert_columns_exists()`][toolbox_pyspark.delta.assert_columns_exists]
        - [`get_columns()`][toolbox_pyspark.delta.get_columns]
        - [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
        - [`SparkSession.sql()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html)
        - [`DeltaMergeBuilder`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder)
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.history()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.history)
    """

    # Set up
    SRC_ALIAS = "src"
    TRG_ALIAS = "dlt"
    spark_session: SparkSession = from_table.sparkSession

    # Enable automatic Schema Evolution
    if enable_automatic_schema_evolution:
        current_conf = spark_session.conf.get(
            "spark.databricks.delta.schema.autoMerge.enabled",
        )
        _ = spark_session.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")

    # Define target table
    to_table = load_table(
        name=to_table_name,
        path=to_table_path,
        spark_session=spark_session,
    )

    # Check keys
    if matching_keys is not None:
        matching_keys = get_columns(from_table, matching_keys)
        assert_columns_exists(from_table, matching_keys, match_case=False)
        assert_columns_exists(to_table.toDF(), matching_keys, match_case=False)
        join_keys: str = " and ".join(
            [f"{SRC_ALIAS}.{key}={TRG_ALIAS}.{key}" for key in matching_keys]
        )
    else:
        assert from_keys is not None, f"Cannot be `None`: '{from_keys=}'"
        assert to_keys is not None, f"Cannot be `None`: '{to_keys=}'"
        from_keys = get_columns(from_table, from_keys)
        to_keys = get_columns(to_table.toDF(), to_keys)
        if len(from_keys) != len(to_keys):
            raise ValueError("`from_keys` & `to_keys` must be the same length.")
        assert_columns_exists(from_table, from_keys, match_case=False)
        assert_columns_exists(to_table.toDF(), to_keys, match_case=False)
        # Materialise the zip into a list so it can be iterated more than once below
        combined_keys = list(zip(from_keys, to_keys))
        for from_key, to_key in combined_keys:
            assert from_key == to_key, f"Must be same: '{from_key=}' & '{to_key=}'"
        join_keys = " and ".join(
            [
                f"{SRC_ALIAS}.{from_key}={TRG_ALIAS}.{to_key}"
                for from_key, to_key in combined_keys
            ]
        )
    if partition_keys is not None:
        assert_columns_exists(to_table.toDF(), list(partition_keys.keys()))
        # TODO: Add a check for the `values`??
        for key, value in partition_keys.items():
            join_keys += f" and {TRG_ALIAS}.{key}='{value}'"

    # Run
    merger: DeltaMergeBuilder = (
        to_table.alias(TRG_ALIAS)
        .merge(
            source=from_table.alias(SRC_ALIAS),
            condition=join_keys,
        )
        .whenMatchedUpdateAll(
            condition=(
                None
                if editdate_col_name is None
                else f"ifnull({TRG_ALIAS}.{editdate_col_name}, timestamp(0))<{SRC_ALIAS}.{editdate_col_name}"
            )
        )
        .whenNotMatchedInsertAll()
    )
    if delete_unmatched_rows:
        merger = merger.whenNotMatchedBySourceDelete()
    merger.execute()

    # Return settings
    if enable_automatic_schema_evolution and current_conf is not None:
        _ = spark_session.conf.set(
            "spark.databricks.delta.schema.autoMerge.enabled",
            current_conf,
        )

    # Return
    if return_merge_metrics:
        return (
            to_table.history()
            .filter("operation='MERGE'")
            .limit(1)
            .select(
                "version",
                "timestamp",
                "operation",
                "operationParameters",
                "operationMetrics",
            )
        )
    else:
        return True
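
A minimal usage sketch follows (the DataFrame contents, table name, and path are hypothetical placeholders, and the target Delta table is assumed to already exist at that location):

```python
from pyspark.sql import SparkSession
from toolbox_pyspark.delta import merge_spark_to_delta

spark = SparkSession.builder.getOrCreate()

# Hypothetical source data; in practice this is any PySpark DataFrame.
src_df = spark.createDataFrame(
    [(1, "A", "2024-01-01 10:00:00"), (2, "B", "2024-01-02 11:30:00")],
    ["order_id", "status", "editdate"],
)

metrics = merge_spark_to_delta(
    from_table=src_df,
    to_table_name="orders",              # hypothetical table name
    to_table_path="/mnt/lake/curated/",  # hypothetical path
    matching_keys="order_id",            # a str is coerced to ["order_id"]
    editdate_col_name="editdate",
    return_merge_metrics=True,
)
metrics.show()  # the most recent MERGE entry from the target table's history
```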

merge_delta_to_delta 🔗

merge_delta_to_delta(
    from_table_name: str,
    from_table_path: str,
    to_table_name: str,
    to_table_path: str,
    spark_session: SparkSession,
    matching_keys: str_collection,
    partition_keys: Optional[str_dict] = None,
    editdate_col_name: Optional[str] = "editdate",
    delete_unmatched_rows: Optional[bool] = False,
    enable_automatic_schema_evolution: Optional[
        bool
    ] = False,
    return_merge_metrics: Optional[bool] = False,
) -> Union[bool, psDataFrame]

Summary

Take one DeltaTable at location: from_table_path/from_table_name, and merge it with another DeltaTable at location: to_table_path/to_table_name.

Details

This function is fundamentally the same as the merge_spark_to_delta() function, except it defines the from_table as a DeltaTable instead of a Spark DataFrame.

Parameters:

Name Type Description Default
from_table_name str

The name of the Delta table. Data will be merged FROM here.

required
from_table_path str

The location where the source Delta table can be found.

required
to_table_name str

The name of the Delta table. Data will be merged TO here.

required
to_table_path str

The location where the target Delta table can be found.

required
spark_session SparkSession

The Spark session to use for the merging.

required
matching_keys Union[str, List[str], Tuple[str, ...]]

The list of matching columns between both the source Delta table and the target Delta table.
If this is parsed in as a str type, then it will be coerced to a list like: [matching_keys].

required
editdate_col_name Optional[str]

The column to use for the editdate field, in case any table uses a different name for this field.
If not provided (as in, the value None is parsed to this parameter), then this function will not implement any conditional logic during the .whenMatchedUpdateAll() method.
Defaults to "editdate".

'editdate'
delete_unmatched_rows Optional[bool]

Whether or not to DELETE rows on the target table which exist on the target but are missing from the source table.
This should be used if you want to clean the target table and delete any rows which have already been deleted from the source table.
If True, then this function will call the .whenNotMatchedBySourceDelete() method, with no conditionals.
Defaults to False.

False
enable_automatic_schema_evolution Optional[bool]

Whether or not to automatically update the downstream delta table schema.
Defaults to False.

False
return_merge_metrics Optional[bool]

Set to True if you want to return the Merge metrics from this function.
If False, it will only return the value: True.
Defaults to False.

False

Returns:

Type Description
Union[bool, DataFrame]

Will return either:

  • If return_merge_metrics is True: Will return the Merge metrics, which is calculated by:
    1. Extracting the history from DeltaTable (at the to_table_path location),
    2. Coercing that history object to a pyspark DataFrame,
    3. Filtering to only extract the MERGE operations,
    4. Limiting to the top 1 lines, which is the most recent info.
  • If return_merge_metrics is False: The value True is returned when the function runs successfully.

If an error is thrown, then this function will not reach this point. Unfortunately, the DeltaTable merge process does not return any data or statistics from its execution; therefore, we need to use the DeltaTable history to fetch the metrics. For more info, see: Show key metrics after running .merge(...)....execute()

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

AttributeError
  • If any of matching_keys do not exist in the Spark table
  • If any of matching_keys do not exist in the Delta table
  • If any of from_keys do not exist in the Spark table
  • If any of to_keys do not exist in the Delta table
AssertionError
  • If matching_keys is None AND from_keys is None
  • If matching_keys is None AND to_keys is None
  • If length of from_keys does not match the length of to_keys
Notes

The main objective of this function is to:

  1. For any records existing in Spark but missing in Delta, then INSERT those records from Spark to Delta. Using the .whenNotMatchedInsertAll() method.
  2. For any records existing in both Spark and Delta, check if they have been updated in Spark and if so then UPDATE those matching records in the Delta. Using the .whenMatchedUpdateAll() method.
  3. Conditionally, check whether or not to actually apply #2 above by comparing the editdate_col_name field between the two tables.

Note:

  1. The from_keys and the to_keys will logically be the same values MOST of the time.
    • Very rarely will they ever be different; however, they are added here as separate parameters to facilitate this future functionality.
  2. If from_keys and to_keys are type list, then their length must be the same.
  3. Conditional logic is applied during the .whenMatchedUpdateAll() method to avoid re-updating data in the Delta location which has not actually been updated in the Spark table.
  4. There is an additional ifnull() conditional check added to the .whenMatchedUpdateAll() method for converting any values in the target table to timestamp(0) when their value is actually null.
    • The history to this check is that when these data were originally added to BigDaS, the column EditDate did not exist.
    • Therefore, when they were first inserted, all the values in EditDate were null.
    • As time progressed, the records have slowly been updating, and therefore the EditDate values have been changing.
    • Due to nuances and semantics around how Spark handles null values, whenever this previous check was run including columns with values null, it would inevitably return null.
    • As such, these rows were not identified as able to be matched, therefore the optimiser skipped them.
    • However, we actually did want them to be matched; because the rows had actually been updated on the source table.
    • Therefore, we add this ifnull() check to capture this edge case, and then push through and update the record on the target table.
  5. The parameter enable_automatic_schema_evolution was added because it is possible for the upstream tables to be adding new columns as they evolve. Therefore, it is necessary for this function to handle schema evolution automatically.
References
See Also
Source code in src/toolbox_pyspark/delta.py
@typechecked
def merge_delta_to_delta(
    from_table_name: str,
    from_table_path: str,
    to_table_name: str,
    to_table_path: str,
    spark_session: SparkSession,
    matching_keys: str_collection,
    partition_keys: Optional[str_dict] = None,
    editdate_col_name: Optional[str] = "editdate",
    delete_unmatched_rows: Optional[bool] = False,
    enable_automatic_schema_evolution: Optional[bool] = False,
    return_merge_metrics: Optional[bool] = False,
) -> Union[bool, psDataFrame]:
    """
    !!! note "Summary"
        Take one DeltaTable at location: `from_table_path`/`from_table_name`, and merge it with another DeltaTable at location: `to_table_path`/`to_table_name`.

    ???+ abstract "Details"
        This function is fundamentally the same as the [`merge_spark_to_delta()`][toolbox_pyspark.delta.merge_spark_to_delta] function, except it defines the `from_table` as a DeltaTable instead of a Spark DataFrame.

    Params:
        from_table_name (str):
            The name of the Delta table. Data will be merged FROM here.
        from_table_path (str):
            The location where the source Delta table can be found.
        to_table_name (str):
            The name of the Delta table. Data will be merged TO here.
        to_table_path (str):
            The location where the target Delta table can be found.
        spark_session (SparkSession):
            The Spark session to use for the merging.
        matching_keys (Union[str, List[str], Tuple[str, ...]]):
            The list of matching columns between both the source Delta table and the target Delta table.<br>
            If this is parsed in as a `#!py str` type, then it will be coerced to a list like: `[matching_keys]`.
        partition_keys (Optional[Dict[str, str]], optional):
            The keys and values that the `to_table` is partitioned by, used to improve [Concurrency Control](https://docs.delta.io/latest/concurrency-control.html) while performing the merges.<br>
            Defaults to `#!py None`.
        editdate_col_name (Optional[str], optional):
            The column to use for the `editdate` field, in case any table uses a different name for this field.<br>
            If not provided (as in, the value `#!py None` is parsed to this parameter), then this function will not implement any conditional logic during the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method.<br>
            Defaults to `#!py "editdate"`.
        delete_unmatched_rows (Optional[bool], optional):
            Whether or not to **DELETE** rows on the _target_ table which exist on the _target_ but are missing from the _source_ table.<br>
            This should be used if you want to clean the target table and delete any rows which have already been deleted from the source table.<br>
            If `#!py True`, then this function will call the [`.whenNotMatchedBySourceDelete()`](https://docs.delta.io/latest/api/python/spark/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedBySourceDelete) method, with no conditionals.<br>
            Defaults to `#!py False`.
        enable_automatic_schema_evolution (Optional[bool], optional):
            Optional parameter for whether or not to automatically update the downstream `delta` table schema.<br>
            As documented extensively elsewhere:

            - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
            - https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html
            - https://www.databricks.com/blog/2020/05/19/schema-evolution-in-merge-operations-and-operational-metrics-in-delta-lake.html
            - https://towardsdatascience.com/delta-lake-automatic-schema-evolution-11d32bd1aa99

            Defaults to `#!py False`.
        return_merge_metrics (Optional[bool], optional):
            Set to `#!py True` if you want to return the Merge metrics from this function.<br>
            If `#!py False`, it will only return the value: `#!py True`.<br>
            Defaults to `#!py False`.

    Returns:
        (Union[bool, psDataFrame]):
            Will return either:

            - If `return_merge_metrics` is `#!py True`: Will return the Merge metrics, which is calculated by:
                1. Extracting the history from DeltaTable (at the `to_table_path` location),
                1. Coercing that history object to a `pyspark` DataFrame,
                1. Filtering to only extract the `#!sql MERGE` operations,
                1. Limiting to the top `#!py 1` lines, which is the most recent info.
            - If `return_merge_metrics` is `#!py False`: The value `#!py True` is returned when the function runs successfully.

            If an error is thrown, then this function will not reach this point.
            Unfortunately, the DeltaTable merge process does not return any data or statistics from its execution; therefore, we need to use the DeltaTable history to fetch the metrics. For more info, see: [Show key metrics after running `.merge(...)....execute()`](https://github.com/delta-io/delta/issues/1361)

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AttributeError:
            - If any of `matching_keys` do not exist in the Spark table
            - If any of `matching_keys` do not exist in the Delta table
            - If any of `from_keys` do not exist in the Spark table
            - If any of `to_keys` do not exist in the Delta table
        AssertionError:
            - If `matching_keys` is None AND `from_keys` is None
            - If `matching_keys` is None AND `to_keys` is None
            - If length of `from_keys` does not match the length of `to_keys`

    ??? info "Notes"
        The main objective of this function is to:

        1. For any records _existing_ in Spark but _missing_ in Delta, then INSERT those records from Spark to Delta. Using the [`.whenNotMatchedInsertAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedInsertAll) method.
        1. For any records _existing_ in both Spark and Delta, check if they have been _updated_ in Spark and if so then UPDATE those matching records in the Delta. Using the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method.
        1. Conditionally, check whether or not to actually apply #2 above by comparing the `editdate_col_name` field between the two tables.

        Note:

        1. The `from_keys` and the `to_keys` will logically be the same values MOST of the time.
            - Very rarely will they ever be different; however, they are added here as separate parameters to facilitate this future functionality.
        1. If `from_keys` and `to_keys` are type `#!py list`, then their length must be the same.
        1. Conditional logic is applied during the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method to avoid re-updating data in the Delta location which has not actually been updated in the Spark table.
        1. There is an additional `#!sql ifnull()` conditional check added to the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method for converting any values in the _target_ table to `#!py timestamp(0)` when their value is actually `#!sql null`.
            - The history to this check is that when these data were originally added to BigDaS, the column `EditDate` did not exist.
            - Therefore, when they were first inserted, all the values in `EditDate` were `#!sql null`.
            - As time progressed, the records have slowly been updating, and therefore the `EditDate` values have been changing.
            - Due to nuances and semantics around how Spark handles `null` values, whenever this previous check was run including columns with values `#!sql null`, it would inevitably return `#!sql null`.
            - As such, these rows were not identified as able to be matched, therefore the optimiser skipped them.
            - However, we actually did want them to be matched; because the rows had actually been updated on the _source_ table.
            - Therefore, we add this `#!sql ifnull()` check to capture this edge case, and then push through and update the record on the _target_ table.
        1. The parameter `enable_automatic_schema_evolution` was added because it is possible for the upstream tables to be adding new columns as they evolve. Therefore, it is necessary for this function to handle schema evolution automatically.

    ??? question "References"
        - https://docs.databricks.com/delta/delta-update.html#language-python
        - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge&language-python
        - https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder
        - https://spark.apache.org/docs/3.0.0-preview/sql-ref-null-semantics.html
        - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge

    ??? tip "See Also"
        - [`merge_spark_to_delta()`][toolbox_pyspark.delta.merge_spark_to_delta]
        - [`load_table()`][toolbox_pyspark.delta.load_table]
        - [`assert_columns_exists()`][toolbox_pyspark.delta.assert_columns_exists]
        - [`get_columns()`][toolbox_pyspark.delta.get_columns]
        - [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
        - [`SparkSession.sql()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html)
        - [`DeltaMergeBuilder`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder)
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.history()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.history)
    """
    from_table: DeltaTable = load_table(
        name=from_table_name,
        path=from_table_path,
        spark_session=spark_session,
    )
    return merge_spark_to_delta(
        from_table=from_table.toDF(),
        to_table_name=to_table_name,
        to_table_path=to_table_path,
        matching_keys=matching_keys,
        partition_keys=partition_keys,
        editdate_col_name=editdate_col_name,
        delete_unmatched_rows=delete_unmatched_rows,
        enable_automatic_schema_evolution=enable_automatic_schema_evolution,
        return_merge_metrics=return_merge_metrics,
    )
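
A short usage sketch (the table names and paths are hypothetical; both locations must already contain Delta tables):

```python
from pyspark.sql import SparkSession
from toolbox_pyspark.delta import merge_delta_to_delta

spark = SparkSession.builder.getOrCreate()

# Merge the staging copy of a table into the curated copy (hypothetical paths).
success = merge_delta_to_delta(
    from_table_name="orders",
    from_table_path="/mnt/lake/staging/",
    to_table_name="orders",
    to_table_path="/mnt/lake/curated/",
    spark_session=spark,
    matching_keys=["order_id"],
)
print(success)  # True, because return_merge_metrics defaults to False
```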

retry_merge_spark_to_delta 🔗

retry_merge_spark_to_delta(
    from_table: psDataFrame,
    to_table_name: str,
    to_table_path: str,
    matching_keys: Optional[str_collection] = None,
    from_keys: Optional[str_collection] = None,
    to_keys: Optional[str_collection] = None,
    partition_keys: Optional[str_dict] = None,
    editdate_col_name: Optional[str] = "editdate",
    delete_unmatched_rows: Optional[bool] = False,
    enable_automatic_schema_evolution: Optional[
        bool
    ] = False,
    return_merge_metrics: Optional[bool] = False,
    retry_exceptions: Union[
        type[Exception],
        list[Type[Exception]],
        tuple[Type[Exception], ...],
    ] = Exception,
    retry_attempts: int = 10,
) -> Union[bool, psDataFrame]

Summary

Take one PySpark DataFrame from_table, and merge it with another DeltaTable at location: to_table_path/to_table_name.

Details

This function is fundamentally the same as the merge_spark_to_delta() function, except that it will automatically retry the merge function a number of times if it meets an error.

Particularly useful when you are running this merge over a cluster, and parallelisation causes multiple processes to operate on the same DeltaTable at the same time.

For more info on the Retry process, see: stamina.retry().
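
For orientation, the retry wrapping follows roughly the pattern below (a sketch only; flaky_merge and its body are placeholders, not part of this library):

```python
import stamina

# Retry the wrapped callable up to 5 times when it raises an Exception,
# using stamina's default exponential backoff between attempts.
@stamina.retry(on=Exception, attempts=5)
def flaky_merge() -> bool:
    # ... a call to merge_spark_to_delta(...) would go here ...
    return True

flaky_merge()
```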

Parameters:

Name Type Description Default
from_table DataFrame

The PySpark table. Data will be merged FROM here.

required
to_table_name str

The name of the Delta table. Data will be merged TO here.

required
to_table_path str

The location where the target Delta table can be found.

required
matching_keys Union[List[str], str]

The list of matching columns between both the Spark table and the Delta table.
If this is parsed in as a str type, then it will be coerced to a list like: [matching_keys].
If this is not provided, then BOTH the from_keys and the to_keys must be provided.
Defaults to None.

None
from_keys Union[List[str], str]

The list of keys on the from_table to use in the join.
If this is parsed in as a str type, then it will be coerced to a list like: [from_keys].
Only necessary when matching_keys is None. When provided, the length must be the same as the to_keys.
Defaults to None.

None
to_keys Union[List[str], str]

The list of keys on the to_table to use in the join.
If this is parsed in as a str type, then it will be coerced to a list like: [to_keys].
Only necessary when matching_keys is None. When provided, the length must be the same as the from_keys.
Defaults to None.

None
editdate_col_name Optional[str]

The column to use for the editdate field, in case any table uses a different name for this field.
If not provided (as in, the value None is parsed to this parameter), then this function will not implement any conditional logic during the .whenMatchedUpdateAll() method.
Defaults to "editdate".

'editdate'
delete_unmatched_rows Optional[bool]

Whether or not to DELETE rows on the target table which exist on the target but are missing from the source table.
This should be used if you want to clean the target table and delete any rows which have already been deleted from the source table.
If True, then this function will call the .whenNotMatchedBySourceDelete() method, with no conditionals.
Defaults to False.

False
enable_automatic_schema_evolution Optional[bool]

Whether or not to automatically update the downstream delta table schema.
Defaults to False.

False
return_merge_metrics Optional[bool]

Set to True if you want to return the Merge metrics from this function.
If False, it will only return the value: True.
Defaults to False.

False
retry_exceptions Union[Type[Exception], List[Type[Exception]], Tuple[Type[Exception], ...]]

A given single or collection of expected exceptions for which to catch and retry for.
Defaults to Exception.

Exception
retry_attempts int

The number of retries to attempt. If the underlying process is still failing after this number of attempts, then throw a hard error and alert the user.
Defaults to 10.

10

Returns:

Type Description
Union[bool, DataFrame]

Will return either:

  • If return_merge_metrics is True: Will return the Merge metrics, which is calculated by:
    1. Extracting the history from DeltaTable (at the to_table_path location),
    2. Coercing that history object to a pyspark DataFrame,
    3. Filtering to only extract the MERGE operations,
    4. Limiting to the top 1 lines, which is the most recent info.
  • If return_merge_metrics is False: The value True is returned when the function runs successfully.

If an error is thrown, then this function will not reach this point. Unfortunately, the DeltaTable merge process does not return any data or statistics from its execution; therefore, we need to use the DeltaTable history to fetch the metrics. For more info, see: Show key metrics after running .merge(...)....execute()

Raises:

Type Description
TypeError

If any of the inputs parsed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator.

AttributeError
  • If any of matching_keys do not exist in the Spark table
  • If any of matching_keys do not exist in the Delta table
  • If any of from_keys do not exist in the Spark table
  • If any of to_keys do not exist in the Delta table
AssertionError
  • If matching_keys is None AND from_keys is None
  • If matching_keys is None AND to_keys is None
  • If length of from_keys does not match the length of to_keys
Notes
The main objective of this function is to:
  1. For any records existing in Spark but missing in Delta, then INSERT those records from Spark to Delta. Using the .whenNotMatchedInsertAll() method.
  2. For any records existing in both Spark and Delta, check if they have been updated in Spark and if so then UPDATE those matching records in the Delta. Using the .whenMatchedUpdateAll() method.
  3. Conditionally, check whether or not to actually apply #2 above by comparing the editdate_col_name field between the two tables.
Pay particular attention to:
  1. The from_keys and the to_keys will logically be the same values MOST of the time.
    • Very rarely will they ever be different; however, they are added here as separate parameters to facilitate this future functionality.
  2. If from_keys and to_keys are type list, then their length must be the same.
  3. Conditional logic is applied during the .whenMatchedUpdateAll() method to avoid re-updating data in the Delta location which has not actually been updated in the Spark table.
  4. There is an additional ifnull() conditional check added to the .whenMatchedUpdateAll() method for converting any values in the target table to timestamp(0) when their value is actually null.
    • The history to this check is that when these data were originally added to BigDaS, the column EditDate did not exist.
    • Therefore, when they were first inserted, all the values in EditDate were null.
    • As time progressed, the records have slowly been updating, and therefore the EditDate values have been changing.
    • Due to nuances and semantics around how Spark handles null values, whenever this previous check was run including columns with values null, it would inevitably return null.
    • As such, these rows were not identified as able to be matched, therefore the optimiser skipped them.
    • However, we actually did want them to be matched; because the rows had actually been updated on the source table.
    • Therefore, we add this ifnull() check to capture this edge case, and then push through and update the record on the target table.
  5. The parameter enable_automatic_schema_evolution was added because it is possible for the upstream tables to be adding new columns as they evolve. Therefore, it is necessary for this function to handle schema evolution automatically.
References
See also
Source code in src/toolbox_pyspark/delta.py
def retry_merge_spark_to_delta(
    from_table: psDataFrame,
    to_table_name: str,
    to_table_path: str,
    matching_keys: Optional[str_collection] = None,
    from_keys: Optional[str_collection] = None,
    to_keys: Optional[str_collection] = None,
    partition_keys: Optional[str_dict] = None,
    editdate_col_name: Optional[str] = "editdate",
    delete_unmatched_rows: Optional[bool] = False,
    enable_automatic_schema_evolution: Optional[bool] = False,
    return_merge_metrics: Optional[bool] = False,
    retry_exceptions: Union[
        type[Exception],
        list[Type[Exception]],
        tuple[Type[Exception], ...],
    ] = Exception,
    retry_attempts: int = 10,
) -> Union[bool, psDataFrame]:
    """
    !!! note "Summary"

        Take one PySpark DataFrame `from_table`, and merge it with another DeltaTable at location: `to_table_path`/`to_table_name`.

    ???+ abstract "Details"

        This function is fundamentally the same as the [`merge_spark_to_delta()`][toolbox_pyspark.delta.merge_spark_to_delta] function, except that it will automatically retry the merge function a number of times if it meets an error.

        Particularly useful when you are running this merge over a cluster, and parallelisation causes multiple processes to operate on the same DeltaTable at the same time.

        For more info on the Retry process, see: [`stamina.retry()`](https://stamina.hynek.me/en/stable/).

    Params:
        from_table (psDataFrame):
            The PySpark table. Data will be merged FROM here.

        to_table_name (str):
            The name of the Delta table. Data will be merged TO here.

        to_table_path (str):
            The location where the target Delta table can be found.

        matching_keys (Union[List[str], str], optional):
            The list of matching columns between both the Spark table and the Delta table.<br>
            If this is parsed in as a `#!py str` type, then it will be coerced to a list like: `[matching_keys]`.<br>
            If this is not provided, then BOTH the `from_keys` and the `to_keys` must be provided.<br>
            Defaults to `#!py None`.

        from_keys (Union[List[str], str], optional):
            The list of keys on the `from_table` to use in the join.<br>
            If this is parsed in as a `#!py str` type, then it will be coerced to a list like: `[from_keys]`.<br>
            Only necessary when `matching_keys` is `#!py None`. When provided, the length must be the same as the `to_keys`.<br>
            Defaults to `#!py None`.

        to_keys (Union[List[str], str], optional):
            The list of keys on the `to_table` to use in the join.<br>
            If this is parsed in as a `#!py str` type, then it will be coerced to a list like: `[to_keys]`.<br>
            Only necessary when `matching_keys` is `#!py None`. When provided, the length must be the same as the `from_keys`.<br>
            Defaults to `#!py None`.

        editdate_col_name (Optional[str], optional):
            The column to use for the `editdate` field, in case any table uses a different name for this field.<br>
            If not provided (as in, the value `#!py None` is parsed to this parameter), then this function will not implement any conditional logic during the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method.<br>
            Defaults to `#!py "editdate"`.

        delete_unmatched_rows (Optional[bool], optional):
            Whether or not to **DELETE** rows on the _target_ table which exist on the _target_ but are missing from the _source_ table.<br>
            This should be used if you want to clean the target table and delete any rows which have already been deleted from the source table.<br>
            If `#!py True`, then this function will call the [`.whenNotMatchedBySourceDelete()`](https://docs.delta.io/latest/api/python/spark/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedBySourceDelete) method, with no conditionals.<br>
            Defaults to `#!py False`.

        enable_automatic_schema_evolution (Optional[bool], optional):
            Optional parameter for whether or not to automatically update the downstream `delta` table schema.<br>
            As documented extensively elsewhere:

            - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge
            - https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html
            - https://www.databricks.com/blog/2020/05/19/schema-evolution-in-merge-operations-and-operational-metrics-in-delta-lake.html
            - https://towardsdatascience.com/delta-lake-automatic-schema-evolution-11d32bd1aa99

            Defaults to `#!py False`.

        return_merge_metrics (Optional[bool], optional):
            Set to `#!py True` if you want to return the Merge metrics from this function.<br>
            If `#!py False`, it will only return the value: `#!py True`.<br>
            Defaults to `#!py False`.

        retry_exceptions (Union[ Type[Exception], List[Type[Exception]], Tuple[Type[Exception], ...], ], optional):
            A given single or collection of expected exceptions for which to catch and retry for.<br>
            Defaults to `#!py Exception`.

        retry_attempts (int, optional):
            The number of retries to attempt. If the underlying process is still failing after this number of attempts, then throw a hard error and alert the user.<br>
            Defaults to `#!py 10`.

    Returns:
        (Union[bool, psDataFrame]):
            Will return either:

            - If `return_merge_metrics` is `#!py True`: Will return the Merge metrics, which is calculated by:
                1. Extracting the history from DeltaTable (at the `to_table_path` location),
                1. Coercing that history object to a `pyspark` DataFrame,
                1. Filtering to only extract the `#!sql MERGE` operations,
                1. Limiting to the top `#!py 1` lines, which is the most recent info.
            - If `return_merge_metrics` is `#!py False`: The value `#!py True` is returned when the function runs successfully.

            If an error is thrown, then this function will not reach this point.
            Unfortunately, the DeltaTable merge process does not return any data or statistics from its execution; therefore, we need to use the DeltaTable history to fetch the metrics. For more info, see: [Show key metrics after running `.merge(...)....execute()`](https://github.com/delta-io/delta/issues/1361)

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AttributeError:
            - If any of `matching_keys` do not exist in the Spark table
            - If any of `matching_keys` do not exist in the Delta table
            - If any of `from_keys` do not exist in the Spark table
            - If any of `to_keys` do not exist in the Delta table
        AssertionError:
            - If `matching_keys` is None AND `from_keys` is None
            - If `matching_keys` is None AND `to_keys` is None
            - If length of `from_keys` does not match the length of `to_keys`

    ???+ info "Notes"

        ???+ info "The main objective of this function is to:"

            1. For any records _existing_ in Spark but _missing_ in Delta, then INSERT those records from Spark to Delta. Using the [`.whenNotMatchedInsertAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenNotMatchedInsertAll) method.
            1. For any records _existing_ in both Spark and Delta, check if they have been _updated_ in Spark and if so then UPDATE those matching records in the Delta. Using the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method.
            1. Conditionally, check whether or not to actually apply #2 above by comparing the `editdate_col_name` field between the two tables.

        ???+ info "Pay particular attention to:"

            1. The `from_keys` and the `to_keys` will logically be the same values MOST of the time.
                - Very rarely will they ever be different; however, they are added here as separate parameters to facilitate this future functionality.
            1. If `from_keys` and `to_keys` are type `#!py list`, then their length must be the same.
            1. Conditional logic is applied during the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method to avoid re-updating data in the Delta location which has not actually been updated in the Spark table.
            1. There is an additional `#!sql ifnull()` conditional check added to the [`.whenMatchedUpdateAll()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder.whenMatchedUpdateAll) method for converting any values in the _target_ table to `#!py timestamp(0)` when their value is actually `#!sql null`.
                - The history to this check is that when these data were originally added to BigDaS, the column `EditDate` did not exist.
                - Therefore, when they were first inserted, all the values in `EditDate` were `#!sql null`.
                - As time progressed, the records have slowly been updating, and therefore the `EditDate` values have been changing.
                - Due to nuances and semantics around how Spark handles `null` values, whenever this previous check was run including columns with values `#!sql null`, it would inevitably return `#!sql null`.
                - As such, these rows were not identified as able to be matched, therefore the optimiser skipped them.
                - However, we actually did want them to be matched; because the rows had actually been updated on the _source_ table.
                - Therefore, we add this `#!sql ifnull()` check to capture this edge case, and then push through and update the record on the _target_ table.
            1. The parameter `enable_automatic_schema_evolution` was added because it is possible for the upstream tables to be adding new columns as they evolve. Therefore, it is necessary for this function to handle schema evolution automatically.

    ???+ question "References"
        - https://docs.databricks.com/delta/delta-update.html#language-python
        - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge&language-python
        - https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder
        - https://spark.apache.org/docs/3.0.0-preview/sql-ref-null-semantics.html
        - https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge

    ???+ tip "See also"
        - [`stamina.retry()`](https://stamina.hynek.me/en/stable/)
        - [`merge_spark_to_delta()`][toolbox_pyspark.delta.merge_spark_to_delta]
        - [`load_table()`][toolbox_pyspark.delta.load_table]
        - [`assert_columns_exists()`][toolbox_pyspark.delta.assert_columns_exists]
        - [`get_columns()`][toolbox_pyspark.delta.get_columns]
        - [`SparkSession`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
        - [`SparkSession.sql()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html)
        - [`DeltaMergeBuilder`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder)
        - [`DeltaTable`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable)
        - [`DeltaTable.history()`](https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.history)
    """

    @retry(
        on=((*retry_exceptions,) if isinstance(retry_exceptions, list) else retry_exceptions),
        attempts=retry_attempts,
    )
    @typechecked
    def _retry_merge_spark_to_delta(
        from_table: psDataFrame,
        to_table_name: str,
        to_table_path: str,
        matching_keys: Optional[str_collection] = None,
        from_keys: Optional[str_collection] = None,
        to_keys: Optional[str_collection] = None,
        partition_keys: Optional[str_dict] = None,
        editdate_col_name: Optional[str] = "editdate",
        delete_unmatched_rows: Optional[bool] = False,
        enable_automatic_schema_evolution: Optional[bool] = False,
        return_merge_metrics: Optional[bool] = False,
    ) -> Union[bool, psDataFrame]:
        return merge_spark_to_delta(
            from_table=from_table,
            to_table_name=to_table_name,
            to_table_path=to_table_path,
            matching_keys=matching_keys,
            from_keys=from_keys,
            to_keys=to_keys,
            partition_keys=partition_keys,
            editdate_col_name=editdate_col_name,
            delete_unmatched_rows=delete_unmatched_rows,
            enable_automatic_schema_evolution=enable_automatic_schema_evolution,
            return_merge_metrics=return_merge_metrics,
        )

    return _retry_merge_spark_to_delta(
        from_table=from_table,
        to_table_name=to_table_name,
        to_table_path=to_table_path,
        matching_keys=matching_keys,
        from_keys=from_keys,
        to_keys=to_keys,
        partition_keys=partition_keys,
        editdate_col_name=editdate_col_name,
        delete_unmatched_rows=delete_unmatched_rows,
        enable_automatic_schema_evolution=enable_automatic_schema_evolution,
        return_merge_metrics=return_merge_metrics,
    )
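
A hedged usage sketch, retrying only on Delta Lake's concurrent-write exceptions (the exception classes are assumed to come from `delta.exceptions` in the `delta-spark` package; the data, table name, and path are hypothetical):

```python
from delta.exceptions import ConcurrentAppendException, ConcurrentWriteException
from pyspark.sql import SparkSession
from toolbox_pyspark.delta import retry_merge_spark_to_delta

spark = SparkSession.builder.getOrCreate()

# Hypothetical source data and target location.
src_df = spark.createDataFrame(
    [(1, "A", "2024-01-01 10:00:00")],
    ["order_id", "status", "editdate"],
)

result = retry_merge_spark_to_delta(
    from_table=src_df,
    to_table_name="orders",
    to_table_path="/mnt/lake/curated/",
    matching_keys=["order_id"],
    retry_exceptions=[ConcurrentAppendException, ConcurrentWriteException],
    retry_attempts=5,
)
```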

DeltaLoader 🔗

Summary

A class to load and inspect Delta Lake tables from a specified root directory.

Details

The DeltaLoader class provides methods to load Delta Lake tables from a specified root directory and inspect the contents of these tables. It uses the dbutils library if available to list folders, otherwise it falls back to using the os library.
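
For example (a sketch; the root path and folder name are hypothetical), on Databricks you would typically pass the runtime-provided dbutils handle so that folder listing goes through DBFS, while elsewhere you can omit it and the os library is used instead:

```python
from pyspark.sql import SparkSession
from toolbox_pyspark.delta import DeltaLoader

spark = SparkSession.builder.getOrCreate()

# Pass `dbutils=dbutils` on Databricks; leave it as None elsewhere to fall back to `os`.
loader = DeltaLoader(root="/mnt/lake/curated/", spark=spark, dbutils=None)
print(loader.folders)       # sub-folders found under the root
df = loader.load("orders")  # load one hypothetical folder as a DataFrame
```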

Parameters:

Name Type Description Default
root str

The root directory where the Delta Lake tables are stored.

required
spark SparkSession

The Spark session to use for loading the Delta Lake tables.

required
dbutils optional

The dbutils library to use for listing folders. If not provided, the os library will be used.
Defaults to None.

None

Methods:

Name Description
load

Load a Delta Lake table from the specified folder.

folders

List the folders in the root directory.

inspect

Inspect the Delta Lake tables in the root directory and return a DataFrame with information about each table.

Examples
Set up
>>> # Imports
>>> from pyspark.sql import SparkSession
>>> from toolbox_pyspark.delta import DeltaLoader
>>>
>>> # Instantiate Spark
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Create DeltaLoader instance
>>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)

Example 1: Load a table
>>> df = delta_loader.load("folder_name")
>>> df.show()
Terminal
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
| 4 | 5 | 6 |
+---+---+---+

Conclusion: Successfully loaded the table from the specified folder.

Example 2: List folders
>>> folders = delta_loader.folders
>>> print(folders)
Terminal
['folder1', 'folder2', 'folder3']

Conclusion: Successfully listed the folders in the root directory.

Example 3: Inspect tables
>>> inspection_df = delta_loader.inspect()
>>> inspection_df.show()
Terminal
+---------+-------------+---------------------+-------+
| Folder  | TimeElement | TimeStamp           | Count |
+---------+-------------+---------------------+-------+
| folder1 | EDITDATE    | 2023-01-01 00:00:00 |   100 |
| folder2 | ADDDATE     | 2023-01-02 00:00:00 |   200 |
| folder3 | None        | None                |   300 |
+---------+-------------+---------------------+-------+

Conclusion: Successfully inspected the Delta Lake tables.

Source code in src/toolbox_pyspark/delta.py
class DeltaLoader:
    """
    !!! note "Summary"
        A class to load and inspect Delta Lake tables from a specified root directory.

    ???+ abstract "Details"
        The `DeltaLoader` class provides methods to load Delta Lake tables from a specified root directory and inspect the contents of these tables. It uses the `dbutils` library if available to list folders, otherwise it falls back to using the `os` library.

    Params:
        root (str):
            The root directory where the Delta Lake tables are stored.
        spark (SparkSession):
            The Spark session to use for loading the Delta Lake tables.
        dbutils (optional):
            The `dbutils` library to use for listing folders. If not provided, the `os` library will be used.<br>
            Defaults to `None`.

    Methods:
        load(folder_name: str) -> psDataFrame:
            Load a Delta Lake table from the specified folder.

        folders() -> str_list:
            List the folders in the root directory.

        inspect() -> psDataFrame:
            Inspect the Delta Lake tables in the root directory and return a DataFrame with information about each table.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.delta import DeltaLoader
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create DeltaLoader instance
        >>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)
        ```

        ```{.py .python linenums="1" title="Example 1: Load a table"}
        >>> df = delta_loader.load("folder_name")
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | 2 | 3 |
        | 4 | 5 | 6 |
        +---+---+---+
        ```
        !!! success "Conclusion: Successfully loaded the table from the specified folder."
        </div>

        ```{.py .python linenums="1" title="Example 2: List folders"}
        >>> folders = delta_loader.folders
        >>> print(folders)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ['folder1', 'folder2', 'folder3']
        ```
        !!! success "Conclusion: Successfully listed the folders in the root directory."
        </div>

        ```{.py .python linenums="1" title="Example 3: Inspect tables"}
        >>> inspection_df = delta_loader.inspect()
        >>> inspection_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---------+-------------+---------------------+-------+
        | Folder  | TimeElement | TimeStamp           | Count |
        +---------+-------------+---------------------+-------+
        | folder1 | EDITDATE    | 2023-01-01 00:00:00 |   100 |
        | folder2 | ADDDATE     | 2023-01-02 00:00:00 |   200 |
        | folder3 | None        | None                |   300 |
        +---------+-------------+---------------------+-------+
        ```
        !!! success "Conclusion: Successfully inspected the Delta Lake tables."
        </div>
    """

    def __init__(self, root: str, spark: SparkSession, dbutils=None) -> None:
        self._root: str = root
        self._spark_session: SparkSession = spark
        self._dbutils = dbutils

    def load(self, folder_name: str) -> psDataFrame:
        """
        !!! note "Summary"
            Load a Delta Lake table from the specified folder.

        ???+ abstract "Details"
            This method loads a Delta Lake table from the specified folder within the root directory. It uses the `read_from_path` function to read the data in Delta format.

        Params:
            folder_name (str):
                The name of the folder from which to load the Delta Lake table.

        Returns:
            (psDataFrame):
                The loaded Delta Lake table as a PySpark DataFrame.

        ???+ example "Examples"

            ```{.py .python linenums="1" title="Set up"}
            >>> # Imports
            >>> from pyspark.sql import SparkSession
            >>> from toolbox_pyspark.delta import DeltaLoader
            >>>
            >>> # Instantiate Spark
            >>> spark = SparkSession.builder.getOrCreate()
            >>>
            >>> # Create DeltaLoader instance
            >>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)
            ```

            ```{.py .python linenums="1" title="Example 1: Load a table"}
            >>> df = delta_loader.load("folder_name")
            >>> df.show()
            ```
            <div class="result" markdown>
            ```{.txt .text title="Terminal"}
            +---+---+---+
            | a | b | c |
            +---+---+---+
            | 1 | 2 | 3 |
            | 4 | 5 | 6 |
            +---+---+---+
            ```
            !!! success "Conclusion: Successfully loaded the table from the specified folder."
            </div>
        """
        return read_from_path(
            folder_name,
            self._root,
            spark_session=self._spark_session,
            data_format="delta",
        )

    @property
    def folders(self) -> str_list:
        """
        !!! note "Summary"
            List the folders in the root directory.

        ???+ abstract "Details"
            This property lists the folders in the root directory specified during the instantiation of the `DeltaLoader` class. It uses the `dbutils` library if available to list folders, otherwise it falls back to using the `os` library.

        Returns:
            (str_list):
                A list of folder names in the root directory.

        ???+ example "Examples"

            ```{.py .python linenums="1" title="Set up"}
            >>> # Imports
            >>> from pyspark.sql import SparkSession
            >>> from toolbox_pyspark.delta import DeltaLoader
            >>>
            >>> # Instantiate Spark
            >>> spark = SparkSession.builder.getOrCreate()
            >>>
            >>> # Create DeltaLoader instance
            >>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)
            ```

            ```{.py .python linenums="1" title="Example 1: List folders"}
            >>> folders = delta_loader.folders
            >>> print(folders)
            ```
            <div class="result" markdown>
            ```{.txt .text title="Terminal"}
            ['folder1', 'folder2', 'folder3']
            ```
            !!! success "Conclusion: Successfully listed the folders in the root directory."
            </div>
        """
        if self._dbutils is not None:
            return [
                folder.name.replace("/", "")
                for folder in self._dbutils.fs.ls(self._root)  # type:ignore
            ]
        else:
            return os.listdir(self._root)

    def inspect(self) -> psDataFrame:
        """
        !!! note "Summary"
            Inspect the Delta Lake tables in the root directory and return a DataFrame with information about each table.

        ???+ abstract "Details"
            This method inspects the Delta Lake tables in the root directory specified during the instantiation of the `DeltaLoader` class. It loads each table, checks for specific columns (`EDITDATE` and `ADDDATE`), and collects information about each table, including the folder name, the time element, the latest timestamp, and the row count.

        Returns:
            (psDataFrame):
                A DataFrame with information about each Delta Lake table in the root directory.

        ???+ example "Examples"

            ```{.py .python linenums="1" title="Set up"}
            >>> # Imports
            >>> from pyspark.sql import SparkSession
            >>> from toolbox_pyspark.delta import DeltaLoader
            >>>
            >>> # Instantiate Spark
            >>> spark = SparkSession.builder.getOrCreate()
            >>>
            >>> # Create DeltaLoader instance
            >>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)
            ```

            ```{.py .python linenums="1" title="Example 1: Inspect tables"}
            >>> inspection_df = delta_loader.inspect()
            >>> inspection_df.show()
            ```
            <div class="result" markdown>
            ```{.txt .text title="Terminal"}
            +---------+-------------+---------------------+-------+
            | Folder  | TimeElement | TimeStamp           | Count |
            +---------+-------------+---------------------+-------+
            | folder1 | EDITDATE    | 2023-01-01 00:00:00 |   100 |
            | folder2 | ADDDATE     | 2023-01-02 00:00:00 |   200 |
            | folder3 | None        | None                |   300 |
            +---------+-------------+---------------------+-------+
            ```
            !!! success "Conclusion: Successfully inspected the Delta Lake tables."
            </div>
        """
        data = []
        for folder in self.folders:
            df: psDataFrame = self.load(folder)
            cols: str_list = [col.upper() for col in df.columns]
            if "EDITDATE" in cols:
                data.append(
                    (
                        folder,
                        "EDITDATE",
                        df.select(F.max("EDITDATE")).first()["max(EDITDATE)"],
                        df.count(),
                    )
                )
            elif "ADDDATE" in cols:
                data.append(
                    (
                        folder,
                        "ADDDATE",
                        df.select(F.max("ADDDATE")).first()["max(ADDDATE)"],
                        df.count(),
                    )
                )
            else:
                data.append((folder, None, None, df.count()))
        return self._spark_session.createDataFrame(
            pd.DataFrame(data, columns=["Folder", "TimeElement", "TimeStamp", "Count"])
        )
__init__ 🔗
__init__(
    root: str, spark: SparkSession, dbutils=None
) -> None
Source code in src/toolbox_pyspark/delta.py
def __init__(self, root: str, spark: SparkSession, dbutils=None) -> None:
    self._root: str = root
    self._spark_session: SparkSession = spark
    self._dbutils = dbutils
load 🔗
load(folder_name: str) -> psDataFrame

Summary

Load a Delta Lake table from the specified folder.

Details

This method loads a Delta Lake table from the specified folder within the root directory. It uses the read_from_path function to read the data in Delta format.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| folder_name | str | The name of the folder from which to load the Delta Lake table. | required |

Returns:

| Type | Description |
| ---- | ----------- |
| DataFrame | The loaded Delta Lake table as a PySpark DataFrame. |

Examples
Set up
>>> # Imports
>>> from pyspark.sql import SparkSession
>>> from toolbox_pyspark.delta import DeltaLoader
>>>
>>> # Instantiate Spark
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Create DeltaLoader instance
>>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)

Example 1: Load a table
>>> df = delta_loader.load("folder_name")
>>> df.show()
Terminal
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | 2 | 3 |
| 4 | 5 | 6 |
+---+---+---+

Conclusion: Successfully loaded the table from the specified folder.
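
For orientation only, the call above is roughly equivalent to a plain PySpark Delta read. This is a sketch that assumes read_from_path simply joins the root directory and the folder name and reads that path in Delta format; it is not the library's exact implementation.

Equivalent plain PySpark read (sketch)
>>> # Read the same folder directly as a Delta table (assumed behaviour of read_from_path)
>>> df = spark.read.format("delta").load("/path/to/delta/tables/folder_name")
>>> df.show()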

Source code in src/toolbox_pyspark/delta.py
def load(self, folder_name: str) -> psDataFrame:
    """
    !!! note "Summary"
        Load a Delta Lake table from the specified folder.

    ???+ abstract "Details"
        This method loads a Delta Lake table from the specified folder within the root directory. It uses the `read_from_path` function to read the data in Delta format.

    Params:
        folder_name (str):
            The name of the folder from which to load the Delta Lake table.

    Returns:
        (psDataFrame):
            The loaded Delta Lake table as a PySpark DataFrame.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.delta import DeltaLoader
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create DeltaLoader instance
        >>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)
        ```

        ```{.py .python linenums="1" title="Example 1: Load a table"}
        >>> df = delta_loader.load("folder_name")
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | 2 | 3 |
        | 4 | 5 | 6 |
        +---+---+---+
        ```
        !!! success "Conclusion: Successfully loaded the table from the specified folder."
        </div>
    """
    return read_from_path(
        folder_name,
        self._root,
        spark_session=self._spark_session,
        data_format="delta",
    )
folders property 🔗
folders: str_list

Summary

List the folders in the root directory.

Details

This property lists the folders in the root directory specified when the DeltaLoader class was instantiated. It uses the dbutils utility to list folders when one is provided; otherwise, it falls back to the os library.

Returns:

| Type | Description |
| ---- | ----------- |
| str_list | A list of folder names in the root directory. |

Examples
Set up
>>> # Imports
>>> from pyspark.sql import SparkSession
>>> from toolbox_pyspark.delta import DeltaLoader
>>>
>>> # Instantiate Spark
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Create DeltaLoader instance
>>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)

Example 1: List folders
>>> folders = delta_loader.folders
>>> print(folders)
Terminal
['folder1', 'folder2', 'folder3']

Conclusion: Successfully listed the folders in the root directory.
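
Since the rendered docs for this property do not repeat the source listing, the listing fallback from the class source above is reproduced here as a standalone sketch: dbutils.fs.ls is used when a dbutils object was supplied, and os.listdir otherwise.

Folder-listing fallback (sketch)
>>> import os
>>> def list_folders(root, dbutils=None):
...     # Prefer dbutils (e.g. on Databricks) when supplied; otherwise use the local filesystem
...     if dbutils is not None:
...         return [entry.name.replace("/", "") for entry in dbutils.fs.ls(root)]
...     return os.listdir(root)
>>> list_folders("/path/to/delta/tables")
['folder1', 'folder2', 'folder3']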

inspect 🔗
inspect() -> psDataFrame

Summary

Inspect the Delta Lake tables in the root directory and return a DataFrame with information about each table.

Details

This method inspects the Delta Lake tables in the root directory specified during the instantiation of the DeltaLoader class. It loads each table, checks for specific columns (EDITDATE and ADDDATE), and collects information about each table, including the folder name, the time element, the latest timestamp, and the row count.

Returns:

| Type | Description |
| ---- | ----------- |
| DataFrame | A DataFrame with information about each Delta Lake table in the root directory. |

Examples
Set up
>>> # Imports
>>> from pyspark.sql import SparkSession
>>> from toolbox_pyspark.delta import DeltaLoader
>>>
>>> # Instantiate Spark
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Create DeltaLoader instance
>>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)

Example 1: Inspect tables
>>> inspection_df = delta_loader.inspect()
>>> inspection_df.show()
Terminal
+---------+-------------+---------------------+-------+
| Folder  | TimeElement | TimeStamp           | Count |
+---------+-------------+---------------------+-------+
| folder1 | EDITDATE    | 2023-01-01 00:00:00 |   100 |
| folder2 | ADDDATE     | 2023-01-02 00:00:00 |   200 |
| folder3 | None        | None                |   300 |
+---------+-------------+---------------------+-------+

Conclusion: Successfully inspected the Delta Lake tables.
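
A possible follow-up, not part of the generated examples: the inspection result is an ordinary PySpark DataFrame, so it can be sorted or filtered, for instance to surface the tables whose latest timestamp is oldest.

Hypothetical follow-up: order by latest timestamp
>>> # Sort ascending so the stalest tables appear first
>>> inspection_df.orderBy("TimeStamp", ascending=True).show()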

Source code in src/toolbox_pyspark/delta.py
def inspect(self) -> psDataFrame:
    """
    !!! note "Summary"
        Inspect the Delta Lake tables in the root directory and return a DataFrame with information about each table.

    ???+ abstract "Details"
        This method inspects the Delta Lake tables in the root directory specified during the instantiation of the `DeltaLoader` class. It loads each table, checks for specific columns (`EDITDATE` and `ADDDATE`), and collects information about each table, including the folder name, the time element, the latest timestamp, and the row count.

    Returns:
        (psDataFrame):
            A DataFrame with information about each Delta Lake table in the root directory.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.delta import DeltaLoader
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create DeltaLoader instance
        >>> delta_loader = DeltaLoader(root="/path/to/delta/tables", spark=spark)
        ```

        ```{.py .python linenums="1" title="Example 1: Inspect tables"}
        >>> inspection_df = delta_loader.inspect()
        >>> inspection_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---------+-------------+---------------------+-------+
        | Folder  | TimeElement | TimeStamp           | Count |
        +---------+-------------+---------------------+-------+
        | folder1 | EDITDATE    | 2023-01-01 00:00:00 |   100 |
        | folder2 | ADDDATE     | 2023-01-02 00:00:00 |   200 |
        | folder3 | None        | None                |   300 |
        +---------+-------------+---------------------+-------+
        ```
        !!! success "Conclusion: Successfully inspected the Delta Lake tables."
        </div>
    """
    data = []
    for folder in self.folders:
        df: psDataFrame = self.load(folder)
        cols: str_list = [col.upper() for col in df.columns]
        if "EDITDATE" in cols:
            data.append(
                (
                    folder,
                    "EDITDATE",
                    df.select(F.max("EDITDATE")).first()["max(EDITDATE)"],
                    df.count(),
                )
            )
        elif "ADDDATE" in cols:
            data.append(
                (
                    folder,
                    "ADDDATE",
                    df.select(F.max("ADDDATE")).first()["max(ADDDATE)"],
                    df.count(),
                )
            )
        else:
            data.append((folder, None, None, df.count()))
    return self._spark_session.createDataFrame(
        pd.DataFrame(data, columns=["Folder", "TimeElement", "TimeStamp", "Count"])
    )