Coverage for src/toolbox_pyspark/duplication.py: 100%

26 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-25 23:08 +0000

1# ============================================================================ # 

2# # 

3# Title : Duplication # 

4# Purpose : Duplicate from an existing `dataframe`, or union multiple # 

5# `dataframe`'s together. # 

6# # 

7# ============================================================================ # 

8 

9 

10# ---------------------------------------------------------------------------- # 

11# # 

12# Overview #### 

13# # 

14# ---------------------------------------------------------------------------- # 

15 

16 

17# ---------------------------------------------------------------------------- # 

18# Description #### 

19# ---------------------------------------------------------------------------- # 

20 

21 

22""" 

23!!! note "Summary" 

24 The `duplication` module is used for duplicating data from an existing `dataframe`, or unioning multiple `dataframe`'s together. 

25""" 

26 

27 

28# ---------------------------------------------------------------------------- # 

29# # 

30# Setup #### 

31# # 

32# ---------------------------------------------------------------------------- # 

33 

34 

35# ---------------------------------------------------------------------------- # 

36# Imports #### 

37# ---------------------------------------------------------------------------- # 

38 

39 

# ## Python StdLib Imports ----
from functools import reduce

# ## Python Third Party Imports ----
from pyspark.sql import (
    DataFrame as psDataFrame,
    functions as F,
)
from toolbox_python.collection_types import str_list
from typeguard import typechecked

# ## Local First Party Imports ----
from toolbox_pyspark.info import extract_column_values

50 

51 

52# ---------------------------------------------------------------------------- # 

53# Exports #### 

54# ---------------------------------------------------------------------------- # 

55 

56 

# Public API of this module: the two duplication/union helpers defined below.
__all__: str_list = ["duplicate_union_dataframe", "union_all"]

61 

62 

63# ---------------------------------------------------------------------------- # 

64# # 

65# Functions #### 

66# # 

67# ---------------------------------------------------------------------------- # 

68 

69 

70# ---------------------------------------------------------------------------- # 

71# Firstly #### 

72# ---------------------------------------------------------------------------- # 

73 

74 

