Coverage for src/toolbox_pyspark/cleaning.py: 100%
71 statements
1# ============================================================================ #
2# #
3# Title : Dataframe Cleaning #
4# Purpose : Clean, fix, and fetch various aspects on a given DataFrame. #
5# #
6# ============================================================================ #
9# ---------------------------------------------------------------------------- #
10# #
11# Overview ####
12# #
13# ---------------------------------------------------------------------------- #
16# ---------------------------------------------------------------------------- #
17# Description ####
18# ---------------------------------------------------------------------------- #
21"""
22!!! note "Summary"
23    The `cleaning` module is used to clean, fix, and fetch various aspects of a given DataFrame.
24"""
27# ---------------------------------------------------------------------------- #
28# #
29# Setup ####
30# #
31# ---------------------------------------------------------------------------- #
34# ---------------------------------------------------------------------------- #
35# Imports ####
36# ---------------------------------------------------------------------------- #
39# ## Python StdLib Imports ----
40from typing import Optional, Union
42# ## Python Third Party Imports ----
43from numpy import ndarray as npArray
44from pandas import DataFrame as pdDataFrame
45from pyspark.sql import (
46 Column,
47 DataFrame as psDataFrame,
48 SparkSession,
49 functions as F,
50 types as T,
51)
52from toolbox_python.checkers import is_type
53from toolbox_python.collection_types import str_collection, str_list
54from toolbox_python.lists import flatten
55from typeguard import typechecked
57# ## Local First Party Imports ----
58from toolbox_pyspark.checks import assert_column_exists, assert_columns_exists
59from toolbox_pyspark.columns import get_columns
60from toolbox_pyspark.constants import (
61 LITERAL_LIST_OBJECT_NAMES,
62 LITERAL_NUMPY_ARRAY_NAMES,
63 LITERAL_PANDAS_DATAFRAME_NAMES,
64 LITERAL_PYSPARK_DATAFRAME_NAMES,
65 VALID_LIST_OBJECT_NAMES,
66 VALID_NUMPY_ARRAY_NAMES,
67 VALID_PANDAS_DATAFRAME_NAMES,
68 VALID_PYAPARK_JOIN_TYPES,
69 VALID_PYSPARK_DATAFRAME_NAMES,
70 WHITESPACE_CHARACTERS as WHITESPACES,
71)
74# ---------------------------------------------------------------------------- #
75# Exports ####
76# ---------------------------------------------------------------------------- #
79__all__: str_list = [
80 "create_empty_dataframe",
81 "keep_first_record_by_columns",
82 "convert_dataframe",
83 "update_nullability",
84 "trim_spaces_from_column",
85 "trim_spaces_from_columns",
86 "apply_function_to_column",
87 "apply_function_to_columns",
88 "drop_matching_rows",
89]
92# ---------------------------------------------------------------------------- #
93# #
94# Functions ####
95# #
96# ---------------------------------------------------------------------------- #
99# ---------------------------------------------------------------------------- #
100# Empty DataFrame ####
101# ---------------------------------------------------------------------------- #
104@typechecked
105def create_empty_dataframe(spark_session: SparkSession) -> psDataFrame:
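    """
    !!! note "Summary"
        Create an empty `DataFrame`, containing no columns and no rows, from the given `SparkSession`.

    Params:
        spark_session (SparkSession):
            The Spark session to use when creating the empty `DataFrame`.

    Returns:
        (psDataFrame):
            An empty `DataFrame` with an empty schema (`#!py T.StructType([])`).
    """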
106 return spark_session.createDataFrame([], T.StructType([]))
109# ---------------------------------------------------------------------------- #
110# Column processes ####
111# ---------------------------------------------------------------------------- #
114@typechecked
115def keep_first_record_by_columns(
116 dataframe: psDataFrame,
117 columns: Union[str, str_collection],
118) -> psDataFrame:
119 """
120 !!! note "Summary"
121 For a given Spark `#!py DataFrame`, keep the first record given by the column(s) specified in `#!py columns`.
123 ???+ abstract "Details"
124 The necessity for this function arose when we needed to perform a `#!py distinct()` function for a given `#!py DataFrame`; however, we still wanted to retain data provided in the other columns.
126 Params:
127 dataframe (psDataFrame):
128 The DataFrame that you want to filter.
129 columns (Union[str, str_collection]):
130            The single or multiple columns from which you want to extract the distinct values.
132 Raises:
133 TypeError:
134            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
135 ColumnDoesNotExistError:
136 If any of the `#!py columns` do not exist within `#!py dataframe.columns`.
138 Returns:
139 (psDataFrame):
140 The updated dataframe, retaining only the first unique set of records from the columns specified in `#!py columns`.
142 ???+ example "Examples"
144 ```{.py .python linenums="1" title="Set up"}
145 >>> # Imports
146 >>> import pandas as pd
147 >>> from pyspark.sql import SparkSession
148 >>> from toolbox_pyspark.cleaning import keep_first_record_by_columns
149 >>>
150 >>> # Instantiate Spark
151 >>> spark = SparkSession.builder.getOrCreate()
152 >>>
153 >>> # Create data
154 >>> df = spark.createDataFrame(
155 ... pd.DataFrame(
156 ... {
157 ... "a": [1, 2, 3, 4],
158 ... "b": ["a", "b", "c", "d"],
159 ... "c": [1, 1, 2, 2],
160 ... "d": [1, 2, 2, 2],
161 ... "e": [1, 1, 2, 3],
162 ... }
163 ... )
164 ... )
165 >>>
166 >>> # Check
167 >>> df.show()
168 ```
169 <div class="result" markdown>
170 ```{.txt .text title="Terminal"}
171 +---+---+---+---+---+
172 | a | b | c | d | e |
173 +---+---+---+---+---+
174 | 1 | a | 1 | 1 | 1 |
175 | 2 | b | 1 | 2 | 1 |
176 | 3 | c | 2 | 2 | 2 |
177 | 4 | d | 2 | 2 | 3 |
178 +---+---+---+---+---+
179 ```
180 </div>
182 ```{.py .python linenums="1" title="Example 1: Distinct by the `c` column"}
183 >>> keep_first_record_by_columns(df, "c").show()
184 ```
185 <div class="result" markdown>
186 ```{.txt .text title="Terminal"}
187 +---+---+---+---+---+
188 | a | b | c | d | e |
189 +---+---+---+---+---+
190 | 1 | a | 1 | 1 | 1 |
191 | 3 | c | 2 | 2 | 2 |
192 +---+---+---+---+---+
193 ```
194 !!! success "Conclusion: Successfully kept first records by the `c` column."
195 </div>
197 ```{.py .python linenums="1" title="Example 2: Distinct by the `d` column"}
198 >>> keep_first_record_by_columns(df, "d").show()
199 ```
200 <div class="result" markdown>
201 ```{.txt .text title="Terminal"}
202 +---+---+---+---+---+
203 | a | b | c | d | e |
204 +---+---+---+---+---+
205 | 1 | a | 1 | 1 | 1 |
206 | 2 | b | 1 | 2 | 1 |
207 +---+---+---+---+---+
208 ```
209 !!! success "Conclusion: Successfully kept first records by the `d` column."
210 </div>
212 ```{.py .python linenums="1" title="Example 3: Distinct by the `e` column"}
213 >>> keep_first_record_by_columns(df, "e").show()
214 ```
215 <div class="result" markdown>
216 ```{.txt .text title="Terminal"}
217 +---+---+---+---+---+
218 | a | b | c | d | e |
219 +---+---+---+---+---+
220 | 1 | a | 1 | 1 | 1 |
221 | 3 | c | 2 | 2 | 2 |
222 | 4 | d | 2 | 2 | 3 |
223 +---+---+---+---+---+
224 ```
225 !!! success "Conclusion: Successfully kept first records by the `e` column."
226 </div>
228 ```{.py .python linenums="1" title="Example 4: Distinct by the `c` & `d` columns"}
229 >>> keep_first_record_by_columns(df, ["c", "d"]).show()
230 ```
231 <div class="result" markdown>
232 ```{.txt .text title="Terminal"}
233 +---+---+---+---+---+
234 | a | b | c | d | e |
235 +---+---+---+---+---+
236 | 1 | a | 1 | 1 | 1 |
237 | 2 | b | 1 | 2 | 1 |
238 | 3 | c | 2 | 2 | 2 |
239 +---+---+---+---+---+
240 ```
241 !!! success "Conclusion: Successfully kept first records by the `c` & `d` columns."
242 </div>
244 ```{.py .python linenums="1" title="Example 5: Distinct by the `c` & `e` columns"}
245 >>> keep_first_record_by_columns(df, ["c", "e"]).show()
246 ```
247 <div class="result" markdown>
248 ```{.txt .text title="Terminal"}
249 +---+---+---+---+---+
250 | a | b | c | d | e |
251 +---+---+---+---+---+
252 | 1 | a | 1 | 1 | 1 |
253 | 3 | c | 2 | 2 | 2 |
254 | 4 | d | 2 | 2 | 3 |
255 +---+---+---+---+---+
256 ```
257 !!! success "Conclusion: Successfully kept first records by the `c` & `e` columns."
258 </div>
260 ```{.py .python linenums="1" title="Example 6: Distinct by the `d` & `e` columns"}
261 >>> keep_first_record_by_columns(df, ["d", "e"]).show()
262 ```
263 <div class="result" markdown>
264 ```{.txt .text title="Terminal"}
265 +---+---+---+---+---+
266 | a | b | c | d | e |
267 +---+---+---+---+---+
268 | 1 | a | 1 | 1 | 1 |
269 | 2 | b | 1 | 2 | 1 |
270 | 3 | c | 2 | 2 | 2 |
271 | 4 | d | 2 | 2 | 3 |
272 +---+---+---+---+---+
273 ```
274 !!! success "Conclusion: Successfully kept first records by the `d` & `e` columns."
275 </div>
277 ```{.py .python linenums="1" title="Example 7: Distinct by the `c`, `d` & `e` columns"}
278 >>> keep_first_record_by_columns(df, ["c", "d", "e"]).show()
279 ```
280 <div class="result" markdown>
281 ```{.txt .text title="Terminal"}
282 +---+---+---+---+---+
283 | a | b | c | d | e |
284 +---+---+---+---+---+
285 | 1 | a | 1 | 1 | 1 |
286 | 2 | b | 1 | 2 | 1 |
287 | 3 | c | 2 | 2 | 2 |
288 | 4 | d | 2 | 2 | 3 |
289 +---+---+---+---+---+
290 ```
291 !!! success "Conclusion: Successfully kept first records by the `c`, `d` & `e` columns."
293 </div>
295 ```{.py .python linenums="1" title="Example 8: Column missing"}
296 >>> keep_first_record_by_columns(df, "f")
297 ```
298 <div class="result" markdown>
299 ```{.txt .text title="Terminal"}
300 ColumnDoesNotExistError: Column 'f' does not exist in the DataFrame.
301 Try one of: ["a", "b", "c", "d", "e"]
302 ```
303 !!! failure "Conclusion: Column missing."
304 </div>
306 ??? info "Notes"
307 The way this process will retain only the first record in the given `#!py columns` is by:
309 1. Add a new column called `RowNum`
310 1. This `RowNum` column uses the SparkSQL function `#!sql ROW_NUMBER()`
311 1. The window-function `#!sql OVER` clause will then:
312 1. `#!sql PARTITION BY` the `#!py columns`,
313 1. `#!sql ORDER BY` the `#!py columns`.
314 1. Filter so that `#!sql RowNum=1`.
315 1. Drop the `#!py RowNum` column.
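
        Equivalently, the same logic can be expressed using the `Window` API rather than a raw SQL expression. A minimal sketch for illustration only (the implementation below builds the window via `#!py F.expr()` instead):

        ```py
        from pyspark.sql import Window, functions as F

        window = Window.partitionBy(*columns).orderBy(*columns)
        deduped = (
            dataframe
            .withColumn("RowNum", F.row_number().over(window))
            .where("RowNum = 1")
            .drop("RowNum")
        )
        ```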
317 ??? tip "See Also"
318 - [`toolbox_pyspark.checks.assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists]
319 """
320 columns = [columns] if is_type(columns, str) else columns
321 assert_columns_exists(dataframe, columns)
322 return (
323 dataframe.withColumn(
324 colName="RowNum",
325 col=F.expr(
326 f"""
327 ROW_NUMBER()
328 OVER
329 (
330 PARTITION BY {','.join(columns)}
331 ORDER BY {','.join(columns)}
332 )
333 """
334 ),
335 )
336 .where("RowNum=1")
337 .drop("RowNum")
338 )
341@typechecked
342def convert_dataframe(
343 dataframe: psDataFrame,
344 return_type: Union[
345 LITERAL_PYSPARK_DATAFRAME_NAMES,
346 LITERAL_PANDAS_DATAFRAME_NAMES,
347 LITERAL_NUMPY_ARRAY_NAMES,
348 LITERAL_LIST_OBJECT_NAMES,
349 str,
350 ] = "pd",
351) -> Optional[Union[psDataFrame, pdDataFrame, npArray, list]]:
352 """
353 !!! note "Summary"
354 Convert a PySpark DataFrame to the desired return type.
356 ???+ abstract "Details"
357 This function converts a PySpark DataFrame to one of the supported return types, including:
359 PySpark DataFrame:
361 <div class="mdx-four-columns" markdown>
363 - `#!py "spark.DataFrame"`
364 - `#!py "pyspark.DataFrame"`
365 - `#!py "pyspark"`
366 - `#!py "spark"`
367 - `#!py "ps.DataFrame"`
368 - `#!py "ps.df"`
369 - `#!py "psdf"`
370 - `#!py "psDataFrame"`
371 - `#!py "psDF"`
372 - `#!py "ps"`
374 </div>
376 Pandas DataFrame:
378 <div class="mdx-four-columns" markdown>
380 - `#!py "pandas.DataFrame"`
381 - `#!py "pandas"`
382 - `#!py "pd.DataFrame"`
383 - `#!py "pd.df"`
384 - `#!py "pddf"`
385 - `#!py "pdDataFrame"`
386 - `#!py "pdDF"`
387 - `#!py "pd"`
389 </div>
391 NumPy array:
393 <div class="mdx-four-columns" markdown>
395 - `#!py "numpy.array"`
396 - `#!py "np.array"`
397 - `#!py "np"`
398 - `#!py "numpy"`
399 - `#!py "nparr"`
400 - `#!py "npa"`
401 - `#!py "np.arr"`
402 - `#!py "np.a"`
404 </div>
406 Python list:
408 <div class="mdx-four-columns" markdown>
410 - `#!py "list"`
411 - `#!py "lst"`
412 - `#!py "l"`
413 - `#!py "flat_list"`
414 - `#!py "flatten_list"`
416 </div>
418 Params:
419 dataframe (psDataFrame):
420 The PySpark DataFrame to be converted.
421 return_type (Union[LITERAL_LIST_OBJECT_NAMES, LITERAL_PANDAS_DATAFRAME_NAMES, LITERAL_PYSPARK_DATAFRAME_NAMES, LITERAL_NUMPY_ARRAY_NAMES, str], optional):
422 The desired return type.<br>
423 Options:
425 - `#!py "ps"`: Return the PySpark DataFrame.
426 - `#!py "pd"`: Return a Pandas DataFrame.
427 - `#!py "np"`: Return a NumPy array.
428 - `#!py "list"`: Return a Python list.
429            - `#!py "flat_list"`: Return a flat Python list (1D).
431 Defaults to `#!py "pd"` (Pandas DataFrame).
433 Raises:
434 TypeError:
435            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
436 ValueError:
437            If any of the values passed to `return_type` are not valid options.
439 Returns:
440 (Optional[Union[psDataFrame, pdDataFrame, npArray, list]]):
441 The converted data in the specified return type.
443 ???+ example "Examples"
445 ```{.py .python linenums="1" title="Set up"}
446 >>> # Imports
447 >>> import pandas as pd
448 >>> from pyspark.sql import SparkSession
449 >>> from toolbox_pyspark.cleaning import convert_dataframe
450 >>>
451 >>> # Instantiate Spark
452 >>> spark = SparkSession.builder.getOrCreate()
453 >>>
454 >>> # Create data
455 >>> df = spark.createDataFrame(
456        ...     pd.DataFrame(
457 ... {
458 ... "a": [1, 2, 3, 4],
459 ... "b": ["a", "b", "c", "d"],
460 ... }
461 ... )
462 ... )
463 >>>
464 >>> # Check
465 >>> df.show()
466 ```
467 <div class="result" markdown>
468 ```{.txt .text title="Terminal"}
469 +---+---+
470 | a | b |
471 +---+---+
472 | 0 | a |
473 | 1 | b |
474 | 2 | c |
475 | 3 | d |
476 +---+---+
477 ```
479 ```{.py .python linenums="1" title="Example 1: Convert to PySpark"}
480 >>> new_df = convert_dataframe(df, "ps")
481 >>> print(type(new_df))
482 >>> new_df.show()
483 ```
484 <div class="result" markdown>
485 ```{.txt .text title="Terminal"}
486 <class 'pyspark.sql.dataframe.DataFrame'>
487 ```
488 ```{.txt .text title="Terminal"}
489 +---+---+
490 | a | b |
491 +---+---+
492 | 0 | a |
493 | 1 | b |
494 | 2 | c |
495 | 3 | d |
496 +---+---+
497 ```
498 !!! success "Conclusion: Successfully converted to PySpark."
499 </div>
501 ```{.py .python linenums="1" title="Example 2: Convert to Pandas"}
502 >>> new_df = convert_dataframe(df, "pd")
503 >>> print(type(new_df))
504 >>> print(new_df)
505 ```
506 <div class="result" markdown>
507 ```{.txt .text title="Terminal"}
508 <class 'pandas.core.frame.DataFrame'>
509 ```
510 ```{.txt .text title="Terminal"}
511 a b
512 0 0 a
513 1 1 b
514 2 2 c
515 3 3 d
516 ```
517 !!! success "Conclusion: Successfully converted to Pandas."
518 </div>
520 ```{.py .python linenums="1" title="Example 3: Convert to Numpy"}
521 >>> new_df = convert_dataframe(df, "np")
522 >>> print(type(new_df))
523 >>> print(new_df)
524 ```
525 <div class="result" markdown>
526 ```{.txt .text title="Terminal"}
527 <class 'numpy.ndarray'>
528 ```
529 ```{.txt .text title="Terminal"}
530 [[0 "a"]
531 [1 "b"]
532 [2 "c"]
533 [3 "d"]]
534 ```
535 !!! success "Conclusion: Successfully converted to Numpy."
536 </div>
538 ```{.py .python linenums="1" title="Example 4: List"}
539 >>> new_df = convert_dataframe(df, "list")
540 >>> print(type(new_df))
541 >>> print(new_df)
542 ```
543 <div class="result" markdown>
544 ```{.txt .text title="Terminal"}
545 <class 'list'>
546 ```
547 ```{.txt .text title="Terminal"}
548 [
549 [0, "a"],
550 [1, "b"],
551 [2, "c"],
552 [3, "d"],
553 ]
554 ```
555 !!! success "Conclusion: Successfully converted to List."
556 </div>
558 ```{.py .python linenums="1" title="Example 5: Convert to single column as list"}
559 >>> new_df = convert_dataframe(df.select("b"), "flat_list")
560 >>> print(type(new_df))
561 >>> print(new_df)
562 ```
563 <div class="result" markdown>
564 ```{.txt .text title="Terminal"}
565 <class 'list'>
566 ```
567 ```{.txt .text title="Terminal"}
568 ["a", "b", "c", "d"]
569 ```
570 !!! success "Conclusion: Successfully converted to flat List."
571 </div>
573 ```{.py .python linenums="1" title="Example 6: Invalid return type"}
574 >>> convert_dataframe(df, "invalid")
575 ```
576 <div class="result" markdown>
577 ```{.txt .text title="Terminal"}
578 ValueError: Unknown return type: 'invalid'.
579 Must be one of: ['pd', 'ps', 'np', 'list'].
580 For more info, check the `constants` module.
581 ```
582 !!! failure "Conclusion: Invalid return type."
583 </div>
585 ??? tip "See Also"
586 - [`toolbox_pyspark.constants`][toolbox_pyspark.constants]
587 """
588 if return_type in VALID_PYSPARK_DATAFRAME_NAMES:
589 return dataframe
590 elif return_type in VALID_PANDAS_DATAFRAME_NAMES:
591 return dataframe.toPandas()
592 elif return_type in VALID_NUMPY_ARRAY_NAMES:
593 return dataframe.toPandas().values # type:ignore
594 elif return_type in VALID_LIST_OBJECT_NAMES:
595 if "flat" in return_type:
596 return flatten(dataframe.toPandas().values.tolist()) # type:ignore
597 else:
598 return dataframe.toPandas().values.tolist() # type:ignore
599 else:
600 raise ValueError(
601 f"Unknown return type: '{return_type}'.\n"
602 f"Must be one of: {['pd', 'ps', 'np', 'list']}.\n"
603 f"For more info, check the `constants` module."
604 )
607@typechecked
608def update_nullability(
609 dataframe: psDataFrame,
610 columns: Optional[Union[str, str_collection]] = None,
611 nullable: bool = True,
612) -> psDataFrame:
613 """
614 !!! note "Summary"
615 Update the nullability of specified columns in a PySpark DataFrame.
617 ???+ abstract "Details"
618 This function updates the nullability of the specified columns in a PySpark DataFrame. If no columns are specified, it updates the nullability of all columns.
620 Params:
621 dataframe (psDataFrame):
622 The input PySpark DataFrame.
623 columns (Optional[Union[str, str_collection]], optional):
624 The columns for which to update nullability. If not provided, all columns will be updated.<br>
625 Defaults to `#!py None`.
626 nullable (bool, optional):
627 Whether to set the columns as nullable or not.<br>
628 Defaults to `#!py True`.
630 Raises:
631 TypeError:
632            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
633 ColumnDoesNotExistError:
634 If any of the `#!py columns` do not exist within `#!py dataframe.columns`.
636 Returns:
637 (psDataFrame):
638 The updated DataFrame with the specified columns' nullability updated.
640 ???+ example "Examples"
642 ```{.py .python linenums="1" title="Set up"}
643 >>> # Imports
644 >>> import pandas as pd
645 >>> from pyspark.sql import SparkSession
646 >>> from toolbox_pyspark.cleaning import update_nullability
647 >>>
648 >>> # Instantiate Spark
649 >>> spark = SparkSession.builder.getOrCreate()
650 >>>
651 >>> # Create data
652 >>> df = spark.createDataFrame(
653 ... pd.DataFrame(
654 ... {
655 ... "a": [1, 2, 3, 4],
656 ... "b": ["a", "b", "c", "d"],
657 ... "c": [1.1, 2.2, 3.3, 4.4],
658 ... }
659 ... )
660 ... )
661 >>>
662 >>> # Check
663 >>> df.show()
664 >>> print(df.schema)
665 >>> df.printSchema()
666 ```
667 <div class="result" markdown>
668 ```{.txt .text title="Terminal"}
669 +---+---+-----+
670 | a | b | c |
671 +---+---+-----+
672 | 1 | a | 1.1 |
673 | 2 | b | 2.2 |
674 | 3 | c | 3.3 |
675 | 4 | d | 4.4 |
676 +---+---+-----+
677 ```
678 ```{.sh .shell title="Terminal"}
679 StructType(
680 [
681 StructField("a", LongType(), True),
682 StructField("b", StringType(), True),
683 StructField("c", DoubleType(), True),
684 ]
685 )
686 ```
687 ```{.txt .text title="Terminal"}
688 root
689 |-- a: long (nullable = true)
690 |-- b: string (nullable = true)
691 |-- c: double (nullable = true)
692 ```
693 </div>
695 ```{.py .python linenums="1" title="Example 1: Update nullability of all columns"}
696 >>> new_df = update_nullability(df, nullable=False)
697 >>> new_df.printSchema()
698 ```
699 <div class="result" markdown>
700 ```{.txt .text title="Terminal"}
701 root
702 |-- a: long (nullable = false)
703 |-- b: string (nullable = false)
704 |-- c: double (nullable = false)
705 ```
706 !!! success "Conclusion: Successfully updated nullability of all columns."
707 </div>
709 ```{.py .python linenums="1" title="Example 2: Update nullability of specific columns"}
710 >>> new_df = update_nullability(df, columns=["a", "c"], nullable=False)
711 >>> new_df.printSchema()
712 ```
713 <div class="result" markdown>
714 ```{.txt .text title="Terminal"}
715 root
716 |-- a: long (nullable = false)
717 |-- b: string (nullable = true)
718 |-- c: double (nullable = false)
719 ```
720 !!! success "Conclusion: Successfully updated nullability of specific columns."
721 </div>
723 ```{.py .python linenums="1" title="Example 3: Column does not exist"}
724 >>> update_nullability(df, columns="d", nullable=False)
725 ```
726 <div class="result" markdown>
727 ```{.txt .text title="Terminal"}
728 ColumnDoesNotExistError: Column 'd' does not exist in the DataFrame.
729 Try one of: ["a", "b", "c"]
730 ```
731 !!! failure "Conclusion: Column does not exist."
732 </div>
734 ??? success "Credit"
735 All credit goes to: https://stackoverflow.com/questions/46072411/can-i-change-the-nullability-of-a-column-in-my-spark-dataframe#answer-51821437.
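
        The essence of that approach, which this function follows, is to flip the `#!py nullable` flag on the relevant `#!py StructField` objects in the schema, then rebuild the `DataFrame` from the underlying RDD. A minimal sketch:

        ```py
        schema: T.StructType = dataframe.schema
        for field in schema:
            if field.name in columns:
                field.nullable = nullable
        new_df = dataframe.sparkSession.createDataFrame(dataframe.rdd, schema)
        ```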
737 ??? tip "See Also"
738 - [`toolbox_pyspark.checks.assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists]
739 """
740 columns = get_columns(dataframe, columns)
741 assert_columns_exists(dataframe=dataframe, columns=columns)
742 schema: T.StructType = dataframe.schema
743 for struct_field in schema:
744 if struct_field.name in columns:
745 struct_field.nullable = nullable
746 return dataframe.sparkSession.createDataFrame(data=dataframe.rdd, schema=dataframe.schema)
749# ---------------------------------------------------------------------------- #
750# Trimming ####
751# ---------------------------------------------------------------------------- #
754@typechecked
755def trim_spaces_from_column(
756 dataframe: psDataFrame,
757 column: str,
758) -> psDataFrame:
759 """
760 !!! note "Summary"
761        For a given column, trim all of the excess white space from it.
763 Params:
764 dataframe (psDataFrame):
765 The DataFrame to update.
766 column (str):
767 The column to clean.
769 Raises:
770 TypeError:
771            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
772 ColumnDoesNotExistError:
773 If the `#!py column` does not exist within `#!py dataframe.columns`.
775 Returns:
776 (psDataFrame):
777            The updated DataFrame.
779 ???+ example "Examples"
781 ```{.py .python linenums="1" title="Set up"}
782 >>> # Imports
783 >>> import pandas as pd
784 >>> from pyspark.sql import SparkSession
785 >>> from toolbox_pyspark.cleaning import trim_spaces_from_column
786 >>>
787 >>> # Instantiate Spark
788 >>> spark = SparkSession.builder.getOrCreate()
789 >>>
790 >>> # Create data
791 >>> df = spark.createDataFrame(
792 ... pd.DataFrame(
793 ... {
794 ... "a": [1, 2, 3, 4],
795 ... "b": ["a", "b", "c", "d"],
796 ... "c": ["1 ", "1 ", "1 ", "1 "],
797 ... "d": [" 2", " 2", " 2", " 2"],
798 ... "e": [" 3 ", " 3 ", " 3 ", " 3 "],
799 ... }
800 ... )
801 ... )
802 >>>
803 >>> # Check
805 >>> df.show()
806 ```
807 <div class="result" markdown>
808 ```{.txt .text title="Terminal"}
809 +---+---+------+------+---------+
810 | a | b | c | d | e |
811 +---+---+------+------+---------+
812 | 1 | a | 1 | 2 | 3 |
813 | 2 | b | 1 | 2 | 3 |
814 | 3 | c | 1 | 2 | 3 |
815 | 4 | d | 1 | 2 | 3 |
816 +---+---+------+------+---------+
817 ```
818 </div>
820 ```{.py .python linenums="1" title="Example 1: Trim column"}
821 >>> trim_spaces_from_column(df, "c").show()
822 ```
823 <div class="result" markdown>
824 ```{.txt .text title="Terminal"}
825 +---+---+---+------+--------+
826 | a | b | c | d | e |
827 +---+---+---+------+--------+
828        | 1 | a | 1 |    2 |      3 |
829        | 2 | b | 1 |    2 |      3 |
830        | 3 | c | 1 |    2 |      3 |
831        | 4 | d | 1 |    2 |      3 |
832 +---+---+---+------+--------+
833 ```
834 !!! success "Conclusion: Successfully trimmed the `c` column."
835 </div>
837 ```{.py .python linenums="1" title="Example 2: Invalid column"}
838 >>> trim_spaces_from_column(df, "f")
839 ```
840 <div class="result" markdown>
841 ```{.txt .text title="Terminal"}
842 ColumnDoesNotExistError: Column 'f' does not exist in the DataFrame.
843 Try one of: ["a", "b", "c", "d", "e"]
844 ```
845 !!! failure "Conclusion: Column does not exist."
846 </div>
848 ??? info "Notes"
850 ???+ info "Justification"
851        - The main reason for this function is that, when the data was exported from the Legacy WMS's, there was a _whole bunch_ of trailing spaces in the data fields. My theory is that this is caused by the data type in the source system. That is, if a field is originally stored as a 'char' type, then it will maintain the full data length. This issue doesn't seem to affect the `varchar` fields. Nonetheless, this function will strip the white space from the data, thus reducing the total size of the data stored therein.
852        - The reason why it is necessary to write this out as a custom function, instead of using the [`F.trim()`][trim] function from the PySpark library directly, is due to the deficiencies of the Java [`trim()`](https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#trim) function. More specifically, there are 13 different whitespace characters in the ASCII character set, and the Java function only cleans about 6 of them. Therefore, we define this function, which iterates through all 13 whitespace characters, formats them into a regular expression, and passes that to the [`F.regexp_replace()`][regexp_replace] function so they can be replaced with an empty string (`""`). As a result, all 13 characters are replaced, and the strings are cleaned and trimmed, ready for further processing. An illustration of the difference is shown at the end of these notes.
854 [trim]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.trim.html
855 [regexp_replace]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.regexp_replace.html
857 ???+ info "Regex definition: `^[...]+|[...]+$`"
858 - 1st Alternative: '^[...]+'
859 - '^' asserts position at start of a line
860 - Match a single character present in the list below '[...]'
861 - '+' matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy)
862 - matches a single character in the list ' ' (case sensitive)
863 - matches the character ' ' with index 160 (A0 or 240) literally (case sensitive)
864 - matches the character ' ' with index 32 (20 or 40) literally (case sensitive)
865 - ... (repeat for all whitespace characters)
866 - 2nd Alternative: '[...]+$'
867 - Match a single character present in the list below '[...]'
868 - '+' matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy)
869 - matches a single character in the list ' ' (case sensitive)
870 - matches the character ' ' with index 160 (A0 or 240) literally (case sensitive)
871 - matches the character ' ' with index 32 (20 or 40) literally (case sensitive)
872 - ... (repeat for all whitespace characters)
873 - '$' asserts position at the end of a line
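
        As a quick illustration of the difference (a sketch, assuming the `spark` session from the examples above): a non-breaking space (U+00A0) survives `#!py F.trim()` but is removed by the regular expression that this function builds.

        ```py
        df = spark.createDataFrame([("\u00a0value\u00a0",)], ["col"])

        # F.trim() only strips the plain space character (U+0020), so the U+00A0 padding remains
        df.select(F.length(F.trim("col")).alias("after_trim")).show()  # shows 7

        # this function removes all 13 whitespace characters, including U+00A0
        trim_spaces_from_column(df, "col").select(F.length("col").alias("after_clean")).show()  # shows 5
        ```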
875 ??? tip "See Also"
876 - [`trim_spaces_from_columns()`][toolbox_pyspark.cleaning.trim_spaces_from_columns]
877 - [`ALL_WHITESPACE_CHARACTERS`][toolbox_pyspark.constants.ALL_WHITESPACE_CHARACTERS]
878 """
879 assert_column_exists(dataframe=dataframe, column=column, match_case=True)
880 space_chars: str_list = [chr(char.ascii) for char in WHITESPACES]
881 regexp: str = f"^[{''.join(space_chars)}]+|[{''.join(space_chars)}]+$"
882 return dataframe.withColumn(column, F.regexp_replace(column, regexp, ""))
885@typechecked
886def trim_spaces_from_columns(
887 dataframe: psDataFrame,
888 columns: Optional[Union[str, str_collection]] = None,
889) -> psDataFrame:
890 """
891 !!! note "Summary"
892 For a given list of columns, trim all of the excess white spaces from them.
894 Params:
895 dataframe (psDataFrame):
896 The DataFrame to be updated.
897 columns (Optional[Union[str, str_collection]], optional):
898 The list of columns to be updated.
899 Must be valid columns on `dataframe`.
900            If given as a string, it will be treated as a single column (i.e. a one-element list).
901            If not given, the function will apply to all columns in `dataframe` which have the data-type `string`.
902            It is also possible to pass the values `#!py "all"` or `#!py "all_string"`, which will likewise apply this function to all columns in `dataframe` which have the data-type `string`.<br>
903 Defaults to `#!py None`.
905 Returns:
906 (psDataFrame):
907 The updated DataFrame.
909 ???+ example "Examples"
911 ```{.py .python linenums="1" title="Set up"}
912 >>> # Imports
913 >>> import pandas as pd
914 >>> from pyspark.sql import SparkSession
915 >>> from toolbox_pyspark.cleaning import trim_spaces_from_columns
916 >>>
917 >>> # Instantiate Spark
918 >>> spark = SparkSession.builder.getOrCreate()
919 >>>
920 >>> # Create data
921 >>> df = spark.createDataFrame(
922 ... pd.DataFrame(
923 ... {
924 ... "a": [1, 2, 3, 4],
925 ... "b": ["a", "b", "c", "d"],
926 ... "c": ["1 ", "1 ", "1 ", "1 "],
927 ... "d": [" 2", " 2", " 2", " 2"],
928 ... "e": [" 3 ", " 3 ", " 3 ", " 3 "],
929 ... }
930 ... )
931 ... )
932 >>>
933 >>> # Check
935 >>> df.show()
936 ```
937 <div class="result" markdown>
938 ```{.txt .text title="Terminal"}
939 +---+---+------+------+---------+
940 | a | b | c | d | e |
941 +---+---+------+------+---------+
942 | 1 | a | 1 | 2 | 3 |
943 | 2 | b | 1 | 2 | 3 |
944 | 3 | c | 1 | 2 | 3 |
945 | 4 | d | 1 | 2 | 3 |
946 +---+---+------+------+---------+
947 ```
948 </div>
950 ```{.py .python linenums="1" title="Example 1: One column as list"}
951 >>> trim_spaces_from_columns(df, ["c"]).show()
952 ```
953 <div class="result" markdown>
954 ```{.txt .text title="Terminal"}
955 +---+---+---+------+---------+
956 | a | b | c | d | e |
957 +---+---+---+------+---------+
958 | 1 | a | 1 | 2 | 3 |
959 | 2 | b | 1 | 2 | 3 |
960 | 3 | c | 1 | 2 | 3 |
961 | 4 | d | 1 | 2 | 3 |
962 +---+---+---+------+---------+
963 ```
964 !!! success "Conclusion: Successfully trimmed the `c` column."
965 </div>
967 ```{.py .python linenums="1" title="Example 2: Single column as string"}
968 >>> trim_spaces_from_columns(df, "d").show()
969 ```
970 <div class="result" markdown>
971 ```{.txt .text title="Terminal"}
972 +---+---+------+---+---------+
973 | a | b | c | d | e |
974 +---+---+------+---+---------+
975 | 1 | a | 1 | 2 | 3 |
976 | 2 | b | 1 | 2 | 3 |
977 | 3 | c | 1 | 2 | 3 |
978 | 4 | d | 1 | 2 | 3 |
979 +---+---+------+---+---------+
980 ```
981 !!! success "Conclusion: Successfully trimmed the `d` column."
982 </div>
984 ```{.py .python linenums="1" title="Example 3: Multiple columns"}
985 >>> trim_spaces_from_columns(df, ["c", "d"]).show()
986 ```
987 <div class="result" markdown>
988 ```{.txt .text title="Terminal"}
989 +---+---+---+---+---------+
990 | a | b | c | d | e |
991 +---+---+---+---+---------+
992 | 1 | a | 1 | 2 | 3 |
993 | 2 | b | 1 | 2 | 3 |
994 | 3 | c | 1 | 2 | 3 |
995 | 4 | d | 1 | 2 | 3 |
996 +---+---+---+---+---------+
997 ```
998 !!! success "Conclusion: Successfully trimmed the `c` and `d` columns."
999 </div>
1001 ```{.py .python linenums="1" title="Example 4: All columns"}
1002 >>> trim_spaces_from_columns(df, "all").show()
1003 ```
1004 <div class="result" markdown>
1005 ```{.txt .text title="Terminal"}
1006 +---+---+---+---+---+
1007 | a | b | c | d | e |
1008 +---+---+---+---+---+
1009 | 1 | a | 1 | 2 | 3 |
1010 | 2 | b | 1 | 2 | 3 |
1011 | 3 | c | 1 | 2 | 3 |
1012 | 4 | d | 1 | 2 | 3 |
1013 +---+---+---+---+---+
1014 ```
1015 !!! success "Conclusion: Successfully trimmed all columns."
1016 </div>
1018 ```{.py .python linenums="1" title="Example 5: Default config"}
1019 >>> trim_spaces_from_columns(df).show()
1020 ```
1021 <div class="result" markdown>
1022 ```{.txt .text title="Terminal"}
1023 +---+---+---+---+---+
1024 | a | b | c | d | e |
1025 +---+---+---+---+---+
1026 | 1 | a | 1 | 2 | 3 |
1027 | 2 | b | 1 | 2 | 3 |
1028 | 3 | c | 1 | 2 | 3 |
1029 | 4 | d | 1 | 2 | 3 |
1030 +---+---+---+---+---+
1031 ```
1032 !!! success "Conclusion: Successfully trimmed all columns."
1033 </div>
1035 ```{.py .python linenums="1" title="Example 6: Invalid column"}
1036 >>> trim_spaces_from_columns(df, ["f"])
1037 ```
1038 <div class="result" markdown>
1039 ```{.txt .text title="Terminal"}
1040 ColumnDoesNotExistError: Columns ['f'] do not exist in the DataFrame.
1041 Try one of: ["a", "b", "c", "d", "e"]
1042 ```
1043 !!! failure "Conclusion: Columns do not exist."
1044 </div>
1046 ???+ info "Notes"
1048 ???+ info "Justification"
1049        - The main reason for this function is that, when the data was exported from the Legacy WMS's, there was a _whole bunch_ of trailing spaces in the data fields. My theory is that this is caused by the data type in the source system. That is, if a field is originally stored as a 'char' type, then it will maintain the full data length. This issue doesn't seem to affect the `varchar` fields. Nonetheless, this function will strip the white space from the data, thus reducing the total size of the data stored therein.
1050        - The reason why it is necessary to write this out as a custom function, instead of using the [`F.trim()`][trim] function from the PySpark library directly, is due to the deficiencies of the Java [`trim()`](https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#trim) function. More specifically, there are 13 different whitespace characters in the ASCII character set, and the Java function only cleans about 6 of them. Therefore, we define this function, which iterates through all 13 whitespace characters, formats them into a regular expression, and passes that to the [`F.regexp_replace()`][regexp_replace] function so they can be replaced with an empty string (`""`). As a result, all 13 characters are replaced, and the strings are cleaned and trimmed, ready for further processing.
1051        - The reason why this function exists as a standalone, and does not call [`trim_spaces_from_column()`][toolbox_pyspark.cleaning.trim_spaces_from_column] from within a loop, is that [`trim_spaces_from_column()`][toolbox_pyspark.cleaning.trim_spaces_from_column] uses the [`.withColumn()`][withColumn] method to apply the [`F.regexp_replace()`][regexp_replace] function to each column individually. When implemented iteratively, this creates huge DAGs for the RDD and blows out the complexity of the execution plan. This [`trim_spaces_from_columns()`][toolbox_pyspark.cleaning.trim_spaces_from_columns] function instead uses the [`.withColumns()`][withColumns] method to apply the [`F.regexp_replace()`][regexp_replace] function over all columns at once. The [`.withColumns()`][withColumns] method projects the function down to the underlying dataset in one single execution, not a different execution per column; therefore, it is simpler and more efficient. The sketch at the end of these notes contrasts the two approaches.
1053 [withColumn]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html
1054 [withColumns]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumns.html
1055 [trim]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.trim.html
1056 [regexp_replace]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.regexp_replace.html
1058 ???+ info "Regex definition: `^[...]+|[...]+$`"
1059 - 1st Alternative: `^[...]+`
1060 - `^` asserts position at start of a line
1061 - Match a single character present in the list below `[...]`
1062 - `+` matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy)
1063 - matches a single character in the list ` ` (case sensitive)
1064 - matches the character ` ` with index 160 (A0 or 240) literally (case sensitive)
1065 - matches the character ` ` with index 32 (20 or 40) literally (case sensitive)
1066 - ... (repeat for all whitespace characters)
1067 - 2nd Alternative: `[...]+$`
1068 - Match a single character present in the list below `[...]`
1069 - `+` matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy)
1070 - matches a single character in the list ` ` (case sensitive)
1071 - matches the character ` ` with index 160 (A0 or 240) literally (case sensitive)
1072 - matches the character ` ` with index 32 (20 or 40) literally (case sensitive)
1073 - ... (repeat for all whitespace characters)
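
        The contrast between the iterative and the bulk approaches, as a minimal sketch:

        ```py
        # iterative approach: one projection (and one extra step in the plan) per column
        for col in columns:
            dataframe = dataframe.withColumn(col, F.regexp_replace(col, regexp, ""))

        # approach used by this function: a single projection covering all columns at once
        dataframe = dataframe.withColumns(
            {col: F.regexp_replace(col, regexp, "") for col in columns}
        )
        ```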
1075 ??? tip "See Also"
1076 - [`trim_spaces_from_column()`][toolbox_pyspark.cleaning.trim_spaces_from_column]
1077 - [`ALL_WHITESPACE_CHARACTERS`][toolbox_pyspark.constants.ALL_WHITESPACE_CHARACTERS]
1078 """
1079 columns = get_columns(dataframe, columns)
1080 assert_columns_exists(dataframe=dataframe, columns=columns, match_case=True)
1081 space_chars: str_list = WHITESPACES.to_list("chr") # type:ignore
1082 regexp: str = f"^[{''.join(space_chars)}]+|[{''.join(space_chars)}]+$"
1083 cols_exprs: dict[str, Column] = {col: F.regexp_replace(col, regexp, "") for col in columns}
1084 return dataframe.withColumns(cols_exprs)
1087# ---------------------------------------------------------------------------- #
1088# Applying functions ####
1089# ---------------------------------------------------------------------------- #
1092@typechecked
1093def apply_function_to_column(
1094 dataframe: psDataFrame,
1095 column: str,
1096 function: str = "upper",
1097 *function_args,
1098 **function_kwargs,
1099) -> psDataFrame:
1100 """
1101 !!! note "Summary"
1102 Apply a given PySpark `function` to a single `column` on `dataframe`.
1104 ???+ abstract "Details"
1105 Under the hood, this function will simply call the [`.withColumn()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html) method to apply the function named in `function` from the PySpark [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) module.
1106 ```py
1107 return dataframe.withColumn(column, getattr(F, function)(column, *function_args, **function_kwargs))
1108 ```
1110 Params:
1111 dataframe (psDataFrame):
1112 The DataFrame to update.
1113 column (str):
1114 The column to update.
1115 function (str, optional):
1116 The function to execute. Must be a valid function from the PySpark [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) module.<br>
1117 Defaults to `#!py "upper"`.
1118 *function_args (Any, optional):
1119 The arguments to push down to the underlying `function`.
1120 **function_kwargs (Any, optional):
1121 The keyword arguments to push down to the underlying `function`.
1123 Returns:
1124 (psDataFrame):
1125 The updated DataFrame.
1127 ???+ example "Examples"
1129 ```{.py .python linenums="1" title="Set up"}
1130 >>> # Imports
1131 >>> import pandas as pd
1132 >>> from pyspark.sql import SparkSession
1133 >>> from toolbox_pyspark.cleaning import apply_function_to_column
1134 >>>
1135 >>> # Instantiate Spark
1136 >>> spark = SparkSession.builder.getOrCreate()
1137 >>>
1138 >>> # Create data
1139 >>> df = spark.createDataFrame(
1140 ... pd.DataFrame(
1141 ... {
1142 ... "a": [0, 1, 2, 3],
1143 ... "b": ["a", "b", "c", "d"],
1144 ... "c": ["c", "c", "c", "c"],
1145 ... "d": ["d", "d", "d", "d"],
1146 ... }
1147 ... )
1148 ... )
1149 >>>
1150 >>> # Check
1152 >>> df.show()
1153 ```
1154 <div class="result" markdown>
1155 ```{.txt .text title="Terminal"}
1156 +---+---+---+---+
1157 | a | b | c | d |
1158 +---+---+---+---+
1159 | 0 | a | c | d |
1160 | 1 | b | c | d |
1161 | 2 | c | c | d |
1162 | 3 | d | c | d |
1163 +---+---+---+---+
1164 ```
1165 </div>
1167 ```{.py .python linenums="1" title="Example 1: Default params"}
1168 >>> apply_function_to_column(df, "c").show()
1169 ```
1170 <div class="result" markdown>
1171 ```{.txt .text title="Terminal"}
1172 +---+---+---+---+
1173 | a | b | c | d |
1174 +---+---+---+---+
1175 | 0 | a | C | d |
1176 | 1 | b | C | d |
1177 | 2 | c | C | d |
1178 | 3 | d | C | d |
1179 +---+---+---+---+
1180 ```
1181 !!! success "Conclusion: Successfully applied the `upper` function to the `c` column."
1182 </div>
1184 ```{.py .python linenums="1" title="Example 2: Simple function"}
1185 >>> apply_function_to_column(df, "c", "lower").show()
1186 ```
1187 <div class="result" markdown>
1188 ```{.txt .text title="Terminal"}
1189 +---+---+---+---+
1190 | a | b | c | d |
1191 +---+---+---+---+
1192 | 0 | a | c | d |
1193 | 1 | b | c | d |
1194 | 2 | c | c | d |
1195 | 3 | d | c | d |
1196 +---+---+---+---+
1197 ```
1198 !!! success "Conclusion: Successfully applied the `lower` function to the `c` column."
1199 </div>
1201 ```{.py .python linenums="1" title="Example 3: Complex function, using args"}
1202 >>> apply_function_to_column(df, "d", "lpad", 5, "?").show()
1203 ```
1204 <div class="result" markdown>
1205 ```{.txt .text title="Terminal"}
1206 +---+---+---+-------+
1207 | a | b | c | d |
1208 +---+---+---+-------+
1209 | 0 | a | c | ????d |
1210 | 1 | b | c | ????d |
1211 | 2 | c | c | ????d |
1212 | 3 | d | c | ????d |
1213 +---+---+---+-------+
1214 ```
1215 !!! success "Conclusion: Successfully applied the `lpad` function to the `d` column."
1216 </div>
1218 ```{.py .python linenums="1" title="Example 4: Complex function, using kwargs"}
1219 >>> new_df = apply_function_to_column(
1220 ... dataframe=df,
1221 ... column="d",
1222 ... function="lpad",
1223 ... len=5,
1224 ... pad="?",
1225 ... ).show()
1226 ```
1227 <div class="result" markdown>
1228 ```{.txt .text title="Terminal"}
1229 +---+---+---+-------+
1230 | a | b | c | d |
1231 +---+---+---+-------+
1232 | 0 | a | c | ????d |
1233 | 1 | b | c | ????d |
1234 | 2 | c | c | ????d |
1235 | 3 | d | c | ????d |
1236 +---+---+---+-------+
1237 ```
1238 !!! success "Conclusion: Successfully applied the `lpad` function to the `d` column."
1239 </div>
1241 ```{.py .python linenums="1" title="Example 5: Different complex function, using kwargs"}
1242 >>> new_df = apply_function_to_column(
1243 ... dataframe=df,
1244 ... column="b",
1245 ... function="regexp_replace",
1246 ... pattern="c",
1247 ... replacement="17",
1248 ... ).show()
1249 ```
1250 <div class="result" markdown>
1251 ```{.txt .text title="Terminal"}
1252 +---+----+---+---+
1253 | a | b | c | d |
1254 +---+----+---+---+
1255 | 0 | a | c | d |
1256 | 1 | b | c | d |
1257 | 2 | 17 | c | d |
1258 | 3 | d | c | d |
1259 +---+----+---+---+
1260 ```
1261 !!! success "Conclusion: Successfully applied the `regexp_replace` function to the `b` column."
1262 </div>
1264 ```{.py .python linenums="1" title="Example 6: Part of pipe"}
1265 >>> new_df = df.transform(
1266 ... func=apply_function_to_column,
1267 ... column="d",
1268 ... function="lpad",
1269 ... len=5,
1270 ... pad="?",
1271 ... ).show()
1272 ```
1273 <div class="result" markdown>
1274 ```{.txt .text title="Terminal"}
1275 +---+---+---+-------+
1276 | a | b | c | d |
1277 +---+---+---+-------+
1278 | 0 | a | c | ????d |
1279 | 1 | b | c | ????d |
1280 | 2 | c | c | ????d |
1281 | 3 | d | c | ????d |
1282 +---+---+---+-------+
1283 ```
1284 !!! success "Conclusion: Successfully applied the `lpad` function to the `d` column."
1285 </div>
1287 ```{.py .python linenums="1" title="Example 7: Column name in different case"}
1288 >>> new_df = df.transform(
1289 ... func=apply_function_to_column,
1290 ... column="D",
1291 ... function="upper",
1292 ... ).show()
1293 ```
1294 <div class="result" markdown>
1295 ```{.txt .text title="Terminal"}
1296 +---+---+---+---+
1297 | a | b | c | d |
1298 +---+---+---+---+
1299 | 0 | a | c | D |
1300 | 1 | b | c | D |
1301 | 2 | c | c | D |
1302 | 3 | d | c | D |
1303 +---+---+---+---+
1304 ```
1305 !!! success "Conclusion: Successfully applied the `upper` function to the `D` column."
1306 </div>
1308 ```{.py .python linenums="1" title="Example 8: Invalid column"}
1309 >>> apply_function_to_column(df, "f")
1310 ```
1311 <div class="result" markdown>
1312 ```{.txt .text title="Terminal"}
1313 ColumnDoesNotExistError: Column 'f' does not exist in the DataFrame.
1314 Try one of: ["a", "b", "c", "d"]
1315 ```
1316 !!! failure "Conclusion: Column does not exist."
1317 </div>
1319 ??? info "Notes"
1320 - We have to name the `function` parameter as the full name because when this function is executed as part of a chain (by using the PySpark [`.transform()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.transform.html) method), that one uses the `func` parameter.
1322 ??? tip "See Also"
1323 - [`apply_function_to_columns()`][toolbox_pyspark.cleaning.apply_function_to_columns]
1324 """
1325 assert_column_exists(dataframe, column, False)
1326 return dataframe.withColumn(
1327 colName=column,
1328 col=getattr(F, function)(column, *function_args, **function_kwargs),
1329 )
1332@typechecked
1333def apply_function_to_columns(
1334 dataframe: psDataFrame,
1335 columns: Union[str, str_collection],
1336 function: str = "upper",
1337 *function_args,
1338 **function_kwargs,
1339) -> psDataFrame:
1340 """
1341 !!! note "Summary"
1342 Apply a given PySpark `function` over multiple `columns` on a given `dataframe`.
1344 ???+ abstract "Details"
1345 Under the hood, this function will simply call the [`.withColumns()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumns.html) method to apply the function named in `function` from the PySpark [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) module.
1346 ```py
1347 return dataframe.withColumns(
1348 {column: getattr(F, function)(column, *args, **kwargs) for column in columns}
1349 )
1350 ```
1352 Params:
1353 dataframe (psDataFrame):
1354 The DataFrame to update.
1355 columns (Union[str, str_collection]):
1356 The columns to update.
1357        function (str, optional):
1358            The function to execute. Must be a valid function from the PySpark [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) module.<br>
1359            Defaults to `#!py "upper"`.
        *function_args (Any, optional):
            The arguments to push down to the underlying `function`.
        **function_kwargs (Any, optional):
            The keyword arguments to push down to the underlying `function`.
1361 Returns:
1362 (psDataFrame):
1363 The updated DataFrame.
1365 ???+ example "Examples"
1367 ```{.py .python linenums="1" title="Set up"}
1368 >>> # Imports
1369 >>> import pandas as pd
1370 >>> from pyspark.sql import SparkSession
1371 >>> from toolbox_pyspark.cleaning import apply_function_to_columns
1372 >>>
1373 >>> # Instantiate Spark
1374 >>> spark = SparkSession.builder.getOrCreate()
1375 >>>
1376 >>> # Create data
1377 >>> df = spark.createDataFrame(
1378 ... pd.DataFrame(
1379 ... {
1380 ... "a": [0, 1, 2, 3],
1381 ... "b": ["a", "b", "c", "d"],
1382 ... "c": ["c", "c", "c", "c"],
1383 ... "d": ["d", "d", "d", "d"],
1384 ... }
1385 ... )
1386 ... )
1387 >>>
1388 >>> # Check
1390 >>> df.show()
1391 ```
1392 <div class="result" markdown>
1393 ```{.txt .text title="Terminal"}
1394 +---+---+---+---+
1395 | a | b | c | d |
1396 +---+---+---+---+
1397 | 0 | a | c | d |
1398 | 1 | b | c | d |
1399 | 2 | c | c | d |
1400 | 3 | d | c | d |
1401 +---+---+---+---+
1402 ```
1403 </div>
1405 ```{.py .python linenums="1" title="Example 1: Default params"}
1406 >>> apply_function_to_columns(df, ["b", "c"]).show()
1407 ```
1408 <div class="result" markdown>
1409 ```{.txt .text title="Terminal"}
1410 +---+---+---+---+
1411 | a | b | c | d |
1412 +---+---+---+---+
1413 | 0 | A | C | d |
1414 | 1 | B | C | d |
1415 | 2 | C | C | d |
1416 | 3 | D | C | d |
1417 +---+---+---+---+
1418 ```
1419 !!! success "Conclusion: Successfully applied the `upper` function to the `b` and `c` columns."
1420 </div>
1422 ```{.py .python linenums="1" title="Example 2: Simple function"}
1423 >>> apply_function_to_columns(df, ["b", "c"], "lower").show()
1424 ```
1425 <div class="result" markdown>
1426 ```{.txt .text title="Terminal"}
1427 +---+---+---+---+
1428 | a | b | c | d |
1429 +---+---+---+---+
1430 | 0 | a | c | d |
1431 | 1 | b | c | d |
1432 | 2 | c | c | d |
1433 | 3 | d | c | d |
1434 +---+---+---+---+
1435 ```
1436 !!! success "Conclusion: Successfully applied the `lower` function to the `b` and `c` columns."
1437 </div>
1439 ```{.py .python linenums="1" title="Example 3: Complex function, with args"}
1440 >>> apply_function_to_columns(df, ["b", "c", "d"], "lpad", 5, "?").show()
1441 ```
1442 <div class="result" markdown>
1443 ```{.txt .text title="Terminal"}
1444 +---+-------+-------+-------+
1445 | a | b | c | d |
1446 +---+-------+-------+-------+
1447 | 0 | ????a | ????c | ????d |
1448 | 1 | ????b | ????c | ????d |
1449 | 2 | ????c | ????c | ????d |
1450 | 3 | ????d | ????c | ????d |
1451 +---+-------+-------+-------+
1452 ```
1453 !!! success "Conclusion: Successfully applied the `lpad` function to the `b`, `c` and `d` columns."
1454 </div>
1456 ```{.py .python linenums="1" title="Example 4: Complex function, with kwargs"}
1457 >>> apply_function_to_columns(
1458 ... dataframe=df,
1459 ... columns=["b", "c", "d"],
1460 ... function="lpad",
1461 ... len=5,
1462 ... pad="?",
1463 ... ).show()
1464 ```
1465 <div class="result" markdown>
1466 ```{.txt .text title="Terminal"}
1467 +---+-------+-------+-------+
1468 | a | b | c | d |
1469 +---+-------+-------+-------+
1470 | 0 | ????a | ????c | ????d |
1471 | 1 | ????b | ????c | ????d |
1472 | 2 | ????c | ????c | ????d |
1473 | 3 | ????d | ????c | ????d |
1474 +---+-------+-------+-------+
1475 ```
1476 !!! success "Conclusion: Successfully applied the `lpad` function to the `b`, `c` and `d` columns."
1477 </div>
1479 ```{.py .python linenums="1" title="Example 5: Different complex function, with kwargs"}
1480 >>> apply_function_to_columns(
1481 ... dataframe=df,
1482 ... columns=["b", "c", "d"],
1483 ... function="regexp_replace",
1484 ... pattern="c",
1485 ... replacement="17",
1486 ... ).show()
1487 ```
1488 <div class="result" markdown>
1489 ```{.txt .text title="Terminal"}
1490 +---+----+----+---+
1491 | a | b | c | d |
1492 +---+----+----+---+
1493 | 0 | a | 17 | d |
1494 | 1 | b | 17 | d |
1495 | 2 | 17 | 17 | d |
1496 | 3 | d | 17 | d |
1497 +---+----+----+---+
1498 ```
1499 !!! success "Conclusion: Successfully applied the `regexp_replace` function to the `b`, `c` and `d` columns."
1500 </div>
1502 ```{.py .python linenums="1" title="Example 6: Part of pipe"}
1503 >>> df.transform(
1504 ... func=apply_function_to_columns,
1505 ... columns=["b", "c", "d"],
1506 ... function="lpad",
1507 ... len=5,
1508 ... pad="?",
1509 ... ).show()
1510 ```
1511 <div class="result" markdown>
1512 ```{.txt .text title="Terminal"}
1513 +---+-------+-------+-------+
1514 | a | b | c | d |
1515 +---+-------+-------+-------+
1516 | 0 | ????a | ????c | ????d |
1517 | 1 | ????b | ????c | ????d |
1518 | 2 | ????c | ????c | ????d |
1519 | 3 | ????d | ????c | ????d |
1520 +---+-------+-------+-------+
1521 ```
1522 !!! success "Conclusion: Successfully applied the `lpad` function to the `b`, `c` and `d` columns."
1523 </div>
1525 ```{.py .python linenums="1" title="Example 7: Column name in different case"}
1526 >>> apply_function_to_columns(
1527 ... dataframe=df,
1528 ... columns=["B", "c", "D"],
1529 ... function="upper",
1530 ... ).show()
1531 ```
1532 <div class="result" markdown>
1533 ```{.txt .text title="Terminal"}
1534 +---+---+---+---+
1535 | a | b | c | d |
1536 +---+---+---+---+
1537 | 0 | A | C | D |
1538 | 1 | B | C | D |
1539 | 2 | C | C | D |
1540 | 3 | D | C | D |
1541 +---+---+---+---+
1542 ```
1543 !!! success "Conclusion: Successfully applied the `upper` function to the `B`, `c` and `D` columns."
1544 </div>
1546 ```{.py .python linenums="1" title="Example 8: Invalid columns"}
1547 >>> apply_function_to_columns(df, ["f"])
1548 ```
1549 <div class="result" markdown>
1550 ```{.txt .text title="Terminal"}
1551 ColumnDoesNotExistError: Columns ['f'] do not exist in the DataFrame.
1552 Try one of: ["a", "b", "c", "d"]
1553 ```
1554 !!! failure "Conclusion: Columns do not exist."
1555 </div>
1557 ??? info "Notes"
1558 - We have to name the `function` parameter as the full name because when this function is executed as part of a chain (by using the PySpark [`.transform()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.transform.html) method), that one uses the `func` parameter.
1560 ??? tip "See Also"
1561 - [`apply_function_to_column()`][toolbox_pyspark.cleaning.apply_function_to_column]
1562 """
1563 columns = get_columns(dataframe, columns)
1564 assert_columns_exists(dataframe, columns, False)
1565 return dataframe.withColumns(
1566 {
1567 column: getattr(F, function)(column, *function_args, **function_kwargs)
1568 for column in columns
1569 }
1570 )
1573# ---------------------------------------------------------------------------- #
1574# Clean across tables ####
1575# ---------------------------------------------------------------------------- #
1578@typechecked
1579def drop_matching_rows(
1580 left_table: psDataFrame,
1581 right_table: psDataFrame,
1582 on_keys: Union[str, str_collection],
1583 join_type: VALID_PYAPARK_JOIN_TYPES = "left_anti",
1584 where_clause: Optional[str] = None,
1585) -> psDataFrame:
1586 """
1587 !!! note "Summary"
1588 This function is designed to _remove_ any rows on the `left_table` which _are_ existing on the `right_table`. That's why the `join_type` should always be `left_anti`.
1590 ???+ abstract "Details"
1591        The intention behind this function originates from the `Accumulation` layer in the BigDaS environment. The process on this table layer is to only _insert_ rows from the `left_table` to the `right_table` which are **not already existing** on the `right_table`. We include the `where_clause` here so that we can control any updated rows. Specifically, we check the `editdatetime` field between the `left_table` and the `right_table`; any record on the `left_table` where the `editdatetime` field is _greater than_ the `editdatetime` value on the `right_table` will _remain_ on the `left_table`, and will later be _updated_ on the `right_table`.
1593        It's important to note here that this function was created to handle the _same table_ between the `left_table` and the `right_table`, existing in different layers of the ADLS environment. Logically, it can be used for other purposes (it's generic enough); however, the intention was specifically for cleaning during the data pipeline processes.
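
        In practice, the call reduces to a left-anti join followed by an optional filter. A minimal sketch mirroring the implementation below, using the key and `where_clause` from the examples:

        ```py
        result = (
            left_table.alias("left")
            .join(right_table.alias("right"), on=["a"], how="left_anti")  # drop rows already existing on the right
            .where("n <> 'd'")  # optional `where_clause`; when omitted, everything is kept ("1=1")
            .select([f"left.{col}" for col in left_table.columns])
        )
        ```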
1595 Params:
1596 left_table (psDataFrame):
1597 The DataFrame _from which_ you will be deleting the records.
1598 right_table (psDataFrame):
1599 The DataFrame _from which_ to check for existing records. If any matching `on_keys` are existing on both the `right_table` and the `left_table`, then those records will be deleted from the `left_table`.
1600 on_keys (Union[str, str_collection]):
1601 The matching keys between the two tables. These keys (aka columns) must be existing on both the `left_table` and the `right_table`.
1602 join_type (VALID_PYAPARK_JOIN_TYPES, optional):
1603 The type of join to use for this process. For the best performance, keep it as the default value.<br>
1604 Defaults to `#!py "left_anti"`.
1605 where_clause (Optional[str], optional):
1606 Any additional conditions to place on this join. Any records which **match** this condition will be **kept** on the `left_table`.<br>
1607 Defaults to `#!py None`.
1609 Returns:
1610 (psDataFrame):
1611            The `left_table` after it has had its rows deleted and cleaned by the `right_table`.
1613 ???+ example "Examples"
1615 ```{.py .python linenums="1" title="Set up"}
1616 >>> # Imports
1617 >>> import pandas as pd
1618 >>> from pyspark.sql import SparkSession
1619 >>> from toolbox_pyspark.cleaning import drop_matching_rows
1620 >>>
1621 >>> # Instantiate Spark
1622 >>> spark = SparkSession.builder.getOrCreate()
1623 >>>
1624 >>> # Create data
1625 >>> left = spark.createDataFrame(
1626 ... pd.DataFrame(
1627 ... {
1628 ... "a": [0, 1, 2, 3],
1629 ... "b": ["a", "b", "c", "d"],
1630 ... "c": [1, 1, 1, 1],
1631 ... "d": ["2", "2", "2", "2"],
1632 ... "n": ["a", "b", "c", "d"],
1633 ... }
1634 ... )
1635 ... )
1636        >>> right = left.where("a in (1, 2)")
1637 >>>
1638 >>> # Check
1640 >>> left.show()
1641 >>> right.show()
1642 ```
1643 <div class="result" markdown>
1644 ```{.txt .text title="Terminal"}
1645 +---+---+---+---+---+
1646 | a | b | c | d | n |
1647 +---+---+---+---+---+
1648 | 1 | a | 1 | 2 | a |
1649 | 2 | b | 1 | 2 | b |
1650 | 3 | c | 1 | 2 | c |
1651 | 4 | d | 1 | 2 | d |
1652 +---+---+---+---+---+
1653 ```
1654 ```{.txt .text title="Terminal"}
1655 +---+---+---+---+---+
1656 | a | b | c | d | n |
1657 +---+---+---+---+---+
1658 | 1 | a | 1 | 2 | a |
1659 | 2 | b | 1 | 2 | b |
1660 +---+---+---+---+---+
1661 ```
1662 </div>
1664 ```{.py .python linenums="1" title="Example 1: Single column"}
1665 >>> drop_matching_rows(
1666 ... left_table=left,
1667 ... right_table=right,
1668 ... on_keys=["a"],
1669 ... ).show()
1670 ```
1671 <div class="result" markdown>
1672 ```{.txt .text title="Terminal"}
1673 +---+---+---+---+---+
1674 | a | b | c | d | n |
1675 +---+---+---+---+---+
1676 | 3 | c | 1 | 2 | c |
1677 | 4 | d | 1 | 2 | d |
1678 +---+---+---+---+---+
1679 ```
1680 !!! success "Conclusion: Successfully removed the records from the `left_table` which are existing on the `right_table`."
1681 </div>
1683 ```{.py .python linenums="1" title="Example 2: Single column as string"}
1684 >>> left.transform(
1685 ... drop_matching_rows,
1686 ... right_table=right,
1687 ... on_keys="a",
1688 ... ).show()
1689 ```
1690 <div class="result" markdown>
1691 ```{.txt .text title="Terminal"}
1692 +---+---+---+---+---+
1693 | a | b | c | d | n |
1694 +---+---+---+---+---+
1695 | 3 | c | 1 | 2 | c |
1696 | 4 | d | 1 | 2 | d |
1697 +---+---+---+---+---+
1698 ```
1699 !!! success "Conclusion: Successfully removed the records from the `left_table` which are existing on the `right_table`."
1700 </div>
1702 ```{.py .python linenums="1" title="Example 3: Multiple key columns"}
1703 >>> drop_matching_rows(
1704 ... left_table=left,
1705 ... right_table=right,
1706 ... on_keys=["a", "b"],
1707 ... ).show()
1708 ```
1709 <div class="result" markdown>
1710 ```{.txt .text title="Terminal"}
1711 +---+---+---+---+---+
1712 | a | b | c | d | n |
1713 +---+---+---+---+---+
1714 | 3 | c | 1 | 2 | c |
1715 | 4 | d | 1 | 2 | d |
1716 +---+---+---+---+---+
1717 ```
1718 !!! success "Conclusion: Successfully removed the records from the `left_table` which are existing on the `right_table`."
1719 </div>
1721 ```{.py .python linenums="1" title="Example 4: Including `where` clause"}
1722 >>> drop_matching_rows(
1723 ... left_table=left,
1724 ... right_table=right,
1725 ... on_keys=["a"],
1726 ... where_clause="n <> 'd'",
1727 ... ).show()
1728 ```
1729 <div class="result" markdown>
1730 ```{.txt .text title="Terminal"}
1731 +---+---+---+---+---+
1732 | a | b | c | d | n |
1733 +---+---+---+---+---+
1734 | 3 | c | 1 | 2 | c |
1735 +---+---+---+---+---+
1736 ```
1737 !!! success "Conclusion: Successfully removed the records from the `left_table` which are existing on the `right_table` and matched the `where` clause."
1738 </div>
1740 ```{.py .python linenums="1" title="Example 5: Part of pipe"}
1741 >>> left.transform(
1742 ... func=drop_matching_rows,
1743 ... right_table=right,
1744 ... on_keys=["a"],
1745 ... ).show()
1746 ```
1747 <div class="result" markdown>
1748 ```{.txt .text title="Terminal"}
1749 +---+---+---+---+---+
1750 | a | b | c | d | n |
1751 +---+---+---+---+---+
1752 | 3 | c | 1 | 2 | c |
1753 | 4 | d | 1 | 2 | d |
1754 +---+---+---+---+---+
1755 ```
1756 !!! success "Conclusion: Successfully removed the records from the `left_table` which are existing on the `right_table`."
1757 </div>
1759 ```{.py .python linenums="1" title="Example 6: Invalid column"}
1760 >>> drop_matching_rows(
1761 ... left_table=left,
1762 ... right_table=right,
1763 ... on_keys=["f"],
1764 ... )
1765 ```
1766 <div class="result" markdown>
1767 ```{.txt .text title="Terminal"}
1768 ColumnDoesNotExistError: Columns ['f'] do not exist in the DataFrame.
1769 Try one of: ["a", "b", "c", "d", "n"]
1770 ```
1771 !!! failure "Conclusion: Columns do not exist."
1772 </div>
1774 ??? info "Notes"
1775 - The `on_keys` parameter can be a single string or a list of strings. This is to allow for multiple columns to be used as the matching keys.
1776 - The `where_clause` parameter is optional. If specified, then only the records which match the condition will be kept on the `left_table`. It is applied after the join. If not specified, then all records which are existing on the `right_table` will be removed from the `left_table`.
1778 ??? tip "See Also"
1779 - [`assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists]
1780 """
1781 on_keys = [on_keys] if is_type(on_keys, str) else on_keys
1782 assert_columns_exists(left_table, on_keys, False)
1783 assert_columns_exists(right_table, on_keys, False)
1784 return (
1785 left_table.alias("left")
1786 .join(right_table.alias("right"), on=on_keys, how=join_type)
1787 .where("1=1" if where_clause is None else where_clause)
1788 .select([f"left.{col}" for col in left_table.columns])
1789 )