Coverage for src/toolbox_pyspark/checks.py: 100%
133 statements

# ============================================================================ #
#                                                                              #
#     Title   : Checks                                                         #
#     Purpose : Check and validate various attributes of a given `pyspark`    #
#               `dataframe`.                                                   #
#                                                                              #
# ============================================================================ #


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Overview                                                             ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Description                                                              ####
# ---------------------------------------------------------------------------- #


"""
!!! note "Summary"
    The `checks` module is used to check and validate various attributes of a given `pyspark` dataframe.
"""


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Setup                                                                ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Imports                                                                  ####
# ---------------------------------------------------------------------------- #


# ## Python StdLib Imports ----
from dataclasses import dataclass, fields
from typing import Union
from warnings import warn

# ## Python Third Party Imports ----
from pyspark.sql import (
    DataFrame as psDataFrame,
    SparkSession,
    functions as F,
    types as T,
)
from toolbox_python.checkers import is_type
from toolbox_python.collection_types import str_collection, str_list
from typeguard import typechecked

# ## Local First Party Imports ----
from toolbox_pyspark.constants import ALL_PYSPARK_TYPES, VALID_PYSPARK_TYPE_NAMES
from toolbox_pyspark.io import SPARK_FORMATS, read_from_path
from toolbox_pyspark.utils.exceptions import (
    ColumnDoesNotExistError,
    InvalidPySparkDataTypeError,
    TableDoesNotExistError,
)
from toolbox_pyspark.utils.warnings import (
    ColumnDoesNotExistWarning,
    InvalidPySparkDataTypeWarning,
)


# ---------------------------------------------------------------------------- #
#  Exports                                                                  ####
# ---------------------------------------------------------------------------- #


__all__: str_list = [
    "ColumnExistsResult",
    "column_exists",
    "columns_exists",
    "assert_column_exists",
    "assert_columns_exists",
    "warn_column_missing",
    "warn_columns_missing",
    "is_vaid_spark_type",
    "assert_valid_spark_type",
    "ColumnsAreTypeResult",
    "column_is_type",
    "columns_are_type",
    "assert_column_is_type",
    "assert_columns_are_type",
    "warn_column_invalid_type",
    "warn_columns_invalid_type",
    "table_exists",
    "assert_table_exists",
    "column_contains_value",
]


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Functions                                                            ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Column Existence                                                         ####
# ---------------------------------------------------------------------------- #


@dataclass
class ColumnExistsResult:
    result: bool
    missing_cols: str_list

    def __iter__(self):
        for field in fields(self):
            yield getattr(self, field.name)


@typechecked
def _columns_exists(
    dataframe: psDataFrame,
    columns: str_collection,
    match_case: bool = False,
) -> ColumnExistsResult:
    cols: str_collection = columns if match_case else [col.upper() for col in columns]
    df_cols: str_list = (
        dataframe.columns if match_case else [df_col.upper() for df_col in dataframe.columns]
    )
    missing_cols: str_list = [col for col in cols if col not in df_cols]
    return ColumnExistsResult(len(missing_cols) == 0, missing_cols)
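

# NOTE: Because `ColumnExistsResult` implements `__iter__`, the result of
#       `_columns_exists()` can be unpacked directly into a tuple, which is
#       exactly how `assert_columns_exists()` below consumes it. A minimal
#       usage sketch (illustrative only; the dataframe `df` is assumed):
#
#           exists, missing_cols = _columns_exists(df, ["a", "x"])
#           if not exists:
#               print(f"Missing columns: {missing_cols}")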


@typechecked
def column_exists(
    dataframe: psDataFrame,
    column: str,
    match_case: bool = False,
) -> bool:
    """
    !!! note "Summary"
        Check whether a given `#!py column` exists as a valid column within `#!py dataframe.columns`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        column (str):
            The column to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            If `#!py False`, will default to: `#!py column.upper()`.<br>
            Default: `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (bool):
            `#!py True` if exists or `#!py False` otherwise.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import column_exists
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: Column Exists"}
        >>> result = column_exists(df, "a")
        >>> print(result)
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: Column exists."
        </div>

        ```{.py .python linenums="1" title="Example 2: Column Missing"}
        >>> result = column_exists(df, "c")
        >>> print(result)
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: Column does not exist."
        </div>

    ??? tip "See Also"
        - [`column_exists`][toolbox_pyspark.checks.column_exists]
        - [`columns_exists`][toolbox_pyspark.checks.columns_exists]
        - [`assert_column_exists`][toolbox_pyspark.checks.assert_column_exists]
        - [`assert_columns_exists`][toolbox_pyspark.checks.assert_columns_exists]
        - [`warn_column_missing`][toolbox_pyspark.checks.warn_column_missing]
        - [`warn_columns_missing`][toolbox_pyspark.checks.warn_columns_missing]
    """
    return _columns_exists(dataframe, [column], match_case).result


@typechecked
def columns_exists(
    dataframe: psDataFrame,
    columns: str_collection,
    match_case: bool = False,
) -> bool:
    """
    !!! note "Summary"
        Check whether all of the values in `#!py columns` exist in `#!py dataframe.columns`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        columns (Union[str_list, str_tuple, str_set]):
            The columns to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            If `#!py False`, will default to: `#!py [col.upper() for col in columns]`.<br>
            Default: `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (bool):
            `#!py True` if all columns exist or `#!py False` otherwise.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import columns_exists
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: Columns exist"}
        >>> columns_exists(df, ["a", "b"])
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: All columns exist."
        </div>

        ```{.py .python linenums="1" title="Example 2: One column missing"}
        >>> columns_exists(df, ["b", "d"])
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: One column is missing."
        </div>

        ```{.py .python linenums="1" title="Example 3: All columns missing"}
        >>> columns_exists(df, ["c", "d"])
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: All columns are missing."
        </div>

    ??? tip "See Also"
        - [`column_exists`][toolbox_pyspark.checks.column_exists]
        - [`columns_exists`][toolbox_pyspark.checks.columns_exists]
        - [`assert_column_exists`][toolbox_pyspark.checks.assert_column_exists]
        - [`assert_columns_exists`][toolbox_pyspark.checks.assert_columns_exists]
        - [`warn_column_missing`][toolbox_pyspark.checks.warn_column_missing]
        - [`warn_columns_missing`][toolbox_pyspark.checks.warn_columns_missing]
    """
    return _columns_exists(dataframe, columns, match_case).result


@typechecked
def assert_column_exists(
    dataframe: psDataFrame,
    column: str,
    match_case: bool = False,
) -> None:
    """
    !!! note "Summary"
        Check whether a given `#!py column` exists as a valid column within `#!py dataframe.columns`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        column (str):
            The column to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            If `#!py False`, will default to: `#!py column.upper()`.<br>
            Default: `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If the `#!py column` does not exist within `#!py dataframe.columns`.

    Returns:
        (type(None)):
            Nothing is returned. Either a `#!py ColumnDoesNotExistError` exception is raised, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import assert_column_exists
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No error"}
        >>> assert_column_exists(df, "a")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        None
        ```
        !!! success "Conclusion: Column exists."
        </div>

        ```{.py .python linenums="1" title="Example 2: Error raised"}
        >>> assert_column_exists(df, "c")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Column 'c' does not exist in 'dataframe'.
        Try one of: ['a', 'b'].
        ```
        !!! failure "Conclusion: Column does not exist."
        </div>

    ??? tip "See Also"
        - [`column_exists`][toolbox_pyspark.checks.column_exists]
        - [`columns_exists`][toolbox_pyspark.checks.columns_exists]
        - [`assert_column_exists`][toolbox_pyspark.checks.assert_column_exists]
        - [`assert_columns_exists`][toolbox_pyspark.checks.assert_columns_exists]
        - [`warn_column_missing`][toolbox_pyspark.checks.warn_column_missing]
        - [`warn_columns_missing`][toolbox_pyspark.checks.warn_columns_missing]
    """
    if not column_exists(dataframe, column, match_case):
        raise ColumnDoesNotExistError(
            f"Column '{column}' does not exist in 'dataframe'.\n"
            f"Try one of: {dataframe.columns}."
        )


@typechecked
def assert_columns_exists(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    match_case: bool = False,
) -> None:
    """
    !!! note "Summary"
        Check whether all of the values in `#!py columns` exist in `#!py dataframe.columns`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        columns (Union[str, str_collection]):
            The columns to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            If `#!py False`, will default to: `#!py [col.upper() for col in columns]`.<br>
            Default: `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`.

    Returns:
        (type(None)):
            Nothing is returned. Either a `#!py ColumnDoesNotExistError` exception is raised, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import assert_columns_exists
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No error"}
        >>> assert_columns_exists(df, ["a", "b"])
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        None
        ```
        !!! success "Conclusion: Columns exist."
        </div>

        ```{.py .python linenums="1" title="Example 2: One column missing"}
        >>> assert_columns_exists(df, ["b", "c"])
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Columns ['c'] do not exist in 'dataframe'.
        Try one of: ['a', 'b'].
        ```
        !!! failure "Conclusion: Column 'c' does not exist."
        </div>

        ```{.py .python linenums="1" title="Example 3: Multiple columns missing"}
        >>> assert_columns_exists(df, ["b", "c", "d"])
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Columns ['c', 'd'] do not exist in 'dataframe'.
        Try one of: ['a', 'b'].
        ```
        !!! failure "Conclusion: Columns 'c' and 'd' do not exist."
        </div>

    ??? tip "See Also"
        - [`column_exists`][toolbox_pyspark.checks.column_exists]
        - [`columns_exists`][toolbox_pyspark.checks.columns_exists]
        - [`assert_column_exists`][toolbox_pyspark.checks.assert_column_exists]
        - [`assert_columns_exists`][toolbox_pyspark.checks.assert_columns_exists]
        - [`warn_column_missing`][toolbox_pyspark.checks.warn_column_missing]
        - [`warn_columns_missing`][toolbox_pyspark.checks.warn_columns_missing]
    """
    columns = [columns] if is_type(columns, str) else columns
    (exist, missing_cols) = _columns_exists(dataframe, columns, match_case)
    if not exist:
        raise ColumnDoesNotExistError(
            f"Columns {missing_cols} do not exist in 'dataframe'.\n"
            f"Try one of: {dataframe.columns}."
        )


@typechecked
def warn_column_missing(
    dataframe: psDataFrame,
    column: str,
    match_case: bool = False,
) -> None:
    """
    !!! summary "Summary"
        Check whether a given `#!py column` exists as a valid column within `#!py dataframe.columns`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        column (str):
            The column to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (type(None)):
            Nothing is returned. Either a `#!py ColumnDoesNotExistWarning` warning is issued, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import warn_column_missing
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No warning"}
        >>> warn_column_missing(df, "a")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        None
        ```
        !!! success "Conclusion: Column exists."
        </div>

        ```{.py .python linenums="1" title="Example 2: Warning raised"}
        >>> warn_column_missing(df, "c")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistWarning: Column 'c' does not exist in 'dataframe'.
        Try one of: ['a', 'b'].
        ```
        !!! failure "Conclusion: Column does not exist."
        </div>

    ??? tip "See Also"
        - [`column_exists`][toolbox_pyspark.checks.column_exists]
        - [`columns_exists`][toolbox_pyspark.checks.columns_exists]
        - [`assert_column_exists`][toolbox_pyspark.checks.assert_column_exists]
        - [`assert_columns_exists`][toolbox_pyspark.checks.assert_columns_exists]
        - [`warn_column_missing`][toolbox_pyspark.checks.warn_column_missing]
        - [`warn_columns_missing`][toolbox_pyspark.checks.warn_columns_missing]
    """
    if not column_exists(dataframe, column, match_case):
        warn(
            f"Column '{column}' does not exist in 'dataframe'.\n"
            f"Try one of: {dataframe.columns}.",
            ColumnDoesNotExistWarning,
        )


@typechecked
def warn_columns_missing(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    match_case: bool = False,
) -> None:
    """
    !!! summary "Summary"
        Check whether all of the values in `#!py columns` exist in `#!py dataframe.columns`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        columns (Union[str, str_collection]):
            The columns to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (type(None)):
            Nothing is returned. Either a `#!py ColumnDoesNotExistWarning` warning is issued, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import warn_columns_missing
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No warning"}
        >>> warn_columns_missing(df, ["a", "b"])
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        None
        ```
        !!! success "Conclusion: Columns exist."
        </div>

        ```{.py .python linenums="1" title="Example 2: One column missing"}
        >>> warn_columns_missing(df, ["b", "c"])
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistWarning: Columns ['c'] do not exist in 'dataframe'.
        Try one of: ['a', 'b'].
        ```
        !!! failure "Conclusion: Column 'c' does not exist."
        </div>

        ```{.py .python linenums="1" title="Example 3: Multiple columns missing"}
        >>> warn_columns_missing(df, ["b", "c", "d"])
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistWarning: Columns ['c', 'd'] do not exist in 'dataframe'.
        Try one of: ['a', 'b'].
        ```
        !!! failure "Conclusion: Columns 'c' and 'd' do not exist."
        </div>

    ??? tip "See Also"
        - [`column_exists`][toolbox_pyspark.checks.column_exists]
        - [`columns_exists`][toolbox_pyspark.checks.columns_exists]
        - [`assert_column_exists`][toolbox_pyspark.checks.assert_column_exists]
        - [`assert_columns_exists`][toolbox_pyspark.checks.assert_columns_exists]
        - [`warn_column_missing`][toolbox_pyspark.checks.warn_column_missing]
        - [`warn_columns_missing`][toolbox_pyspark.checks.warn_columns_missing]
    """
    columns = [columns] if is_type(columns, str) else columns
    (exist, missing_cols) = _columns_exists(dataframe, columns, match_case)
    if not exist:
        warn(
            f"Columns {missing_cols} do not exist in 'dataframe'.\n"
            f"Try one of: {dataframe.columns}.",
            ColumnDoesNotExistWarning,
        )


# ---------------------------------------------------------------------------- #
#  Type checks                                                              ####
# ---------------------------------------------------------------------------- #


@typechecked
def is_vaid_spark_type(datatype: str) -> bool:
    """
    !!! note "Summary"
        Check whether a given `#!py datatype` is a correct and valid `#!py pyspark` data type.

    Params:
        datatype (str):
            The name of the data type to check.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (bool):
            `#!py True` if the datatype is valid, `#!py False` otherwise.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> from toolbox_pyspark.checks import is_vaid_spark_type
        ```

        ```{.py .python linenums="1" title="Example 1: Loop through all valid types"}
        >>> type_names = ["string", "char", "varchar", "binary", "boolean", "decimal", "float", "double", "byte", "short", "integer", "long", "date", "timestamp", "timestamp_ntz", "void"]
        >>> for type_name in type_names:
        ...     print(is_vaid_spark_type(type_name))
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        True
        ...
        True
        ```
        !!! success "Conclusion: `#!py True` is returned every time; they're all valid."
        </div>

        ```{.py .python linenums="1" title="Example 2: Check some invalid types"}
        >>> type_names = ["np.ndarray", "pd.DataFrame", "dict"]
        >>> for type_name in type_names:
        ...     print(is_vaid_spark_type(type_name))
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        False
        False
        ```
        !!! failure "Conclusion: `#!py False` is returned every time; all of these types are invalid."
        </div>

    ??? tip "See Also"
        - [`assert_valid_spark_type`][toolbox_pyspark.checks.assert_valid_spark_type]
    """
    return datatype in VALID_PYSPARK_TYPE_NAMES


@typechecked
def assert_valid_spark_type(datatype: str) -> None:
    """
    !!! note "Summary"
        Assert whether a given `#!py datatype` is a correct and valid `#!py pyspark` data type.

    Params:
        datatype (str):
            The name of the data type to check.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        InvalidPySparkDataTypeError:
            If the given `#!py datatype` is not a valid `#!py pyspark` data type.

    Returns:
        (type(None)):
            Nothing is returned. Either an `#!py InvalidPySparkDataTypeError` exception is raised, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> from toolbox_pyspark.checks import assert_valid_spark_type
        ```

        ```{.py .python linenums="1" title="Example 1: Valid type"}
        >>> assert_valid_spark_type("string")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        None
        ```
        !!! success "Conclusion: Valid type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Invalid type"}
        >>> assert_valid_spark_type("invalid_type")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        InvalidPySparkDataTypeError: DataType 'invalid_type' is not valid.
        Must be one of: ["binary", "bool", "boolean", "byte", "char", "date", "decimal", "double", "float", "int", "integer", "long", "short", "str", "string", "timestamp", "timestamp_ntz", "varchar", "void"]
        ```
        !!! failure "Conclusion: Invalid type."
        </div>

    ??? tip "See Also"
        - [`is_vaid_spark_type`][toolbox_pyspark.checks.is_vaid_spark_type]
    """
    if not is_vaid_spark_type(datatype):
        raise InvalidPySparkDataTypeError(
            f"DataType '{datatype}' is not valid.\n"
            f"Must be one of: {VALID_PYSPARK_TYPE_NAMES}"
        )


# ---------------------------------------------------------------------------- #
#  Column Types                                                             ####
# ---------------------------------------------------------------------------- #


@dataclass
class ColumnsAreTypeResult:
    result: bool
    invalid_types: list[tuple[str, str]]

    def __iter__(self):
        for field in fields(self):
            yield getattr(self, field.name)


def _validate_pyspark_datatype(
    datatype: Union[str, type, T.DataType],
) -> ALL_PYSPARK_TYPES:
    datatype = T.FloatType() if datatype == "float" or datatype is float else datatype
    if is_type(datatype, str):
        datatype = "string" if datatype == "str" else datatype
        datatype = "boolean" if datatype == "bool" else datatype
        datatype = "integer" if datatype == "int" else datatype
        datatype = "timestamp" if datatype == "datetime" else datatype
        try:
            datatype = eval(datatype)
        except NameError:
            datatype = T._parse_datatype_string(s=datatype)  # type:ignore
    if type(datatype).__name__ == "type":
        datatype = T._type_mappings.get(datatype)()  # type:ignore
    return datatype
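

# NOTE: A few illustrative conversions for `_validate_pyspark_datatype()`
#       (a sketch derived from the logic above, not an exhaustive contract):
#       string names are normalised and parsed into `pyspark` type instances,
#       Python built-in types are mapped through `T._type_mappings`, and
#       existing `T.DataType` instances pass straight through:
#
#           _validate_pyspark_datatype("string")      # -> T.StringType()
#           _validate_pyspark_datatype("int")         # -> T.IntegerType()
#           _validate_pyspark_datatype(float)         # -> T.FloatType()
#           _validate_pyspark_datatype(T.DateType())  # -> T.DateType()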


@typechecked
def _columns_are_type(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    datatype: str,
    match_case: bool = False,
) -> ColumnsAreTypeResult:
    columns = [columns] if is_type(columns, str) else columns
    assert_columns_exists(dataframe, columns, match_case)
    assert_valid_spark_type(datatype)
    target_type: ALL_PYSPARK_TYPES = _validate_pyspark_datatype(datatype)
    df_dtypes: list[tuple[str, str]] = dataframe.dtypes
    df_dtypes_parsed: list[tuple[str, ALL_PYSPARK_TYPES]] = [
        (col, _validate_pyspark_datatype(dtype)) for col, dtype in df_dtypes
    ]
    invalid_cols: list[tuple[str, str]] = [
        (col, dtype.simpleString())
        for col, dtype in df_dtypes_parsed
        if (col if match_case else col.upper())
        in [col if match_case else col.upper() for col in columns]
        and dtype != target_type
    ]
    return ColumnsAreTypeResult(len(invalid_cols) == 0, invalid_cols)
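

# NOTE: Like `ColumnExistsResult`, `ColumnsAreTypeResult` is iterable, so the
#       result can be unpacked in one step, as the assert/warn functions below
#       do. A minimal sketch (illustrative only; the dataframe `df` is
#       assumed, and the example output is hypothetical):
#
#           result, invalid_types = _columns_are_type(df, ["a", "b"], "string")
#           if not result:
#               print(f"Wrong types: {invalid_types}")  # e.g. [("a", "bigint")]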


@typechecked
def column_is_type(
    dataframe: psDataFrame,
    column: str,
    datatype: str,
    match_case: bool = False,
) -> bool:
    """
    !!! note "Summary"
        Check whether a given `#!py column` is of a given `#!py datatype` in `#!py dataframe`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        column (str):
            The column to check.
        datatype (str):
            The data type to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If the `#!py column` does not exist within `#!py dataframe.columns`.
        InvalidPySparkDataTypeError:
            If the `#!py datatype` is not a valid `#!py pyspark` data type.

    Returns:
        (bool):
            `#!py True` if the column is of the given `#!py datatype`, `#!py False` otherwise.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import column_is_type
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: Column is of type"}
        >>> column_is_type(df, "a", "integer")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: Column is the correct type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Column is not of type"}
        >>> column_is_type(df, "b", "integer")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: Column is not the correct type."
        </div>

    ??? tip "See Also"
        - [`column_is_type`][toolbox_pyspark.checks.column_is_type]
        - [`columns_are_type`][toolbox_pyspark.checks.columns_are_type]
        - [`assert_column_is_type`][toolbox_pyspark.checks.assert_column_is_type]
        - [`assert_columns_are_type`][toolbox_pyspark.checks.assert_columns_are_type]
        - [`warn_column_invalid_type`][toolbox_pyspark.checks.warn_column_invalid_type]
        - [`warn_columns_invalid_type`][toolbox_pyspark.checks.warn_columns_invalid_type]
    """
    return _columns_are_type(dataframe, column, datatype, match_case).result


@typechecked
def columns_are_type(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    datatype: str,
    match_case: bool = False,
) -> bool:
    """
    !!! note "Summary"
        Check whether the given `#!py columns` are of a given `#!py datatype` in `#!py dataframe`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        columns (Union[str, str_collection]):
            The columns to check.
        datatype (str):
            The data type to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`.
        InvalidPySparkDataTypeError:
            If the `#!py datatype` is not a valid `#!py pyspark` data type.

    Returns:
        (bool):
            `#!py True` if all the columns are of the given `#!py datatype`, `#!py False` otherwise.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import columns_are_type
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1.1, 2.2, 3.3, 4.4],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: Columns are of type"}
        >>> columns_are_type(df, ["a", "c"], "double")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: Columns are the correct type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Columns are not of type"}
        >>> columns_are_type(df, ["a", "b"], "double")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: Columns are not the correct type."
        </div>

        ```{.py .python linenums="1" title="Example 3: Single column is of type"}
        >>> columns_are_type(df, "a", "integer")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: Column is the correct type."
        </div>

        ```{.py .python linenums="1" title="Example 4: Single column is not of type"}
        >>> columns_are_type(df, "b", "integer")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: Column is not the correct type."
        </div>

    ??? tip "See Also"
        - [`column_is_type`][toolbox_pyspark.checks.column_is_type]
        - [`columns_are_type`][toolbox_pyspark.checks.columns_are_type]
        - [`assert_column_is_type`][toolbox_pyspark.checks.assert_column_is_type]
        - [`assert_columns_are_type`][toolbox_pyspark.checks.assert_columns_are_type]
        - [`warn_column_invalid_type`][toolbox_pyspark.checks.warn_column_invalid_type]
        - [`warn_columns_invalid_type`][toolbox_pyspark.checks.warn_columns_invalid_type]
    """
    return _columns_are_type(dataframe, columns, datatype, match_case).result


@typechecked
def assert_column_is_type(
    dataframe: psDataFrame,
    column: str,
    datatype: str,
    match_case: bool = False,
) -> None:
    """
    !!! note "Summary"
        Check whether a given `#!py column` is of a given `#!py datatype` in `#!py dataframe`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        column (str):
            The column to check.
        datatype (str):
            The data type to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If the `#!py column` does not exist within `#!py dataframe.columns`.
        InvalidPySparkDataTypeError:
            If the given `#!py column` is not of the given `#!py datatype`.

    Returns:
        (type(None)):
            Nothing is returned. Either an `#!py InvalidPySparkDataTypeError` exception is raised, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import assert_column_is_type
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No error"}
        >>> assert_column_is_type(df, "a", "integer")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        None
        ```
        !!! success "Conclusion: Column is of type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Error raised"}
        >>> assert_column_is_type(df, "b", "integer")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        InvalidPySparkDataTypeError: Column 'b' is type 'string', which is not the required type: 'integer'.
        ```
        !!! failure "Conclusion: Column is not of type."
        </div>

    ??? tip "See Also"
        - [`column_is_type`][toolbox_pyspark.checks.column_is_type]
        - [`columns_are_type`][toolbox_pyspark.checks.columns_are_type]
        - [`assert_column_is_type`][toolbox_pyspark.checks.assert_column_is_type]
        - [`assert_columns_are_type`][toolbox_pyspark.checks.assert_columns_are_type]
        - [`warn_column_invalid_type`][toolbox_pyspark.checks.warn_column_invalid_type]
        - [`warn_columns_invalid_type`][toolbox_pyspark.checks.warn_columns_invalid_type]
    """
    result, invalid_types = _columns_are_type(dataframe, column, datatype, match_case)
    if not result:
        raise InvalidPySparkDataTypeError(
            f"Column '{column}' is type '{invalid_types[0][1]}', "
            f"which is not the required type: '{datatype}'."
        )


@typechecked
def assert_columns_are_type(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    datatype: str,
    match_case: bool = False,
) -> None:
    """
    !!! note "Summary"
        Check whether the given `#!py columns` are of a given `#!py datatype` in `#!py dataframe`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        columns (Union[str, str_collection]):
            The columns to check.
        datatype (str):
            The data type to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`.
        InvalidPySparkDataTypeError:
            If any of the given `#!py columns` are not of the given `#!py datatype`.

    Returns:
        (type(None)):
            Nothing is returned. Either an `#!py InvalidPySparkDataTypeError` exception is raised, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import assert_columns_are_type
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1.1, 2.2, 3.3, 4.4],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No error"}
        >>> assert_columns_are_type(df, ["a", "c"], "double")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        None
        ```
        !!! success "Conclusion: Columns are of type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Error raised"}
        >>> assert_columns_are_type(df, ["a", "b"], "double")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        InvalidPySparkDataTypeError: Columns ['a', 'b'] are types ['int', 'string'], which are not the required type: 'double'.
        ```
        !!! failure "Conclusion: Columns are not of type."
        </div>

        ```{.py .python linenums="1" title="Example 3: Single column is of type"}
        >>> assert_columns_are_type(df, "a", "integer")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        None
        ```
        !!! success "Conclusion: Column is of type."
        </div>

        ```{.py .python linenums="1" title="Example 4: Single column is not of type"}
        >>> assert_columns_are_type(df, "b", "integer")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        InvalidPySparkDataTypeError: Columns ['b'] are types ['string'], which are not the required type: 'integer'.
        ```
        !!! failure "Conclusion: Column is not of type."
        </div>

    ??? tip "See Also"
        - [`column_is_type`][toolbox_pyspark.checks.column_is_type]
        - [`columns_are_type`][toolbox_pyspark.checks.columns_are_type]
        - [`assert_column_is_type`][toolbox_pyspark.checks.assert_column_is_type]
        - [`assert_columns_are_type`][toolbox_pyspark.checks.assert_columns_are_type]
        - [`warn_column_invalid_type`][toolbox_pyspark.checks.warn_column_invalid_type]
        - [`warn_columns_invalid_type`][toolbox_pyspark.checks.warn_columns_invalid_type]
    """
    result, invalid_types = _columns_are_type(dataframe, columns, datatype, match_case)
    if not result:
        raise InvalidPySparkDataTypeError(
            f"Columns {[col for col, _ in invalid_types]} are types {[typ for _, typ in invalid_types]}, "
            f"which are not the required type: '{datatype}'."
        )


@typechecked
def warn_column_invalid_type(
    dataframe: psDataFrame,
    column: str,
    datatype: str,
    match_case: bool = False,
) -> None:
    """
    !!! note "Summary"
        Check whether a given `#!py column` is of a given `#!py datatype` in `#!py dataframe` and raise a warning if not.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        column (str):
            The column to check.
        datatype (str):
            The data type to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (type(None)):
            Nothing is returned. Either an `#!py InvalidPySparkDataTypeWarning` warning is issued, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import warn_column_invalid_type
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No warning"}
        >>> warn_column_invalid_type(df, "a", "integer")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        None
        ```
        !!! success "Conclusion: Column is of type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Warning raised"}
        >>> warn_column_invalid_type(df, "b", "integer")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        InvalidPySparkDataTypeWarning: Column 'b' is type 'string', which is not the required type: 'integer'.
        ```
        !!! failure "Conclusion: Column is not of type."
        </div>

    ??? tip "See Also"
        - [`column_is_type`][toolbox_pyspark.checks.column_is_type]
        - [`columns_are_type`][toolbox_pyspark.checks.columns_are_type]
        - [`assert_column_is_type`][toolbox_pyspark.checks.assert_column_is_type]
        - [`assert_columns_are_type`][toolbox_pyspark.checks.assert_columns_are_type]
        - [`warn_column_invalid_type`][toolbox_pyspark.checks.warn_column_invalid_type]
        - [`warn_columns_invalid_type`][toolbox_pyspark.checks.warn_columns_invalid_type]
    """
    result, invalid_types = _columns_are_type(dataframe, column, datatype, match_case)
    if not result:
        warn(
            f"Column '{column}' is type '{invalid_types[0][1]}', "
            f"which is not the required type: '{datatype}'.",
            InvalidPySparkDataTypeWarning,
        )


@typechecked
def warn_columns_invalid_type(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    datatype: str,
    match_case: bool = False,
) -> None:
    """
    !!! note "Summary"
        Check whether the given `#!py columns` are of a given `#!py datatype` in `#!py dataframe` and raise a warning if not.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        columns (Union[str, str_collection]):
            The columns to check.
        datatype (str):
            The data type to check.
        match_case (bool, optional):
            Whether or not to match the string case for the columns.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (type(None)):
            Nothing is returned. Either an `#!py InvalidPySparkDataTypeWarning` warning is issued, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import warn_columns_invalid_type
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1.1, 2.2, 3.3, 4.4],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: No warning"}
        >>> warn_columns_invalid_type(df, ["a", "c"], "double")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        None
        ```
        !!! success "Conclusion: Columns are of type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Warning raised"}
        >>> warn_columns_invalid_type(df, ["a", "b"], "double")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        InvalidPySparkDataTypeWarning: Columns ['a', 'b'] are types ['int', 'string'], which are not the required type: 'double'.
        ```
        !!! failure "Conclusion: Columns are not of type."
        </div>

    ??? tip "See Also"
        - [`column_is_type`][toolbox_pyspark.checks.column_is_type]
        - [`columns_are_type`][toolbox_pyspark.checks.columns_are_type]
        - [`assert_column_is_type`][toolbox_pyspark.checks.assert_column_is_type]
        - [`assert_columns_are_type`][toolbox_pyspark.checks.assert_columns_are_type]
        - [`warn_column_invalid_type`][toolbox_pyspark.checks.warn_column_invalid_type]
        - [`warn_columns_invalid_type`][toolbox_pyspark.checks.warn_columns_invalid_type]
    """
    result, invalid_types = _columns_are_type(dataframe, columns, datatype, match_case)
    if not result:
        warn(
            f"Columns {[col for col, _ in invalid_types]} are types {[typ for _, typ in invalid_types]}, "
            f"which are not the required type: '{datatype}'.",
            InvalidPySparkDataTypeWarning,
        )


@typechecked
def column_contains_value(
    dataframe: psDataFrame,
    column: str,
    value: str,
    match_case: bool = False,
) -> bool:
    """
    !!! note "Summary"
        Check whether a given `#!py column` contains a specific `#!py value` in `#!py dataframe`.

    Params:
        dataframe (psDataFrame):
            The DataFrame to check.
        column (str):
            The column to check.
        value (str):
            The value to check for.
        match_case (bool, optional):
            Whether or not to match the string case for the value.<br>
            Defaults to `#!py False`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If the `#!py column` does not exist within `#!py dataframe.columns`.

    Returns:
        (bool):
            `#!py True` if the column contains the value, `#!py False` otherwise.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.checks import column_contains_value
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: Value exists"}
        >>> column_contains_value(df, "b", "a")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: Value exists in column."
        </div>

        ```{.py .python linenums="1" title="Example 2: Value does not exist"}
        >>> column_contains_value(df, "b", "z")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: Value does not exist in column."
        </div>

    ??? tip "See Also"
        - [`assert_column_exists`][toolbox_pyspark.checks.assert_column_exists]
    """
    assert_column_exists(dataframe, column, match_case)

    if not match_case:
        value = value.lower()
        dataframe = dataframe.withColumn(column, F.lower(F.col(column)))

    return dataframe.filter(f"{column} = '{value}'").count() > 0
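

# NOTE: The filter above is built as a SQL expression string. A sketch of an
#       equivalent filter (an assumption for illustration, not the library's
#       implementation) using column expressions instead, which avoids quoting
#       issues if `value` contains a single-quote character:
#
#           dataframe.filter(F.col(column) == value).count() > 0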


# ---------------------------------------------------------------------------- #
#  Table Existence                                                          ####
# ---------------------------------------------------------------------------- #


@typechecked
def table_exists(
    name: str,
    path: str,
    data_format: SPARK_FORMATS,
    spark_session: SparkSession,
) -> bool:
    """
    !!! note "Summary"
        Will try to read the table `#!py name` from `#!py path` using `#!py data_format`, and if successful will return `#!py True`, otherwise `#!py False`.

    Params:
        name (str):
            The name of the table to check exists.
        path (str):
            The directory where the table should be existing.
        data_format (str):
            The format of the table to try checking.
        spark_session (SparkSession):
            The `#!py spark` session to use for the importing.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (bool):
            Returns `#!py True` if the table exists, `#!py False` otherwise.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.io import write_to_path
        >>> from toolbox_pyspark.checks import table_exists
        >>>
        >>> # Constants
        >>> write_name = "test_df"
        >>> write_path = "./test"
        >>> write_format = "parquet"
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Write data
        >>> write_to_path(df, f"{write_name}.{write_format}", write_path)
        ```

        ```{.py .python linenums="1" title="Example 1: Table exists"}
        >>> table_exists("test_df.parquet", "./test", "parquet", spark)
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: Table exists."
        </div>

        ```{.py .python linenums="1" title="Example 2: Table does not exist"}
        >>> table_exists("bad_table_name.parquet", "./test", "parquet", spark)
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        False
        ```
        !!! failure "Conclusion: Table does not exist."
        </div>

    ??? tip "See Also"
        - [`assert_table_exists`][toolbox_pyspark.checks.assert_table_exists]
    """
    try:
        _ = read_from_path(
            name=name,
            path=path,
            data_format=data_format,
            spark_session=spark_session,
        )
    except Exception:
        return False
    return True
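

# NOTE: `table_exists()` works by attempting the read and swallowing any
#       exception, so it can guard an optional load at the call site. A
#       minimal sketch (illustrative only; the table, path, and Spark session
#       are assumed to exist):
#
#           if table_exists("test_df.parquet", "./test", "parquet", spark):
#               df = read_from_path(
#                   name="test_df.parquet",
#                   path="./test",
#                   data_format="parquet",
#                   spark_session=spark,
#               )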


@typechecked
def assert_table_exists(
    name: str,
    path: str,
    data_format: SPARK_FORMATS,
    spark_session: SparkSession,
) -> None:
    """
    !!! note "Summary"
        Assert whether a table exists at a given `path` using `data_format`.

    Params:
        name (str):
            The name of the table to check exists.
        path (str):
            The directory where the table should be existing.
        data_format (str):
            The format of the table to try checking.
        spark_session (SparkSession):
            The `#!py spark` session to use for the importing.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        TableDoesNotExistError:
            If the table does not exist at the specified location.

    Returns:
        (type(None)):
            Nothing is returned. Either a `#!py TableDoesNotExistError` exception is raised, or nothing.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.io import write_to_path
        >>> from toolbox_pyspark.checks import assert_table_exists
        >>>
        >>> # Constants
        >>> write_name = "test_df"
        >>> write_path = "./test"
        >>> write_format = "parquet"
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Write data
        >>> write_to_path(df, f"{write_name}.{write_format}", write_path)
        ```

        ```{.py .python linenums="1" title="Example 1: Table exists"}
        >>> assert_table_exists("test_df.parquet", "./test", "parquet", spark)
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        None
        ```
        !!! success "Conclusion: Table exists."
        </div>

        ```{.py .python linenums="1" title="Example 2: Table does not exist"}
        >>> assert_table_exists("bad_table_name.parquet", "./test", "parquet", spark)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        TableDoesNotExistError: Table 'bad_table_name.parquet' does not exist at path './test'.
        ```
        !!! failure "Conclusion: Table does not exist."
        </div>

    ??? tip "See Also"
        - [`table_exists`][toolbox_pyspark.checks.table_exists]
    """
    if not table_exists(
        name=name, path=path, data_format=data_format, spark_session=spark_session
    ):
        raise TableDoesNotExistError(f"Table '{name}' does not exist at path '{path}'.")