Coverage for src/toolbox_pyspark/info.py: 100%
31 statements
coverage.py v7.6.10, created at 2025-01-25 23:08 +0000
# ============================================================================ #
#                                                                              #
#     Title: Info                                                              #
#     Purpose: Provide utility functions for retrieving information from      #
#         `pyspark` dataframes.                                                #
#                                                                              #
# ============================================================================ #


# ---------------------------------------------------------------------------- #
#                                                                              #
#  Overview                                                                 ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Description                                                              ####
# ---------------------------------------------------------------------------- #

"""
!!! note "Summary"
    The `info` module provides utility functions for retrieving information from `pyspark` dataframes.
"""

# ---------------------------------------------------------------------------- #
#                                                                              #
#  Setup                                                                    ####
#                                                                              #
# ---------------------------------------------------------------------------- #


## --------------------------------------------------------------------------- #
##  Imports                                                                 ####
## --------------------------------------------------------------------------- #

# ## Python StdLib Imports ----
from typing import Any, Optional, Union

# ## Python Third Party Imports ----
from numpy import ndarray as npArray
from pandas import DataFrame as pdDataFrame
from pyspark.sql import DataFrame as psDataFrame, types as T
from toolbox_python.checkers import is_type
from toolbox_python.collection_types import str_collection, str_list
from typeguard import typechecked

# ## Local First Party Imports ----
from toolbox_pyspark.checks import assert_column_exists
from toolbox_pyspark.constants import (
    LITERAL_LIST_OBJECT_NAMES,
    LITERAL_NUMPY_ARRAY_NAMES,
    LITERAL_PANDAS_DATAFRAME_NAMES,
    LITERAL_PYSPARK_DATAFRAME_NAMES,
    VALID_LIST_OBJECT_NAMES,
    VALID_NUMPY_ARRAY_NAMES,
    VALID_PANDAS_DATAFRAME_NAMES,
    VALID_PYSPARK_DATAFRAME_NAMES,
)

## --------------------------------------------------------------------------- #
##  Exports                                                                 ####
## --------------------------------------------------------------------------- #


__all__: str_list = ["get_distinct_values", "extract_column_values"]


# ---------------------------------------------------------------------------- #
#                                                                              #
#  Main Section                                                             ####
#                                                                              #
# ---------------------------------------------------------------------------- #


## --------------------------------------------------------------------------- #
## `get_*()` functions                                                      ####
## --------------------------------------------------------------------------- #

@typechecked
def extract_column_values(
    dataframe: psDataFrame,
    column: str,
    distinct: bool = True,
    return_type: Union[
        LITERAL_PYSPARK_DATAFRAME_NAMES,
        LITERAL_PANDAS_DATAFRAME_NAMES,
        LITERAL_NUMPY_ARRAY_NAMES,
        LITERAL_LIST_OBJECT_NAMES,
    ] = "pd",
) -> Optional[Union[psDataFrame, pdDataFrame, npArray, list]]:
    """
    !!! note "Summary"
        Retrieve the values from a specified column in a `pyspark` dataframe.

    Params:
        dataframe (psDataFrame):
            The DataFrame to retrieve the column values from.
        column (str):
            The column to retrieve the values from.
        distinct (bool, optional):
            Whether to retrieve only distinct values.<br>
            Defaults to `#!py True`.
        return_type (Union[LITERAL_PYSPARK_DATAFRAME_NAMES, LITERAL_PANDAS_DATAFRAME_NAMES, LITERAL_NUMPY_ARRAY_NAMES, LITERAL_LIST_OBJECT_NAMES], optional):
            The type of object to return.<br>
            Defaults to `#!py "pd"`.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ValueError:
            If the `return_type` is not one of the valid options.
        ColumnDoesNotExistError:
            If the `#!py column` does not exist within `#!py dataframe.columns`.

    Returns:
        (Optional[Union[psDataFrame, pdDataFrame, npArray, list]]):
            The values from the specified column in the specified return type.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.info import extract_column_values
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "3", "3", "3"],
        ...             "e": ["a", "a", "b", "b"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---+
        | a | b | c | d | e |
        +---+---+---+---+---+
        | 1 | a | 1 | 2 | a |
        | 2 | b | 1 | 3 | a |
        | 3 | c | 1 | 3 | b |
        | 4 | d | 1 | 3 | b |
        +---+---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Retrieve all values as pyspark DataFrame"}
        >>> result = extract_column_values(df, "e", distinct=False, return_type="ps")
        >>> result.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+
        | e |
        +---+
        | a |
        | a |
        | b |
        | b |
        +---+
        ```
        !!! success "Conclusion: Successfully retrieved all values as pyspark DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 2: Retrieve distinct values as pandas DataFrame"}
        >>> result = extract_column_values(df, "b", distinct=True, return_type="pd")
        >>> print(result)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
           b
        0  a
        1  b
        2  c
        3  d
        ```
        !!! success "Conclusion: Successfully retrieved distinct values as pandas DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 3: Retrieve all values as list"}
        >>> result = extract_column_values(df, "c", distinct=False, return_type="list")
        >>> print(result)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        [1, 1, 1, 1]
        ```
        !!! success "Conclusion: Successfully retrieved all values as list."
        </div>

        ```{.py .python linenums="1" title="Example 4: Retrieve distinct values as numpy array"}
        >>> result = extract_column_values(df, "d", distinct=True, return_type="np")
        >>> print(result)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ['2' '3']
        ```
        !!! success "Conclusion: Successfully retrieved distinct values as numpy array."
        </div>
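
        The next example is an illustrative sketch of how this function relates to
        [`get_distinct_values`][toolbox_pyspark.info.get_distinct_values]: for a single
        column, the `#!py "list"` return type yields the same distinct values. The row
        order produced by `#!py .distinct()` is not guaranteed, so the order shown here
        may differ.

        ```{.py .python linenums="1" title="Additional example: Comparison with get_distinct_values"}
        >>> from toolbox_pyspark.info import get_distinct_values
        >>> result_list = extract_column_values(df, "e", distinct=True, return_type="list")
        >>> result_tuple = get_distinct_values(df, "e")
        >>> print(result_list)
        >>> print(result_tuple)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ['a', 'b']
        ('a', 'b')
        ```
        !!! success "Conclusion: Both functions return the same distinct values for a single column."
        </div>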

        ```{.py .python linenums="1" title="Example 5: Invalid column"}
        >>> result = extract_column_values(df, "invalid", distinct=True, return_type="pd")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Column 'invalid' does not exist. Did you mean one of the following? [a, b, c, d, e]
        ```
        !!! failure "Conclusion: Failed to retrieve values due to invalid column."
        </div>

        ```{.py .python linenums="1" title="Example 6: Invalid return type"}
        >>> result = extract_column_values(df, "b", distinct=True, return_type="invalid")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ValueError: Invalid return type: invalid
        ```
        !!! failure "Conclusion: Failed to retrieve values due to invalid return type."
        </div>

    ??? tip "See Also"
        - [`get_distinct_values`][toolbox_pyspark.info.get_distinct_values]
    """

    assert_column_exists(dataframe, column)

    # Reduce to just the requested column, then optionally de-duplicate.
    dataframe = dataframe.select(column)

    if distinct:
        dataframe = dataframe.distinct()

    # Convert to the requested return type.
    if return_type in VALID_PYSPARK_DATAFRAME_NAMES:
        return dataframe
    elif return_type in VALID_PANDAS_DATAFRAME_NAMES:
        return dataframe.toPandas()
    elif return_type in VALID_NUMPY_ARRAY_NAMES:
        # Flat (1-D) array of the column values
        return dataframe.toPandas()[column].to_numpy()
    elif return_type in VALID_LIST_OBJECT_NAMES:
        return dataframe.toPandas()[column].tolist()


@typechecked
def get_distinct_values(
    dataframe: psDataFrame, columns: Union[str, str_collection]
) -> tuple[Any, ...]:
    """
    !!! note "Summary"
        Retrieve the distinct values from the specified column(s) in a `pyspark` dataframe.

    Params:
        dataframe (psDataFrame):
            The DataFrame to retrieve the distinct column values from.
        columns (Union[str, str_collection]):
            The column(s) to retrieve the distinct values from.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (tuple[Any, ...]):
            The distinct values from the specified column(s).

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.info import get_distinct_values
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Example 1: Retrieve distinct values"}
        >>> result = get_distinct_values(df, "b")
        >>> print(result)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ('a', 'b', 'c', 'd')
        ```
        !!! success "Conclusion: Successfully retrieved distinct values."
        </div>
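
        The next example is an illustrative sketch of the multi-column behaviour: when
        more than one column is given, each distinct combination of values is returned
        as a tuple. The row order produced by `#!py .distinct()` is not guaranteed, so
        the order shown here may differ.

        ```{.py .python linenums="1" title="Additional example: Multiple columns"}
        >>> result = get_distinct_values(df, ["b", "c"])
        >>> print(result)
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        (('a', 1), ('b', 1), ('c', 1), ('d', 1))
        ```
        !!! success "Conclusion: Successfully retrieved distinct value combinations from multiple columns."
        </div>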

        ```{.py .python linenums="1" title="Example 2: Invalid column"}
        >>> result = get_distinct_values(df, "invalid")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        AnalysisException: Column 'invalid' does not exist. Did you mean one of the following? [a, b, c, d]
        ```
        !!! failure "Conclusion: Failed to retrieve values due to invalid column."
        </div>

    ??? tip "See Also"
        - [`extract_column_values`][toolbox_pyspark.info.extract_column_values]
    """
    # Normalise a single column name to a one-element list.
    columns = [columns] if is_type(columns, str) else columns

    # Collect the distinct rows for the requested column(s).
    rows: list[T.Row] = dataframe.select(*columns).distinct().collect()

    # One column -> flat tuple of values; multiple columns -> tuple of row tuples.
    if len(columns) == 1:
        return tuple(row[columns[0]] for row in rows)
    return tuple(tuple(row[col] for col in columns) for row in rows)