Coverage for src/toolbox_pyspark/duplication.py: 100%
26 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-25 23:08 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-25 23:08 +0000
1# ============================================================================ #
2# #
3# Title : Duplication #
4# Purpose : Duplicate from an existing `dataframe`, or union multiple #
5# `dataframe`'s together. #
6# #
7# ============================================================================ #
10# ---------------------------------------------------------------------------- #
11# #
12# Overview ####
13# #
14# ---------------------------------------------------------------------------- #
17# ---------------------------------------------------------------------------- #
18# Description ####
19# ---------------------------------------------------------------------------- #
22"""
23!!! note "Summary"
24 The `duplication` module is used for duplicating data from an existing `dataframe`, or unioning multiple `dataframe`'s together.
25"""
28# ---------------------------------------------------------------------------- #
29# #
30# Setup ####
31# #
32# ---------------------------------------------------------------------------- #
35# ---------------------------------------------------------------------------- #
36# Imports ####
37# ---------------------------------------------------------------------------- #
40# ## Python Third Party Imports ----
41from pyspark.sql import (
42 DataFrame as psDataFrame,
43 functions as F,
44)
45from toolbox_python.collection_types import str_list
46from typeguard import typechecked
48# ## Local First Party Imports ----
49from toolbox_pyspark.info import extract_column_values
52# ---------------------------------------------------------------------------- #
53# Exports ####
54# ---------------------------------------------------------------------------- #
57__all__: str_list = [
58 "duplicate_union_dataframe",
59 "union_all",
60]
63# ---------------------------------------------------------------------------- #
64# #
65# Functions ####
66# #
67# ---------------------------------------------------------------------------- #
70# ---------------------------------------------------------------------------- #
71# Firstly ####
72# ---------------------------------------------------------------------------- #
@typechecked
def duplicate_union_dataframe(
    dataframe: psDataFrame,
    by_list: str_list,
    new_column_name: str,
) -> psDataFrame:
    """
    !!! note "Summary"
        The purpose here is to take a given table and duplicate it entirely multiple times from values in a list, then union them all together.

    ???+ abstract "Details"
        There are sometimes instances where we need to duplicate an entire table multiple times, with no change to the underlying data. Sometimes this is to maintain the structure of the data, but duplicate it to match a different table structure. This function is designed to do just that.<br>
        The `dataframe` is the table to be duplicated, the `by_list` is the list of values to loop over, and the `new_column_name` is the new column to hold the loop values.

    Params:
        dataframe (psDataFrame):
            The table to be duplicated.
        by_list (str_list):
            The list to loop over.
        new_column_name (str):
            The new column to hold the loop values.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AttributeError:
            If any given value in the `by_list` list is not a string.

    Returns:
        (psDataFrame):
            The updated DataFrame.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.duplication import duplicate_union_dataframe
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": ["x", "x", "x", "x"],
        ...             "d": [2, 2, 2, 2],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Check"}
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | x | 2 |
        | 2 | b | x | 2 |
        | 3 | c | x | 2 |
        | 4 | d | x | 2 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Column missing"}
        >>> duplicate_union_dataframe(
        ...     dataframe=df,
        ...     by_list=["x", "y", "z"],
        ...     new_column_name="n",
        ... ).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---+
        | a | b | c | d | n |
        +---+---+---+---+---+
        | 1 | a | x | 2 | x |
        | 2 | b | x | 2 | x |
        | 3 | c | x | 2 | x |
        | 4 | d | x | 2 | x |
        | 1 | a | x | 2 | y |
        | 2 | b | x | 2 | y |
        | 3 | c | x | 2 | y |
        | 4 | d | x | 2 | y |
        | 1 | a | x | 2 | z |
        | 2 | b | x | 2 | z |
        | 3 | c | x | 2 | z |
        | 4 | d | x | 2 | z |
        +---+---+---+---+---+
        ```
        !!! success "Conclusion: Successfully duplicated data frame multiple times."
        </div>

        ```{.py .python linenums="1" title="Example 2: Column existing"}
        >>> duplicate_union_dataframe(
        ...     dataframe=df,
        ...     by_list=["x", "y", "z"],
        ...     new_column_name="c",
        ... ).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | x | 2 |
        | 2 | b | x | 2 |
        | 3 | c | x | 2 |
        | 4 | d | x | 2 |
        | 1 | a | y | 2 |
        | 2 | b | y | 2 |
        | 3 | c | y | 2 |
        | 4 | d | y | 2 |
        | 1 | a | z | 2 |
        | 2 | b | z | 2 |
        | 3 | c | z | 2 |
        | 4 | d | z | 2 |
        +---+---+---+---+
        ```
        !!! success "Conclusion: Successfully duplicated data frame multiple times."
        </div>

    ??? info "Notes"
        - How the `union` is performed:
            - Currently this function uses the `loop` and `append` method.
            - It was written this way because it's a lot easier and more logical for humans to understand.
            - However, there's probably a more computationally efficient method for doing this by using SQL Joins.
            - More specifically, for creating a CARTESIAN PRODUCT (aka a 'Cross-Join') over the data set.
            - This is probably one of the only times EVER that a developer would _want_ to create a cartesian product.
            - All other times a cartesian product is to be avoided at all costs...
        - Whether or not the column `new_column_name` exists or not on the `dataframe`:
            - The process is a little different for if the `new_column_name` is existing or not...
            - If it is existing, we need to:
                - Extract the `#!sql distinct` values from that column,
                - Create a duplicate copy of the raw table,
                - Loop through all values in `by_list`,
                - Check if that `value` from `by_list` is already existing in the extracted values from the `new_column_name` column,
                - If it is already existing, proceed to next iteration,
                - If it is not existing, take the raw table, update `new_column_name` to be the `value` from that iteration of `by_list`, then `#!sql union` that to the copy of the raw table,
                - Continue to iterate through all values in `by_list` until they're all `#!sql union`'ed together.
            - If it is not existing, we need to:
                - Add a new column to `dataframe` that has the name from `new_column_name`, and a single literal value from the zero'th index of the `by_list`,
                - Then to go through the same process as if the column were existing.
        - Having now achieved this, the final output `dataframe` will now have all the updated duplicate values that we require.

    ???+ warning "Warning"
        Obviously, it's easy to see how this function will blow out the size of a table to tremendous sizes. So be careful!
    """

    def _self_union_dataframe_with_column_existing(
        dataframe: psDataFrame,
        by_list: str_list,
        new_column_name: str,
    ) -> psDataFrame:
        # Distinct values already present in the target column; any `by_list`
        # value already here must not be duplicated again.
        values_in_col: list = extract_column_values(
            dataframe=dataframe,
            column=new_column_name,
            distinct=True,
            return_type="flat_list",
        )
        new_df: psDataFrame = dataframe
        for value in by_list:
            if value in values_in_col:  # type: ignore
                continue
            # NOTE: `union` (not the deprecated `unionAll` alias) appends a full
            # copy of the raw table with the target column overwritten to `value`.
            new_df = new_df.union(dataframe.withColumn(new_column_name, F.lit(value)))
        return new_df

    def _self_union_dataframe_with_column_missing(
        dataframe: psDataFrame,
        by_list: str_list,
        new_column_name: str,
    ) -> psDataFrame:
        # Seed the missing column with the first `by_list` value, then defer to
        # the 'column existing' process for the remaining values.
        new_df = dataframe.withColumn(new_column_name, F.lit(by_list[0]))
        return _self_union_dataframe_with_column_existing(
            dataframe=new_df,
            by_list=by_list,
            new_column_name=new_column_name,
        )

    if new_column_name in dataframe.columns:
        return _self_union_dataframe_with_column_existing(
            dataframe=dataframe,
            by_list=by_list,
            new_column_name=new_column_name,
        )
    else:
        return _self_union_dataframe_with_column_missing(
            dataframe=dataframe,
            by_list=by_list,
            new_column_name=new_column_name,
        )
