Coverage for src/toolbox_pyspark/keys.py: 100%
24 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-25 23:08 +0000
1# ============================================================================ #
2# #
3# Title : Keys #
4# Purpose : Creating new columns to act as keys (primary and foreign), #
5# to be used for joins with other tables, or to create #
6# relationships within downstream applications, like PowerBI. #
7# #
8# ============================================================================ #
11# ---------------------------------------------------------------------------- #
12# #
13# Overview ####
14# #
15# ---------------------------------------------------------------------------- #
18# ---------------------------------------------------------------------------- #
19# Description ####
20# ---------------------------------------------------------------------------- #
"""
!!! note "Summary"
    The `keys` module is used for creating new columns to act as keys (primary and foreign), to be used for joins with other tables, or to create relationships within downstream applications, like PowerBI.
"""
29# ---------------------------------------------------------------------------- #
30# #
31# Setup ####
32# #
33# ---------------------------------------------------------------------------- #
36# ---------------------------------------------------------------------------- #
37# Imports ####
38# ---------------------------------------------------------------------------- #
# ## Python StdLib Imports ----
from typing import Optional, Union

# ## Python Third Party Imports ----
from pyspark.sql import DataFrame as psDataFrame, functions as F
from toolbox_python.checkers import is_type
from toolbox_python.collection_types import str_collection, str_list
from typeguard import typechecked

# ## Local First Party Imports ----
from toolbox_pyspark.checks import assert_columns_exists
54# ---------------------------------------------------------------------------- #
55# Exports ####
56# ---------------------------------------------------------------------------- #
# Public API of this module: the two key-creation helpers.
__all__: str_list = ["add_keys_from_columns", "add_key_from_columns"]
62# ---------------------------------------------------------------------------- #
63# #
64# Functions ####
65# #
66# ---------------------------------------------------------------------------- #
69# ---------------------------------------------------------------------------- #
70# Add Keys ####
71# ---------------------------------------------------------------------------- #
@typechecked
def add_key_from_columns(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    join_character: Optional[str] = "_",
    key_name: Optional[str] = None,
) -> psDataFrame:
    """
    !!! note "Summary"
        Using a list of column names, add a new column which is a combination of all of them.

    ???+ abstract "Details"
        This is a combine key, and is especially important because PowerBI cannot handle joins on multiple columns.

    Params:
        dataframe (psDataFrame):
            The table to be updated.
        columns (Union[str, str_collection]):
            The columns to be combined.<br>
            If `columns` is a `#!py str`, then it will be coerced to a single-element list: `#!py [columns]`.
        join_character (Optional[str], optional):
            The character to use to combine the columns together.<br>
            Defaults to `#!py "_"`.
        key_name (Optional[str], optional):
            The name of the column to be given to the key.
            If not provided, it will form as the capitalised string of all the other column names, prefixed with `key_`.<br>
            Defaults to `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`.

    Returns:
        (psDataFrame):
            The updated `dataframe`.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.types import get_column_types
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | 1 | 2 |
        | 2 | b | 1 | 2 |
        | 3 | c | 1 | 2 |
        | 4 | d | 1 | 2 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> new_df = add_key_from_columns(df, ["a", "b"])
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---------+
        | a | b | c | d | key_A_B |
        +---+---+---+---+---------+
        | 1 | a | 1 | 2 | 1_a     |
        | 2 | b | 1 | 2 | 2_b     |
        | 3 | c | 1 | 2 | 3_c     |
        | 4 | d | 1 | 2 | 4_d     |
        +---+---+---+---+---------+
        ```
        !!! success "Conclusion: Successfully added new key column to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 2: Single column"}
        >>> new_df = add_key_from_columns(df, "a")
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+-------+
        | a | b | c | d | key_A |
        +---+---+---+---+-------+
        | 1 | a | 1 | 2 | 1     |
        | 2 | b | 1 | 2 | 2     |
        | 3 | c | 1 | 2 | 3     |
        | 4 | d | 1 | 2 | 4     |
        +---+---+---+---+-------+
        ```
        !!! success "Conclusion: Successfully added new key column to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 3: New name"}
        >>> new_df = add_key_from_columns(df, ["a", "b"], key_name="new_key")
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---------+
        | a | b | c | d | new_key |
        +---+---+---+---+---------+
        | 1 | a | 1 | 2 | 1_a     |
        | 2 | b | 1 | 2 | 2_b     |
        | 3 | c | 1 | 2 | 3_c     |
        | 4 | d | 1 | 2 | 4_d     |
        +---+---+---+---+---------+
        ```
        !!! success "Conclusion: Successfully added new key column to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 4: Raise error"}
        >>> new_df = add_key_from_columns(df, ["a", "x"])
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        Attribute Error: Columns ["x"] do not exist in "dataframe". Try one of: ["a", "b", "c", "d"].
        ```
        !!! failure "Conclusion: Invalid column selection."
        </div>
    """
    # Coerce a single column name to a one-element list for uniform handling below.
    columns = [columns] if is_type(columns, str) else columns
    assert_columns_exists(dataframe, columns)
    # A `None` join character behaves the same as an empty string.
    join_character = join_character or ""
    # Default key name: capitalised source column names joined with "_" and
    # prefixed with "key_" (always "_" here, independent of `join_character`).
    key_name = key_name or f"key_{'_'.join([col.upper() for col in columns])}"
    return dataframe.withColumn(
        key_name,
        F.concat_ws(join_character, *columns),
    )
