Duplication

toolbox_pyspark.duplication

Summary

The duplication module is used for duplicating data from an existing dataframe, or unioning multiple dataframes together.

duplicate_union_dataframe

duplicate_union_dataframe(
    dataframe: psDataFrame,
    by_list: str_list,
    new_column_name: str,
) -> psDataFrame

Summary

The purpose here is to take a given table and duplicate it entirely multiple times from values in a list, then union them all together.

Details

There are sometimes instances where we need to duplicate an entire table multiple times, with no change to the underlying data. Sometimes this is to maintain the structure of the data, but duplicate it to match a different table structure. This function is designed to do just that.
The dataframe is the table to be duplicated, the by_list is the list of values to loop over, and the new_column_name is the new column to hold the loop values.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataframe | DataFrame | The table to be duplicated. | required |
| by_list | str_list | The list to loop over. | required |
| new_column_name | str | The new column to hold the loop values. | required |

Raises:

| Type | Description |
| --- | --- |
| TypeError | If any of the inputs passed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator. |
| AttributeError | If any given value in the by_list list is not a string. |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The updated DataFrame. |

Examples

Set up
>>> # Imports
>>> import pandas as pd
>>> from pyspark.sql import SparkSession
>>> from toolbox_pyspark.duplication import duplicate_union_dataframe
>>>
>>> # Instantiate Spark
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Create data
>>> df = spark.createDataFrame(
...     pd.DataFrame(
...         {
...             "a": [1, 2, 3, 4],
...             "b": ["a", "b", "c", "d"],
...             "c": ["x", "x", "x", "x"],
...             "d": [2, 2, 2, 2],
...         }
...     )
... )
>>>
>>> # Check
>>> df.show()
Terminal
+---+---+---+---+
| a | b | c | d |
+---+---+---+---+
| 1 | a | x | 2 |
| 2 | b | x | 2 |
| 3 | c | x | 2 |
| 4 | d | x | 2 |
+---+---+---+---+

Example 1: Column missing
>>> duplicate_union_dataframe(
...     dataframe=df,
...     by_list=["x", "y", "z"],
...     new_column_name="n",
... ).show()
Terminal
+---+---+---+---+---+
| a | b | c | d | n |
+---+---+---+---+---+
| 1 | a | x | 2 | x |
| 2 | b | x | 2 | x |
| 3 | c | x | 2 | x |
| 4 | d | x | 2 | x |
| 1 | a | x | 2 | y |
| 2 | b | x | 2 | y |
| 3 | c | x | 2 | y |
| 4 | d | x | 2 | y |
| 1 | a | x | 2 | z |
| 2 | b | x | 2 | z |
| 3 | c | x | 2 | z |
| 4 | d | x | 2 | z |
+---+---+---+---+---+

Conclusion: Successfully duplicated data frame multiple times.

Example 2: Column existing
>>> duplicate_union_dataframe(
...     dataframe=df,
...     by_list=["x", "y", "z"],
...     new_column_name="c",
... ).show()
Terminal
+---+---+---+---+
| a | b | c | d |
+---+---+---+---+
| 1 | a | x | 2 |
| 2 | b | x | 2 |
| 3 | c | x | 2 |
| 4 | d | x | 2 |
| 1 | a | y | 2 |
| 2 | b | y | 2 |
| 3 | c | y | 2 |
| 4 | d | y | 2 |
| 1 | a | z | 2 |
| 2 | b | z | 2 |
| 3 | c | z | 2 |
| 4 | d | z | 2 |
+---+---+---+---+

Conclusion: Successfully duplicated data frame multiple times.

Notes
  • How the union is performed:
    • Currently this function uses the loop and append method.
    • It was written this way because it's a lot easier and more logical for humans to understand.
    • However, there's probably a more computationally efficient method for doing this by using SQL Joins.
    • More specifically, for creating a CARTESIAN PRODUCT (aka a 'Cross-Join') over the data set.
    • This is probably one of the only times EVER that a developer would want to create a cartesian product.
    • All other times a cartesian product is to be avoided at all costs...
  • Whether the column new_column_name already exists on the dataframe:
    • The process differs slightly depending on whether new_column_name already exists.
    • If it is existing, we need to:
      • Extract the distinct values from that column,
      • Create a duplicate copy of the raw table,
      • Loop through all values in by_list,
      • Check if that value from by_list is already existing in the extracted values from the new_column_name column,
      • If it is already existing, proceed to next iteration,
      • If it is not existing, take the raw table, update new_column_name to be the value from that iteration of by_list, then union that to the copy of the raw table,
      • Continue to iterate through all values in by_list until they're all union'ed together.
    • If it is not existing, we need to:
      • Add a new column to dataframe that has the name from new_column_name, and a single literal value from the zero'th index of the by_list,
      • Then go through the same process as if the column already existed.
    • Having now achieved this, the final output dataframe will now have all the updated duplicate values that we require.
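
The steps in the notes above can be sketched in plain Python, with each row modelled as a dict instead of a Spark row. This is only an illustrative model under that assumption, not the library's implementation; the names `duplicate_union_rows` and `cross_join_rows` are hypothetical. It also shows that, when the column is missing and `by_list` has no repeated values, the loop-and-append result matches a cartesian product of `by_list` and the rows.

```python
from itertools import product


def duplicate_union_rows(rows, by_list, new_column_name):
    """Plain-Python model of the loop-and-append steps (hypothetical helper)."""
    if not rows:
        # Model simplification: an empty list of dicts carries no schema.
        return []
    # Column missing: seed it with the zeroth value of by_list.
    if new_column_name not in rows[0]:
        rows = [{**row, new_column_name: by_list[0]} for row in rows]
    # Distinct values already present in the column (computed once).
    existing = {row[new_column_name] for row in rows}
    result = list(rows)
    for value in by_list:
        if value in existing:
            continue  # already covered by the original rows
        # Overwrite the column with this value and append the copy.
        result.extend({**row, new_column_name: value} for row in rows)
    return result


def cross_join_rows(rows, by_list, new_column_name):
    """Cartesian product of by_list and rows (hypothetical helper)."""
    return [
        {**row, new_column_name: value}
        for value, row in product(by_list, rows)
    ]


rows = [{"a": 1, "b": "a"}, {"a": 2, "b": "b"}]
looped = duplicate_union_rows(rows, ["x", "y", "z"], "n")
crossed = cross_join_rows(rows, ["x", "y", "z"], "n")
```

In this model `looped` and `crossed` hold the same six rows in the same order. The two approaches differ once the column already exists, because the loop skips values that are already present while a cross join would not.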
Warning

This function can easily blow out the size of a table to tremendous sizes: the output holds roughly one full copy of the input for each value in by_list. So be careful!
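
To get a feel for the growth before running the function, the output row count can be estimated from the input row count and `by_list`. The helper below is a hypothetical sketch, not part of `toolbox_pyspark`; it assumes the skip-if-already-present behaviour described in the notes.

```python
def estimate_output_rows(n_rows, by_list, existing_values=None):
    """Estimate the row count after duplication (hypothetical helper)."""
    if existing_values is None:
        # Column missing: it is seeded with by_list[0], which is then skipped.
        existing_values = {by_list[0]}
    # Each value not already in the column adds one full copy of the table.
    new_values = [value for value in by_list if value not in existing_values]
    return n_rows * (1 + len(new_values))


small = estimate_output_rows(4, ["x", "y", "z"])  # 12, as in the examples
large = estimate_output_rows(1_000_000, [str(i) for i in range(50)])  # 50_000_000
```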

Source code in src/toolbox_pyspark/duplication.py
@typechecked
def duplicate_union_dataframe(
    dataframe: psDataFrame,
    by_list: str_list,
    new_column_name: str,
) -> psDataFrame:
    """
    !!! note "Summary"
        The purpose here is to take a given table and duplicate it entirely multiple times from values in a list, then union them all together.

    ???+ abstract "Details"
        There are sometimes instances where we need to duplicate an entire table multiple times, with no change to the underlying data. Sometimes this is to maintain the structure of the data, but duplicate it to match a different table structure. This function is designed to do just that.<br>
        The `dataframe` is the table to be duplicated, the `by_list` is the list of values to loop over, and the `new_column_name` is the new column to hold the loop values.

    Params:
        dataframe (psDataFrame):
            The table to be duplicated.
        by_list (str_list):
            The list to loop over.
        new_column_name (str):
            The new column to hold the loop values.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        AttributeError:
            If any given value in the `by_list` list is not a string.

    Returns:
        (psDataFrame):
            The updated DataFrame.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.duplication import duplicate_union_dataframe
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": ["x", "x", "x", "x"],
        ...             "d": [2, 2, 2, 2],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Check"}
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | x | 2 |
        | 2 | b | x | 2 |
        | 3 | c | x | 2 |
        | 4 | d | x | 2 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Column missing"}
        >>> duplicate_union_dataframe(
        ...     dataframe=df,
        ...     by_list=["x", "y", "z"],
        ...     new_column_name="n",
        ... ).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---+
        | a | b | c | d | n |
        +---+---+---+---+---+
        | 1 | a | x | 2 | x |
        | 2 | b | x | 2 | x |
        | 3 | c | x | 2 | x |
        | 4 | d | x | 2 | x |
        | 1 | a | x | 2 | y |
        | 2 | b | x | 2 | y |
        | 3 | c | x | 2 | y |
        | 4 | d | x | 2 | y |
        | 1 | a | x | 2 | z |
        | 2 | b | x | 2 | z |
        | 3 | c | x | 2 | z |
        | 4 | d | x | 2 | z |
        +---+---+---+---+---+
        ```
        !!! success "Conclusion: Successfully duplicated data frame multiple times."
        </div>

        ```{.py .python linenums="1" title="Example 2: Column existing"}
        >>> duplicate_union_dataframe(
        ...     dataframe=df,
        ...     by_list=["x", "y", "z"],
        ...     new_column_name="c",
        ... ).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | x | 2 |
        | 2 | b | x | 2 |
        | 3 | c | x | 2 |
        | 4 | d | x | 2 |
        | 1 | a | y | 2 |
        | 2 | b | y | 2 |
        | 3 | c | y | 2 |
        | 4 | d | y | 2 |
        | 1 | a | z | 2 |
        | 2 | b | z | 2 |
        | 3 | c | z | 2 |
        | 4 | d | z | 2 |
        +---+---+---+---+
        ```
        !!! success "Conclusion: Successfully duplicated data frame multiple times."
        </div>

    ??? info "Notes"
        - How the `union` is performed:
            - Currently this function uses the `loop` and `append` method.
            - It was written this way because it's a lot easier and more logical for humans to understand.
            - However, there's probably a more computationally efficient method for doing this by using SQL Joins.
            - More specifically, for creating a CARTESIAN PRODUCT (aka a 'Cross-Join') over the data set.
            - This is probably one of the only times EVER that a developer would _want_ to create a cartesian product.
            - All other times a cartesian product is to be avoided at all costs...
        - Whether the column `new_column_name` already exists on the `dataframe`:
            - The process differs slightly depending on whether `new_column_name` already exists.
            - If it is existing, we need to:
                - Extract the `#!sql distinct` values from that column,
                - Create a duplicate copy of the raw table,
                - Loop through all values in `by_list`,
                - Check if that `value` from `by_list` is already existing in the extracted values from the `new_column_name` column,
                - If it is already existing, proceed to next iteration,
                - If it is not existing, take the raw table, update `new_column_name` to be the `value` from that iteration of `by_list`, then `#!sql union` that to the copy of the raw table,
                - Continue to iterate through all values in `by_list` until they're all `#!sql union`'ed together.
            - If it is not existing, we need to:
                - Add a new column to `dataframe` that has the name from `new_column_name`, and a single literal value from the zero'th index of the `by_list`,
                - Then go through the same process as if the column already existed.
            - Having now achieved this, the final output `dataframe` will now have all the updated duplicate values that we require.

    ???+ warning "Warning"
        Obviously, it's easy to see how this function will blow out the size of a table to tremendous sizes. So be careful!
    """

    def _self_union_dataframe_with_column_existing(
        dataframe: psDataFrame,
        by_list: str_list,
        new_column_name: str,
    ) -> psDataFrame:
        values_in_col: list = extract_column_values(
            dataframe=dataframe,
            column=new_column_name,
            distinct=True,
            return_type="flat_list",
        )
        new_df: psDataFrame = dataframe
        for value in by_list:
            if value in values_in_col:  # type: ignore
                continue
            new_df = new_df.unionAll(dataframe.withColumn(new_column_name, F.lit(value)))
        return new_df

    def _self_union_dataframe_with_column_missing(
        dataframe: psDataFrame,
        by_list: str_list,
        new_column_name: str,
    ) -> psDataFrame:
        new_df = dataframe.withColumn(new_column_name, F.lit(by_list[0]))
        return _self_union_dataframe_with_column_existing(
            dataframe=new_df,
            by_list=by_list,
            new_column_name=new_column_name,
        )

    if new_column_name in dataframe.columns:
        return _self_union_dataframe_with_column_existing(
            dataframe=dataframe,
            by_list=by_list,
            new_column_name=new_column_name,
        )
    else:
        return _self_union_dataframe_with_column_missing(
            dataframe=dataframe,
            by_list=by_list,
            new_column_name=new_column_name,
        )

union_all

union_all(dfs: list[psDataFrame]) -> psDataFrame

Summary

Take a list of dataframes, and union them all together.

Details

If a column is present in some of the dataframes within dfs but missing from others, the union still succeeds: unionByName is called with allowMissingColumns=True, so any dataframe that lacks the column will simply contain null values for it.
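
The null-filling behaviour described above can be modelled in plain Python, with each dataframe represented as a list of dict rows. This is a sketch under that assumption only; `union_all_rows` is a hypothetical name and the code does not call Spark. Note that in this model an empty table contributes no columns, which a real dataframe with a schema would.

```python
def union_all_rows(tables):
    """Model of a by-name union that tolerates missing columns.

    `tables` is a list of tables; each table is a list of dict rows.
    """
    # Collect column names in first-seen order across all tables.
    columns = []
    for table in tables:
        for row in table:
            for name in row:
                if name not in columns:
                    columns.append(name)
    # Emit every row with the full column set; absent columns become None.
    return [
        {name: row.get(name) for name in columns}
        for table in tables
        for row in table
    ]


df1 = [{"a": 1, "c": 1, "d": 2}]
df2 = [{"a": 1, "c": 1}]
df3 = [{"a": 1, "c": 1, "e": 3}]
unioned = union_all_rows([df1, df2, df3])
```

Here every row in `unioned` has the columns `a`, `c`, `d`, and `e`, and the rows that came from `df2` and `df3` hold `None` where their source table had no such column.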

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dfs | list[DataFrame] | The list of dataframes to union together. | required |

Raises:

| Type | Description |
| --- | --- |
| TypeError | If any of the inputs passed to the parameters of this function are not the correct type. Uses the @typeguard.typechecked decorator. |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | A single dataframe containing a union of all the dataframes. |

Examples

Set up
>>> # Imports
>>> import pandas as pd
>>> from pyspark.sql import SparkSession
>>> from toolbox_pyspark.duplication import union_all
>>>
>>> # Instantiate Spark
>>> spark = SparkSession.builder.getOrCreate()
>>>
>>> # Create data
>>> df1 = spark.createDataFrame(
...     pd.DataFrame(
...         {
...             "a": [1, 2, 3, 4],
...             "b": ["a", "b", "c", "d"],
...             "c": [1, 1, 1, 1],
...             "d": [2, 2, 2, 2],
...         })
... )
>>> df2 = spark.createDataFrame(
...     pd.DataFrame(
...         {
...             "a": [1, 2, 3, 4],
...             "b": ["a", "b", "c", "d"],
...             "c": [1, 1, 1, 1],
...         })
... )
>>> df3 = spark.createDataFrame(
...     pd.DataFrame(
...         {
...             "a": [1, 2, 3, 4],
...             "b": ["a", "b", "c", "d"],
...             "c": [1, 1, 1, 1],
...             "e": [3, 3, 3, 3],
...         })
... )
>>> dfs = [df1, df2, df3]
>>>
>>> # Check
>>> for df in dfs:
...     df.show()
Terminal
+---+---+---+---+
| a | b | c | d |
+---+---+---+---+
| 1 | a | 1 | 2 |
| 2 | b | 1 | 2 |
| 3 | c | 1 | 2 |
| 4 | d | 1 | 2 |
+---+---+---+---+
Terminal
+---+---+---+
| a | b | c |
+---+---+---+
| 1 | a | 1 |
| 2 | b | 1 |
| 3 | c | 1 |
| 4 | d | 1 |
+---+---+---+
Terminal
+---+---+---+---+
| a | b | c | e |
+---+---+---+---+
| 1 | a | 1 | 3 |
| 2 | b | 1 | 3 |
| 3 | c | 1 | 3 |
| 4 | d | 1 | 3 |
+---+---+---+---+

Example 1: Basic usage
>>> union_all(dfs).show()
Terminal
+---+---+---+------+------+
| a | b | c |    d |    e |
+---+---+---+------+------+
| 1 | a | 1 |    2 | null |
| 2 | b | 1 |    2 | null |
| 3 | c | 1 |    2 | null |
| 4 | d | 1 |    2 | null |
| 1 | a | 1 | null | null |
| 2 | b | 1 | null | null |
| 3 | c | 1 | null | null |
| 4 | d | 1 | null | null |
| 1 | a | 1 | null |    3 |
| 2 | b | 1 | null |    3 |
| 3 | c | 1 | null |    3 |
| 4 | d | 1 | null |    3 |
+---+---+---+------+------+

Conclusion: Successfully unioned all data frames together.

Source code in src/toolbox_pyspark/duplication.py
@typechecked
def union_all(dfs: list[psDataFrame]) -> psDataFrame:
    """
    !!! note "Summary"
        Take a list of `dataframes`, and union them all together.

    ???+ abstract "Details"
        If a column is present in some of the `dataframes` within `dfs` but missing from others, the union still succeeds: `unionByName` is called with `allowMissingColumns=True`, so any `dataframe` that lacks the column will simply contain `#!sql null` values for it.

    Params:
        dfs (list[psDataFrame]):
            The list of `dataframe`s to union together.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (psDataFrame):
            A single `dataframe` containing a union of all the `dataframe`s.

    ???+ example "Examples"
        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.duplication import union_all
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df1 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": [2, 2, 2, 2],
        ...         })
        ... )
        >>> df2 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...         })
        ... )
        >>> df3 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "e": [3, 3, 3, 3],
        ...         })
        ... )
        >>> dfs = [df1, df2, df3]
        >>>
        >>> # Check
        >>> for df in dfs:
        ...     df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | 1 | 2 |
        | 2 | b | 1 | 2 |
        | 3 | c | 1 | 2 |
        | 4 | d | 1 | 2 |
        +---+---+---+---+
        ```
        ```{.txt .text title="Terminal"}
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | a | 1 |
        | 2 | b | 1 |
        | 3 | c | 1 |
        | 4 | d | 1 |
        +---+---+---+
        ```
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | e |
        +---+---+---+---+
        | 1 | a | 1 | 3 |
        | 2 | b | 1 | 3 |
        | 3 | c | 1 | 3 |
        | 4 | d | 1 | 3 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> union_all(dfs).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+------+------+
        | a | b | c |    d |    e |
        +---+---+---+------+------+
        | 1 | a | 1 |    2 | null |
        | 2 | b | 1 |    2 | null |
        | 3 | c | 1 |    2 | null |
        | 4 | d | 1 |    2 | null |
        | 1 | a | 1 | null | null |
        | 2 | b | 1 | null | null |
        | 3 | c | 1 | null | null |
        | 4 | d | 1 | null | null |
        | 1 | a | 1 | null |    3 |
        | 2 | b | 1 | null |    3 |
        | 3 | c | 1 | null |    3 |
        | 4 | d | 1 | null |    3 |
        +---+---+---+------+------+
        ```
        !!! success "Conclusion: Successfully unioned all data frames together."
        </div>
    """
    if len(dfs) > 1:
        return dfs[0].unionByName(union_all(dfs[1:]), allowMissingColumns=True)
    else:
        return dfs[0]