Coverage for src/toolbox_pyspark/types.py: 100%
44 statements
# ============================================================================ #
#                                                                              #
#     Title   : Column Types                                                   #
#     Purpose : Get, check, and change a dataframe's column data types.        #
#                                                                              #
# ============================================================================ #


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Overview                                                              ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Description                                                              ####
# ---------------------------------------------------------------------------- #


"""
!!! note "Summary"
    The `types` module is used to get, check, and change a dataframe's column data types.
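
???+ example "Quick usage"
    A minimal, illustrative sketch of how the helpers in this module fit together (assuming a local `SparkSession` is available):

    ```{.py .python linenums="1"}
    >>> from pyspark.sql import SparkSession
    >>> from toolbox_pyspark.types import get_column_types, cast_column_to_type
    >>> spark = SparkSession.builder.getOrCreate()
    >>> df = spark.createDataFrame([(1, "a")], ["a", "b"])
    >>> df = cast_column_to_type(df, "a", "string")
    >>> get_column_types(df).show()
    ```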
24"""


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Setup                                                                 ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Imports                                                                  ####
# ---------------------------------------------------------------------------- #


# ## Python StdLib Imports ----
from typing import Union

# ## Python Third Party Imports ----
import pandas as pd
from pandas import DataFrame as pdDataFrame
from pyspark.sql import DataFrame as psDataFrame, functions as F, types as T
from toolbox_python.checkers import is_type
from toolbox_python.collection_types import str_list, str_tuple
from toolbox_python.dictionaries import dict_reverse_keys_and_values
from typeguard import typechecked

# ## Local First Party Imports ----
from toolbox_pyspark.checks import (
    _validate_pyspark_datatype,
    assert_column_exists,
    assert_columns_exists,
)
from toolbox_pyspark.constants import (
    VALID_DATAFRAME_NAMES,
    VALID_PYSPARK_DATAFRAME_NAMES,
)
from toolbox_pyspark.utils.exceptions import InvalidDataFrameNameError


# ---------------------------------------------------------------------------- #
#  Exports                                                                  ####
# ---------------------------------------------------------------------------- #


__all__: str_list = [
    "get_column_types",
    "cast_column_to_type",
    "cast_columns_to_type",
    "map_cast_columns_to_type",
]


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Functions                                                             ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Public functions                                                         ####
# ---------------------------------------------------------------------------- #


@typechecked
def get_column_types(
    dataframe: psDataFrame,
    output_type: str = "psDataFrame",
) -> Union[psDataFrame, pdDataFrame]:
    """
    !!! note "Summary"
        This is a convenience function to return the data types from a given table as either a `#!py pyspark.sql.DataFrame` or `#!py pandas.DataFrame`.
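
    ???+ abstract "Details"
        At its core, this simply wraps the `#!py dataframe.dtypes` attribute in a `#!py pandas` table; the key line (the same one that appears in the function body below) is:

        ```{.py .python linenums="1"}
        output = pd.DataFrame(dataframe.dtypes, columns=["col_name", "col_type"])
        ```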

    Params:
        dataframe (psDataFrame):
            The DataFrame to be checked.

        output_type (str, optional):
            How should the data be returned? As `#!py pdDataFrame` or `#!py psDataFrame`.

            For `#!py pandas`, use one of:

            <div class="result" markdown>
            ```{.sh .shell title="Terminal"}
            [
                "pandas", "pandas.DataFrame",
                "pd.df", "pd.DataFrame",
                "pddf", "pdDataFrame",
                "pd", "pdDF",
            ]
            ```
            </div>

            For `#!py pyspark` use one of:

            <div class="result" markdown>
            ```{.sh .shell title="Terminal"}
            [
                "pyspark", "spark.DataFrame",
                "spark", "pyspark.DataFrame",
                "ps.df", "ps.DataFrame",
                "psdf", "psDataFrame",
                "ps", "psDF",
            ]
            ```
            </div>

            Any other options are invalid.<br>
            Defaults to `#!py "psDataFrame"`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        InvalidDataFrameNameError:
            If the given value parsed to `#!py output_type` is not one of the given valid types.

    Returns:
        (Union[psDataFrame, pdDataFrame]):
            The DataFrame where each row represents a column on the original `#!py dataframe` object, and which has two columns:

            1. The column name from `#!py dataframe`; and
            2. The data type for that column in `#!py dataframe`.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.types import get_column_types
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> print(df.dtypes)
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        [
            ("a", "bigint"),
            ("b", "string"),
            ("c", "bigint"),
            ("d", "string"),
        ]
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Return PySpark"}
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | bigint   |
        | b        | string   |
        | c        | bigint   |
        | d        | string   |
        +----------+----------+
        ```
        !!! success "Conclusion: Successfully print PySpark output."
        </div>

        ```{.py .python linenums="1" title="Example 2: Return Pandas"}
        >>> print(get_column_types(df, "pd"))
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
          col_name col_type
        0        a   bigint
        1        b   string
        2        c   bigint
        3        d   string
        ```
        !!! success "Conclusion: Successfully print Pandas output."
        </div>

        ```{.py .python linenums="1" title="Example 3: Invalid output"}
        >>> print(get_column_types(df, "foo"))
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        InvalidDataFrameNameError: Invalid value for `output_type`: 'foo'.
        Must be one of: ["pandas.DataFrame", "pandas", "pd.DataFrame", "pd.df", "pddf", "pdDataFrame", "pdDF", "pd", "spark.DataFrame", "pyspark.DataFrame", "pyspark", "spark", "ps.DataFrame", "ps.df", "psdf", "psDataFrame", "psDF", "ps"]
        ```
        !!! failure "Conclusion: Invalid input."
        </div>
    """
    if output_type not in VALID_DATAFRAME_NAMES:
        raise InvalidDataFrameNameError(
            f"Invalid value for `output_type`: '{output_type}'.\n"
            f"Must be one of: {VALID_DATAFRAME_NAMES}"
        )
    output = pd.DataFrame(dataframe.dtypes, columns=["col_name", "col_type"])
    if output_type in VALID_PYSPARK_DATAFRAME_NAMES:
        return dataframe.sparkSession.createDataFrame(output)
    else:
        return output


@typechecked
def cast_column_to_type(
    dataframe: psDataFrame,
    column: str,
    datatype: Union[str, type, T.DataType],
) -> psDataFrame:
    """
    !!! note "Summary"
        This is a convenience function for casting a single column on a given table to another data type.

    ???+ abstract "Details"
        At its core, it will call the function like this:

        ```{.py .python linenums="1"}
        dataframe = dataframe.withColumn(column, F.col(column).cast(datatype))
        ```

        The reason for wrapping it up in this function is for validation of the column's existence and convenient re-declaration of the same.

    Params:
        dataframe (psDataFrame):
            The DataFrame to be updated.
        column (str):
            The column to be updated.
        datatype (Union[str, type, T.DataType]):
            The datatype to be cast to.
            Must be a valid `#!py pyspark` DataType.

            Use one of the following:
            ```{.sh .shell title="Terminal"}
            [
                "string", "char",
                "varchar", "binary",
                "boolean", "decimal",
                "float", "double",
                "byte", "short",
                "integer", "long",
                "date", "timestamp",
                "void", "timestamp_ntz",
            ]
            ```

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If the `#!py column` does not exist within `#!py dataframe.columns`.
        ParseException:
            If the given `#!py datatype` is not a valid PySpark DataType.

    Returns:
        (psDataFrame):
            The updated DataFrame.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.types import cast_column_to_type, get_column_types
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | bigint   |
        | b        | string   |
        | c        | bigint   |
        | d        | string   |
        +----------+----------+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Valid casting"}
        >>> df = cast_column_to_type(df, "a", "string")
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | string   |
        | b        | string   |
        | c        | bigint   |
        | d        | string   |
        +----------+----------+
        ```
        !!! success "Conclusion: Successfully cast column to type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Invalid column"}
        >>> df = cast_column_to_type(df, "x", "string")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Column "x" does not exist in DataFrame.
        Try one of: ["a", "b", "c", "d"].
        ```
        !!! failure "Conclusion: Column `x` does not exist as a valid column."
        </div>

        ```{.py .python linenums="1" title="Example 3: Invalid datatype"}
        >>> df = cast_column_to_type(df, "b", "foo")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ParseException: DataType "foo" is not supported.
        ```
        !!! failure "Conclusion: Datatype `foo` is not valid."
        </div>

    ??? tip "See Also"
        - [`assert_column_exists()`][toolbox_pyspark.checks.assert_column_exists]
        - [`is_vaid_spark_type()`][toolbox_pyspark.checks.is_vaid_spark_type]
        - [`get_column_types()`][toolbox_pyspark.types.get_column_types]
    """
    assert_column_exists(dataframe, column)
    datatype = _validate_pyspark_datatype(datatype=datatype)
    return dataframe.withColumn(column, F.col(column).cast(datatype))  # type:ignore


@typechecked
def cast_columns_to_type(
    dataframe: psDataFrame,
    columns: Union[str, str_list],
    datatype: Union[str, type, T.DataType],
) -> psDataFrame:
    """
    !!! note "Summary"
        Cast multiple columns to a given type.

    ???+ abstract "Details"
        An extension of [`#!py cast_column_to_type()`][toolbox_pyspark.types.cast_column_to_type] to allow casting of multiple columns simultaneously.
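
        At its core (the same call as in the function body below), it will run:

        ```{.py .python linenums="1"}
        dataframe = dataframe.withColumns({col: F.col(col).cast(datatype) for col in columns})
        ```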

    Params:
        dataframe (psDataFrame):
            The DataFrame to be updated.
        columns (Union[str, str_list]):
            The list of columns to be updated. They must all be valid columns existing on `#!py DataFrame`.
        datatype (Union[str, type, T.DataType]):
            The datatype to be cast to.
            Must be a valid PySpark DataType.

            Use one of the following:
            ```{.sh .shell title="Terminal"}
            [
                "string", "char",
                "varchar", "binary",
                "boolean", "decimal",
                "float", "double",
                "byte", "short",
                "integer", "long",
                "date", "timestamp",
                "void", "timestamp_ntz",
            ]
            ```

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`.
        ParseException:
            If the given `#!py datatype` is not a valid PySpark DataType.

    Returns:
        (psDataFrame):
            The updated DataFrame.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.types import cast_columns_to_type, get_column_types
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | bigint   |
        | b        | string   |
        | c        | bigint   |
        | d        | string   |
        +----------+----------+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> df = cast_columns_to_type(df, ["a"], "string")
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | string   |
        | b        | string   |
        | c        | bigint   |
        | d        | string   |
        +----------+----------+
        ```
        !!! success "Conclusion: Successfully cast column to type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Multiple columns"}
        >>> df = cast_columns_to_type(df, ["c", "d"], "string")
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | string   |
        | b        | string   |
        | c        | string   |
        | d        | string   |
        +----------+----------+
        ```
        !!! success "Conclusion: Successfully cast columns to type."
        </div>

        ```{.py .python linenums="1" title="Example 3: Invalid column"}
        >>> df = cast_columns_to_type(df, ["x", "y"], "string")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Columns ["x", "y"] do not exist in DataFrame.
        Try one of: ["a", "b", "c", "d"].
        ```
        !!! failure "Conclusion: Columns `["x", "y"]` do not exist as valid columns."
        </div>

        ```{.py .python linenums="1" title="Example 4: Invalid datatype"}
        >>> df = cast_columns_to_type(df, ["a", "b"], "foo")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ParseException: DataType "foo" is not supported.
        ```
        !!! failure "Conclusion: Datatype `foo` is not valid."
        </div>

    ??? tip "See Also"
        - [`assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists]
        - [`is_vaid_spark_type()`][toolbox_pyspark.checks.is_vaid_spark_type]
        - [`get_column_types()`][toolbox_pyspark.types.get_column_types]
    """
    columns = [columns] if is_type(columns, str) else columns
    assert_columns_exists(dataframe, columns)
    datatype = _validate_pyspark_datatype(datatype=datatype)
    return dataframe.withColumns({col: F.col(col).cast(datatype) for col in columns})


@typechecked
def map_cast_columns_to_type(
    dataframe: psDataFrame,
    columns_type_mapping: dict[
        Union[str, type, T.DataType],
        Union[str, str_list, str_tuple],
    ],
) -> psDataFrame:
    """
    !!! note "Summary"
        Take a dictionary mapping where the keys are the types and the values are the column(s), and apply it to the given dataframe.

    ???+ abstract "Details"
        Conceptually equivalent to applying [`#!py cast_columns_to_type()`][toolbox_pyspark.types.cast_columns_to_type] once per type; internally, the mapping is normalised and applied in a single pass.
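
        Roughly speaking, once the mapping has been reversed into a `#!py {column: type}` form, the final step (matching the function body below) is a single call:

        ```{.py .python linenums="1"}
        dataframe = dataframe.withColumns(
            {
                col: F.col(col).cast(_validate_pyspark_datatype(typ))
                for col, typ in reversed_mapping.items()
            }
        )
        ```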

    Params:
        dataframe (psDataFrame):
            The DataFrame to transform.
        columns_type_mapping (dict[Union[str, type, T.DataType], Union[str, str_list, str_tuple]]):
            The mapping of the columns to manipulate.<br>
            The format must be: `#!py {type: columns}`.<br>
            Where the keys are the relevant type to cast to, and the values are the column(s) for casting.

    Returns:
        (psDataFrame):
            The transformed data frame.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.types import map_cast_columns_to_type, get_column_types
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | bigint   |
        | b        | string   |
        | c        | bigint   |
        | d        | string   |
        +----------+----------+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> df = map_cast_columns_to_type(df, {"str": ["a", "c"]})
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | string   |
        | b        | string   |
        | c        | string   |
        | d        | string   |
        +----------+----------+
        ```
        !!! success "Conclusion: Successfully cast columns to type."
        </div>

        ```{.py .python linenums="1" title="Example 2: Multiple types"}
        >>> df = map_cast_columns_to_type(df, {"int": ["a", "c"], "str": ["b"], "float": "d"})
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | bigint   |
        | b        | string   |
        | c        | bigint   |
        | d        | float    |
        +----------+----------+
        ```
        !!! success "Conclusion: Successfully cast columns to types."
        </div>

        ```{.py .python linenums="1" title="Example 3: All to single type"}
        >>> df = map_cast_columns_to_type(df, {str: [col for col in df.columns]})
        >>> get_column_types(df).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----------+----------+
        | col_name | col_type |
        +----------+----------+
        | a        | string   |
        | b        | string   |
        | c        | string   |
        | d        | string   |
        +----------+----------+
        ```
        !!! success "Conclusion: Successfully cast all columns to type."
        </div>

    ??? tip "See Also"
        - [`cast_column_to_type()`][toolbox_pyspark.types.cast_column_to_type]
        - [`cast_columns_to_type()`][toolbox_pyspark.types.cast_columns_to_type]
        - [`assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists]
        - [`is_vaid_spark_type()`][toolbox_pyspark.checks.is_vaid_spark_type]
        - [`get_column_types()`][toolbox_pyspark.types.get_column_types]
    """

    # Ensure all keys are `str`.
    # Take a snapshot of the keys so the dict can be mutated while iterating.
    keys = (*columns_type_mapping.keys(),)
    for key in keys:
        if is_type(key, type):
            # Key is a Python type (eg. `str`); convert it to its name (eg. `"str"`)
            if key.__name__ in keys:
                # The string name is already a key; merge the two column lists
                columns_type_mapping[key.__name__] = list(
                    columns_type_mapping[key.__name__]
                ) + list(columns_type_mapping.pop(key))
            else:
                columns_type_mapping[key.__name__] = columns_type_mapping.pop(key)

    # Reverse keys and values, so each column maps to a single type
    reversed_mapping = dict_reverse_keys_and_values(dictionary=columns_type_mapping)

    # Validate
    assert_columns_exists(dataframe, reversed_mapping.keys())

    # Apply mapping to dataframe
    try:
        dataframe = dataframe.withColumns(
            {
                col: F.col(col).cast(_validate_pyspark_datatype(typ))
                for col, typ in reversed_mapping.items()
            }
        )
    except Exception as e:  # pragma: no cover
        raise RuntimeError(f"Raised {e.__class__.__name__}: {e}") from e

    # Return
    return dataframe