@typechecked
def duplicate_union_dataframe(
    dataframe: psDataFrame,
    by_list: str_list,
    new_column_name: str,
) -> psDataFrame:
    """
    !!! note "Summary"
        The purpose here is to take a given table and duplicate it entirely multiple times from values in a list, then union them all together.

    ???+ abstract "Details"
        There are sometimes instances where we need to duplicate an entire table multiple times, with no change to the underlying data. Sometimes this is to maintain the structure of the data, but duplicate it to match a different table structure. This function is designed to do just that.<br>
        The `dataframe` is the table to be duplicated, the `by_list` is the list of values to loop over, and the `new_column_name` is the new column to hold the loop values.

    Params:
        dataframe (psDataFrame):
            The table to be duplicated.
        by_list (str_list):
            The list to loop over. Each distinct value produces at most one copy of `dataframe`; a value repeated within `by_list` is only used once.
        new_column_name (str):
            The new column to hold the loop values.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type (including any value in `by_list` which is not a string). Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        IndexError:
            If `by_list` is empty and `new_column_name` does not already exist on `dataframe`.

    Returns:
        (psDataFrame):
            The updated DataFrame.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.duplication import duplicate_union_dataframe
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": ["x", "x", "x", "x"],
        ...             "d": [2, 2, 2, 2],
        ...         }
        ...     )
        ... )
        ```

        ```{.py .python linenums="1" title="Check"}
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | x | 2 |
        | 2 | b | x | 2 |
        | 3 | c | x | 2 |
        | 4 | d | x | 2 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Column missing"}
        >>> duplicate_union_dataframe(
        ...     dataframe=df,
        ...     by_list=["x", "y", "z"],
        ...     new_column_name="n",
        ... ).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---+
        | a | b | c | d | n |
        +---+---+---+---+---+
        | 1 | a | x | 2 | x |
        | 2 | b | x | 2 | x |
        | 3 | c | x | 2 | x |
        | 4 | d | x | 2 | x |
        | 1 | a | x | 2 | y |
        | 2 | b | x | 2 | y |
        | 3 | c | x | 2 | y |
        | 4 | d | x | 2 | y |
        | 1 | a | x | 2 | z |
        | 2 | b | x | 2 | z |
        | 3 | c | x | 2 | z |
        | 4 | d | x | 2 | z |
        +---+---+---+---+---+
        ```
        !!! success "Conclusion: Successfully duplicated data frame multiple times."
        </div>

        ```{.py .python linenums="1" title="Example 2: Column existing"}
        >>> duplicate_union_dataframe(
        ...     dataframe=df,
        ...     by_list=["x", "y", "z"],
        ...     new_column_name="c",
        ... ).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | x | 2 |
        | 2 | b | x | 2 |
        | 3 | c | x | 2 |
        | 4 | d | x | 2 |
        | 1 | a | y | 2 |
        | 2 | b | y | 2 |
        | 3 | c | y | 2 |
        | 4 | d | y | 2 |
        | 1 | a | z | 2 |
        | 2 | b | z | 2 |
        | 3 | c | z | 2 |
        | 4 | d | z | 2 |
        +---+---+---+---+
        ```
        !!! success "Conclusion: Successfully duplicated data frame multiple times."
        </div>

    ??? info "Notes"
        - How the `union` is performed:
            - Currently this function uses the `loop` and `append` method.
            - It was written this way because it's a lot easier and more logical for humans to understand.
            - However, there's probably a more computationally efficient method for doing this by using SQL Joins.
            - More specifically, for creating a CARTESIAN PRODUCT (aka a 'Cross-Join') over the data set.
            - This is probably one of the only times EVER that a developer would _want_ to create a cartesian product.
            - All other times a cartesian product is to be avoided at all costs...
            - The `union` resolves columns by position (not by name).
        - Whether or not the column `new_column_name` exists on the `dataframe`:
            - The process is a little different depending on whether the `new_column_name` is existing or not...
            - If it is existing, we need to:
                - Extract the `#!sql distinct` values from that column,
                - Create a duplicate copy of the raw table,
                - Loop through all distinct values in `by_list` (in their original order),
                - Check if that `value` from `by_list` is already existing in the extracted values from the `new_column_name` column,
                - If it is already existing, proceed to next iteration,
                - If it is not existing, take the raw table, update `new_column_name` to be the `value` from that iteration of `by_list`, then `#!sql union` that to the copy of the raw table,
                - Continue to iterate through all values in `by_list` until they're all `#!sql union`'ed together.
            - If it is not existing, we need to:
                - Add a new column to `dataframe` that has the name from `new_column_name`, and a single literal value from the zero'th index of the `by_list`,
                - Then to go through the same process as if the column were existing.
        - Having now achieved this, the final output `dataframe` will now have all the updated duplicate values that we require.

    ???+ warning "Warning"
        Obviously, it's easy to see how this function will blow out the size of a table to tremendous sizes. So be careful!
    """

    def _self_union_dataframe_with_column_existing(
        dataframe: psDataFrame,
        by_list: str_list,
        new_column_name: str,
    ) -> psDataFrame:
        # Values already present in the column need no extra copy of the table.
        existing_values: list = extract_column_values(
            dataframe=dataframe,
            column=new_column_name,
            distinct=True,
            return_type="flat_list",
        )
        new_df: psDataFrame = dataframe
        # `dict.fromkeys` de-duplicates `by_list` while preserving its order, so a
        # value repeated in `by_list` only produces one extra copy of the table.
        for value in dict.fromkeys(by_list):
            if value in existing_values:  # type: ignore
                continue
            # `union` matches columns by position; replacing the existing column
            # via `withColumn` keeps it in the same position, so schemas line up.
            new_df = new_df.union(dataframe.withColumn(new_column_name, F.lit(value)))
        return new_df

    def _self_union_dataframe_with_column_missing(
        dataframe: psDataFrame,
        by_list: str_list,
        new_column_name: str,
    ) -> psDataFrame:
        # Seed the new column with the first value (raises IndexError when
        # `by_list` is empty), then reuse the "existing column" path; that first
        # value is then skipped because it is already present in the column.
        new_df: psDataFrame = dataframe.withColumn(new_column_name, F.lit(by_list[0]))
        return _self_union_dataframe_with_column_existing(
            dataframe=new_df,
            by_list=by_list,
            new_column_name=new_column_name,
        )

    if new_column_name in dataframe.columns:
        return _self_union_dataframe_with_column_existing(
            dataframe=dataframe,
            by_list=by_list,
            new_column_name=new_column_name,
        )
    return _self_union_dataframe_with_column_missing(
        dataframe=dataframe,
        by_list=by_list,
        new_column_name=new_column_name,
    )