@typechecked
def union_all(dfs: list[psDataFrame]) -> psDataFrame:
    """
    !!! note "Summary"
        Take a list of `dataframes`, and union them all together.

    ???+ abstract "Details"
        If any columns are missing or added in any of the `dataframes` within `dfs`, then they will be automatically handled with the `allowMissingColumns` parameter, and any of the other `dataframes` will simply contain `#!sql null` values for those columns which they are missing.

    Params:
        dfs (list[psDataFrame]):
            The list of `dataframe`'s to union together. Must contain at least one element.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        IndexError:
            If `dfs` is an empty list.

    Returns:
        (psDataFrame):
            A single `dataframe` containing a union of all the `dataframe`s.

    ???+ example "Examples"
        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.duplication import union_all
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df1 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": [2, 2, 2, 2],
        ...         })
        ... )
        >>> df2 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...         })
        ... )
        >>> df3 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "e": [3, 3, 3, 3],
        ...         })
        ... )
        >>> dfs = [df1, df2, df3]
        >>>
        >>> # Check
        >>> for df in dfs:
        ...     df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | 1 | 2 |
        | 2 | b | 1 | 2 |
        | 3 | c | 1 | 2 |
        | 4 | d | 1 | 2 |
        +---+---+---+---+
        ```
        ```{.txt .text title="Terminal"}
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | a | 1 |
        | 2 | b | 1 |
        | 3 | c | 1 |
        | 4 | d | 1 |
        +---+---+---+
        ```
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | e |
        +---+---+---+---+
        | 1 | a | 1 | 3 |
        | 2 | b | 1 | 3 |
        | 3 | c | 1 | 3 |
        | 4 | d | 1 | 3 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> union_all(dfs).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+------+------+
        | a | b | c |    d |    e |
        +---+---+---+------+------+
        | 1 | a | 1 |    2 | null |
        | 2 | b | 1 |    2 | null |
        | 3 | c | 1 |    2 | null |
        | 4 | d | 1 |    2 | null |
        | 1 | a | 1 | null | null |
        | 2 | b | 1 | null | null |
        | 3 | c | 1 | null | null |
        | 4 | d | 1 | null | null |
        | 1 | a | 1 | null |    3 |
        | 2 | b | 1 | null |    3 |
        | 3 | c | 1 | null |    3 |
        | 4 | d | 1 | null |    3 |
        +---+---+---+------+------+
        ```
        !!! success "Conclusion: Successfully unioned all data frames together."
        </div>
    """
    # Iterative left-fold rather than recursion: avoids the O(n^2) list
    # slicing and recursion-depth limits of `dfs[0].unionByName(union_all(dfs[1:]))`
    # while producing the same row order and the same first-appearance column order.
    result: psDataFrame = dfs[0]
    for df in dfs[1:]:
        # `allowMissingColumns=True` fills columns absent from either side with nulls.
        result = result.unionByName(df, allowMissingColumns=True)
    return result