@typechecked
def add_keys_from_columns(
    dataframe: psDataFrame,
    collection_of_columns: Union[
        tuple[Union[str, str_collection], ...],
        list[Union[str, str_collection]],
        dict[str, Union[str, str_collection]],
    ],
    join_character: Optional[str] = "_",
) -> psDataFrame:
    """
    !!! note "Summary"
        Add multiple new keys, each of which are collections of other columns.

    ???+ abstract "Details"
        There are a few reasons why this functionality would be needed:

        1. When you wanted to create a new single column to act as a combine key, derived from multiple other columns.
        1. When you're interacting with PowerBI, it will only allow you to create relationships on one single column, not a combination of multiple columns.
        1. When you're joining multiple tables together, each of them join on a different combination of different columns, and you want to make your `pyspark` joins cleaner, instead of using `#!py list`'s of multiple `#!py F.col(...)` equality checks.

    Params:
        dataframe (psDataFrame):
            The table to be updated.
        collection_of_columns (Union[tuple[Union[str, str_collection], ...], list[Union[str, str_collection]], dict[str, Union[str, str_collection]]]):
            The collection of columns to be combined together.<br>
            If it is a `#!py list` of `#!py list`'s of `#!py str`'s (or similar), then the key name will be derived from a concatenation of the original columns names.<br>
            If it's a `#!py dict` where the values are a `#!py list` of `#!py str`'s (or similar), then the column name for the new key is taken from the key of the dictionary.
        join_character (Optional[str], optional):
            The character to use to combine the columns together.<br>
            Defaults to `#!py "_"`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`.

    Returns:
        (psDataFrame):
            The updated `dataframe`.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.types import get_column_types
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | 1 | 2 |
        | 2 | b | 1 | 2 |
        | 3 | c | 1 | 2 |
        | 4 | d | 1 | 2 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> new_df = add_keys_from_columns(df, [["a", "b"], ["b", "c"]])
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---------+---------+
        | a | b | c | d | key_A_B | key_B_C |
        +---+---+---+---+---------+---------+
        | 1 | a | 1 | 2 | 1_a     | a_1     |
        | 2 | b | 1 | 2 | 2_b     | b_1     |
        | 3 | c | 1 | 2 | 3_c     | c_1     |
        | 4 | d | 1 | 2 | 4_d     | d_1     |
        +---+---+---+---+---------+---------+
        ```
        !!! success "Conclusion: Successfully added two new key columns to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 2: Created from dict"}
        >>> new_df = add_keys_from_columns(df, {"first": ["a", "b"], "second": ["b", "c"]})
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+-------+--------+
        | a | b | c | d | first | second |
        +---+---+---+---+-------+--------+
        | 1 | a | 1 | 2 | 1_a   | a_1    |
        | 2 | b | 1 | 2 | 2_b   | b_1    |
        | 3 | c | 1 | 2 | 3_c   | c_1    |
        | 4 | d | 1 | 2 | 4_d   | d_1    |
        +---+---+---+---+-------+--------+
        ```
        !!! success "Conclusion: Successfully added two new key columns to DataFrame."
        </div>
    """
    # A `None` join character behaves the same as an empty string.
    join_character = join_character or ""
    if is_type(collection_of_columns, dict):
        # Dict: the dictionary key names the new column explicitly.
        for key_name, columns in collection_of_columns.items():
            dataframe = add_key_from_columns(dataframe, columns, join_character, key_name)
    elif is_type(collection_of_columns, (tuple, list)):
        # Tuple/list: derive each key name from its source column names.
        for columns in collection_of_columns:
            dataframe = add_key_from_columns(dataframe, columns, join_character)
    return dataframe