274 

275 

@typechecked
def union_all(dfs: list[psDataFrame]) -> psDataFrame:
    """
    !!! note "Summary"
        Take a list of `dataframes`, and union them all together.

    ???+ abstract "Details"
        If any columns are missing or added in any of the `dataframes` within `dfs`, then they will be automatically handled with the `allowMissingColumns` parameter, and any of the other `dataframes` will simply contain `#!sql null` values for those columns which they are missing.<br>
        Columns are matched by name. In the result, columns appear in the order in which they are first seen across `dfs`, and rows are appended in the order of `dfs`.

    Params:
        dfs (list[psDataFrame]):
            The list of `dataframe`'s to union together.

    Raises:
        TypeError:
            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        IndexError:
            If `dfs` is empty.

    Returns:
        (psDataFrame):
            A single `dataframe` containing a union of all the `dataframe`s.

    ???+ example "Examples"
        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.duplication import union_all
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df1 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": [2, 2, 2, 2],
        ...         }
        ...     )
        ... )
        >>> df2 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...         }
        ...     )
        ... )
        >>> df3 = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "e": [3, 3, 3, 3],
        ...         }
        ...     )
        ... )
        >>> dfs = [df1, df2, df3]
        >>>
        >>> # Check
        >>> for df in dfs:
        ...     df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | 1 | 2 |
        | 2 | b | 1 | 2 |
        | 3 | c | 1 | 2 |
        | 4 | d | 1 | 2 |
        +---+---+---+---+
        ```
        ```{.txt .text title="Terminal"}
        +---+---+---+
        | a | b | c |
        +---+---+---+
        | 1 | a | 1 |
        | 2 | b | 1 |
        | 3 | c | 1 |
        | 4 | d | 1 |
        +---+---+---+
        ```
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | e |
        +---+---+---+---+
        | 1 | a | 1 | 3 |
        | 2 | b | 1 | 3 |
        | 3 | c | 1 | 3 |
        | 4 | d | 1 | 3 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> union_all(dfs).show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+------+------+
        | a | b | c |    d |    e |
        +---+---+---+------+------+
        | 1 | a | 1 |    2 | null |
        | 2 | b | 1 |    2 | null |
        | 3 | c | 1 |    2 | null |
        | 4 | d | 1 |    2 | null |
        | 1 | a | 1 | null | null |
        | 2 | b | 1 | null | null |
        | 3 | c | 1 | null | null |
        | 4 | d | 1 | null | null |
        | 1 | a | 1 | null |    3 |
        | 2 | b | 1 | null |    3 |
        | 3 | c | 1 | null |    3 |
        | 4 | d | 1 | null |    3 |
        +---+---+---+------+------+
        ```
        !!! success "Conclusion: Successfully unioned all data frames together."
        </div>
    """
    # Fold left-to-right instead of recursing on `dfs[1:]`. This avoids Python's
    # recursion limit for long lists, does not copy the list or re-run the
    # typeguard check on every step, and yields the same result:
    # `unionByName(..., allowMissingColumns=True)` appends missing columns at the
    # end, so columns still appear in order of first appearance.
    # `dfs[0]` raises IndexError for an empty list, as before.
    return reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dfs[1:],
        dfs[0],
    )