Coverage for src/toolbox_pyspark/cleaning.py: 100%

71 statements  

coverage.py v7.6.10, created at 2025-01-25 23:08 +0000

1# ============================================================================ # 

2# # 

3# Title : Dataframe Cleaning # 

4# Purpose : Clean, fix, and fetch various aspects of a given DataFrame. # 

5# # 

6# ============================================================================ # 

7 

8 

9# ---------------------------------------------------------------------------- # 

10# # 

11# Overview #### 

12# # 

13# ---------------------------------------------------------------------------- # 

14 

15 

16# ---------------------------------------------------------------------------- # 

17# Description #### 

18# ---------------------------------------------------------------------------- # 

19 

20 

21""" 

22!!! note "Summary" 

23 The `cleaning` module is used to clean, fix, and fetch various aspects of a given DataFrame. 

24""" 

25 

26 

27# ---------------------------------------------------------------------------- # 

28# # 

29# Setup #### 

30# # 

31# ---------------------------------------------------------------------------- # 

32 

33 

34# ---------------------------------------------------------------------------- # 

35# Imports #### 

36# ---------------------------------------------------------------------------- # 

37 

38 

39# ## Python StdLib Imports ---- 

40from typing import Optional, Union 

41 

42# ## Python Third Party Imports ---- 

43from numpy import ndarray as npArray 

44from pandas import DataFrame as pdDataFrame 

45from pyspark.sql import ( 

46 Column, 

47 DataFrame as psDataFrame, 

48 SparkSession, 

49 functions as F, 

50 types as T, 

51) 

52from toolbox_python.checkers import is_type 

53from toolbox_python.collection_types import str_collection, str_list 

54from toolbox_python.lists import flatten 

55from typeguard import typechecked 

56 

57# ## Local First Party Imports ---- 

58from toolbox_pyspark.checks import assert_column_exists, assert_columns_exists 

59from toolbox_pyspark.columns import get_columns 

60from toolbox_pyspark.constants import ( 

61 LITERAL_LIST_OBJECT_NAMES, 

62 LITERAL_NUMPY_ARRAY_NAMES, 

63 LITERAL_PANDAS_DATAFRAME_NAMES, 

64 LITERAL_PYSPARK_DATAFRAME_NAMES, 

65 VALID_LIST_OBJECT_NAMES, 

66 VALID_NUMPY_ARRAY_NAMES, 

67 VALID_PANDAS_DATAFRAME_NAMES, 

68 VALID_PYAPARK_JOIN_TYPES, 

69 VALID_PYSPARK_DATAFRAME_NAMES, 

70 WHITESPACE_CHARACTERS as WHITESPACES, 

71) 

72 

73 

74# ---------------------------------------------------------------------------- # 

75# Exports #### 

76# ---------------------------------------------------------------------------- # 

77 

78 

79__all__: str_list = [ 

80 "create_empty_dataframe", 

81 "keep_first_record_by_columns", 

82 "convert_dataframe", 

83 "update_nullability", 

84 "trim_spaces_from_column", 

85 "trim_spaces_from_columns", 

86 "apply_function_to_column", 

87 "apply_function_to_columns", 

88 "drop_matching_rows", 

89] 

90 

91 

92# ---------------------------------------------------------------------------- # 

93# # 

94# Functions #### 

95# # 

96# ---------------------------------------------------------------------------- # 

97 

98 

99# ---------------------------------------------------------------------------- # 

100# Empty DataFrame #### 

101# ---------------------------------------------------------------------------- # 

102 

103 

104@typechecked 

105def create_empty_dataframe(spark_session: SparkSession) -> psDataFrame: 
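    """
    !!! note "Summary"
        Create and return an empty `#!py DataFrame`, containing no columns and no rows, from the given `#!py spark_session`.
    """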

106 return spark_session.createDataFrame([], T.StructType([])) 

107 

108 

109# ---------------------------------------------------------------------------- # 

110# Column processes #### 

111# ---------------------------------------------------------------------------- # 

112 

113 

114@typechecked 

115def keep_first_record_by_columns( 

116 dataframe: psDataFrame, 

117 columns: Union[str, str_collection], 

118) -> psDataFrame: 

119 """ 

120 !!! note "Summary" 

121 For a given Spark `#!py DataFrame`, keep only the first record for each unique combination of values in the column(s) specified in `#!py columns`. 

122 

123 ???+ abstract "Details" 

124 The necessity for this function arose when we needed to perform a `#!py distinct()` operation on a given `#!py DataFrame`; however, we still wanted to retain the data provided in the other columns. 

125 

126 Params: 

127 dataframe (psDataFrame): 

128 The DataFrame that you want to filter. 

129 columns (Union[str, str_collection]): 

130 The single or multiple columns from which you want to extract the distinct values. 

131 

132 Raises: 

133 TypeError: 

134 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

135 ColumnDoesNotExistError: 

136 If any of the `#!py columns` do not exist within `#!py dataframe.columns`. 

137 

138 Returns: 

139 (psDataFrame): 

140 The updated dataframe, retaining only the first unique set of records from the columns specified in `#!py columns`. 

141 

142 ???+ example "Examples" 

143 

144 ```{.py .python linenums="1" title="Set up"} 

145 >>> # Imports 

146 >>> import pandas as pd 

147 >>> from pyspark.sql import SparkSession 

148 >>> from toolbox_pyspark.cleaning import keep_first_record_by_columns 

149 >>> 

150 >>> # Instantiate Spark 

151 >>> spark = SparkSession.builder.getOrCreate() 

152 >>> 

153 >>> # Create data 

154 >>> df = spark.createDataFrame( 

155 ... pd.DataFrame( 

156 ... { 

157 ... "a": [1, 2, 3, 4], 

158 ... "b": ["a", "b", "c", "d"], 

159 ... "c": [1, 1, 2, 2], 

160 ... "d": [1, 2, 2, 2], 

161 ... "e": [1, 1, 2, 3], 

162 ... } 

163 ... ) 

164 ... ) 

165 >>> 

166 >>> # Check 

167 >>> df.show() 

168 ``` 

169 <div class="result" markdown> 

170 ```{.txt .text title="Terminal"} 

171 +---+---+---+---+---+ 

172 | a | b | c | d | e | 

173 +---+---+---+---+---+ 

174 | 1 | a | 1 | 1 | 1 | 

175 | 2 | b | 1 | 2 | 1 | 

176 | 3 | c | 2 | 2 | 2 | 

177 | 4 | d | 2 | 2 | 3 | 

178 +---+---+---+---+---+ 

179 ``` 

180 </div> 

181 

182 ```{.py .python linenums="1" title="Example 1: Distinct by the `c` column"} 

183 >>> keep_first_record_by_columns(df, "c").show() 

184 ``` 

185 <div class="result" markdown> 

186 ```{.txt .text title="Terminal"} 

187 +---+---+---+---+---+ 

188 | a | b | c | d | e | 

189 +---+---+---+---+---+ 

190 | 1 | a | 1 | 1 | 1 | 

191 | 3 | c | 2 | 2 | 2 | 

192 +---+---+---+---+---+ 

193 ``` 

194 !!! success "Conclusion: Successfully kept first records by the `c` column." 

195 </div> 

196 

197 ```{.py .python linenums="1" title="Example 2: Distinct by the `d` column"} 

198 >>> keep_first_record_by_columns(df, "d").show() 

199 ``` 

200 <div class="result" markdown> 

201 ```{.txt .text title="Terminal"} 

202 +---+---+---+---+---+ 

203 | a | b | c | d | e | 

204 +---+---+---+---+---+ 

205 | 1 | a | 1 | 1 | 1 | 

206 | 2 | b | 1 | 2 | 1 | 

207 +---+---+---+---+---+ 

208 ``` 

209 !!! success "Conclusion: Successfully kept first records by the `d` column." 

210 </div> 

211 

212 ```{.py .python linenums="1" title="Example 3: Distinct by the `e` column"} 

213 >>> keep_first_record_by_columns(df, "e").show() 

214 ``` 

215 <div class="result" markdown> 

216 ```{.txt .text title="Terminal"} 

217 +---+---+---+---+---+ 

218 | a | b | c | d | e | 

219 +---+---+---+---+---+ 

220 | 1 | a | 1 | 1 | 1 | 

221 | 3 | c | 2 | 2 | 2 | 

222 | 4 | d | 2 | 2 | 3 | 

223 +---+---+---+---+---+ 

224 ``` 

225 !!! success "Conclusion: Successfully kept first records by the `e` column." 

226 </div> 

227 

228 ```{.py .python linenums="1" title="Example 4: Distinct by the `c` & `d` columns"} 

229 >>> keep_first_record_by_columns(df, ["c", "d"]).show() 

230 ``` 

231 <div class="result" markdown> 

232 ```{.txt .text title="Terminal"} 

233 +---+---+---+---+---+ 

234 | a | b | c | d | e | 

235 +---+---+---+---+---+ 

236 | 1 | a | 1 | 1 | 1 | 

237 | 2 | b | 1 | 2 | 1 | 

238 | 3 | c | 2 | 2 | 2 | 

239 +---+---+---+---+---+ 

240 ``` 

241 !!! success "Conclusion: Successfully kept first records by the `c` & `d` columns." 

242 </div> 

243 

244 ```{.py .python linenums="1" title="Example 5: Distinct by the `c` & `e` columns"} 

245 >>> keep_first_record_by_columns(df, ["c", "e"]).show() 

246 ``` 

247 <div class="result" markdown> 

248 ```{.txt .text title="Terminal"} 

249 +---+---+---+---+---+ 

250 | a | b | c | d | e | 

251 +---+---+---+---+---+ 

252 | 1 | a | 1 | 1 | 1 | 

253 | 3 | c | 2 | 2 | 2 | 

254 | 4 | d | 2 | 2 | 3 | 

255 +---+---+---+---+---+ 

256 ``` 

257 !!! success "Conclusion: Successfully kept first records by the `c` & `e` columns." 

258 </div> 

259 

260 ```{.py .python linenums="1" title="Example 6: Distinct by the `d` & `e` columns"} 

261 >>> keep_first_record_by_columns(df, ["d", "e"]).show() 

262 ``` 

263 <div class="result" markdown> 

264 ```{.txt .text title="Terminal"} 

265 +---+---+---+---+---+ 

266 | a | b | c | d | e | 

267 +---+---+---+---+---+ 

268 | 1 | a | 1 | 1 | 1 | 

269 | 2 | b | 1 | 2 | 1 | 

270 | 3 | c | 2 | 2 | 2 | 

271 | 4 | d | 2 | 2 | 3 | 

272 +---+---+---+---+---+ 

273 ``` 

274 !!! success "Conclusion: Successfully kept first records by the `d` & `e` columns." 

275 </div> 

276 

277 ```{.py .python linenums="1" title="Example 7: Distinct by the `c`, `d` & `e` columns"} 

278 >>> keep_first_record_by_columns(df, ["c", "d", "e"]).show() 

279 ``` 

280 <div class="result" markdown> 

281 ```{.txt .text title="Terminal"} 

282 +---+---+---+---+---+ 

283 | a | b | c | d | e | 

284 +---+---+---+---+---+ 

285 | 1 | a | 1 | 1 | 1 | 

286 | 2 | b | 1 | 2 | 1 | 

287 | 3 | c | 2 | 2 | 2 | 

288 | 4 | d | 2 | 2 | 3 | 

289 +---+---+---+---+---+ 

290 ``` 

291 !!! success "Conclusion: Successfully kept first records by the `c`, `d` & `e` columns." 

293 </div> 

294 

295 ```{.py .python linenums="1" title="Example 8: Column missing"} 

296 >>> keep_first_record_by_columns(df, "f") 

297 ``` 

298 <div class="result" markdown> 

299 ```{.txt .text title="Terminal"} 

300 ColumnDoesNotExistError: Column 'f' does not exist in the DataFrame. 

301 Try one of: ["a", "b", "c", "d", "e"] 

302 ``` 

303 !!! failure "Conclusion: Column missing." 

304 </div> 

305 

306 ??? info "Notes" 

307 The way this process will retain only the first record in the given `#!py columns` is by the following steps (see also the sketch after this list): 

308 

309 1. Add a new column called `RowNum` 

310 1. This `RowNum` column uses the SparkSQL function `#!sql ROW_NUMBER()` 

311 1. The window-function `#!sql OVER` clause will then: 

312 1. `#!sql PARTITION BY` the `#!py columns`, 

313 1. `#!sql ORDER BY` the `#!py columns`. 

314 1. Filter so that `#!sql RowNum=1`. 

315 1. Drop the `#!py RowNum` column. 
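
For illustration, the same result can be sketched with the PySpark `Window` API instead of the SQL expression string used in the implementation below (a minimal sketch only, assuming a hypothetical single partition column `#!py "c"`):

```{.py .python linenums="1" title="Sketch"}
>>> from pyspark.sql import Window, functions as F
>>> window = Window.partitionBy("c").orderBy("c")
>>> first_records = (
...     df.withColumn("RowNum", F.row_number().over(window))
...     .where("RowNum = 1")
...     .drop("RowNum")
... )
```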

316 

317 ??? tip "See Also" 

318 - [`toolbox_pyspark.checks.assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists] 

319 """ 

320 columns = [columns] if is_type(columns, str) else columns 

321 assert_columns_exists(dataframe, columns) 

322 return ( 

323 dataframe.withColumn( 

324 colName="RowNum", 

325 col=F.expr( 

326 f""" 

327 ROW_NUMBER() 

328 OVER 

329 ( 

330 PARTITION BY {','.join(columns)} 

331 ORDER BY {','.join(columns)} 

332 ) 

333 """ 

334 ), 

335 ) 

336 .where("RowNum=1") 

337 .drop("RowNum") 

338 ) 

339 

340 

341@typechecked 

342def convert_dataframe( 

343 dataframe: psDataFrame, 

344 return_type: Union[ 

345 LITERAL_PYSPARK_DATAFRAME_NAMES, 

346 LITERAL_PANDAS_DATAFRAME_NAMES, 

347 LITERAL_NUMPY_ARRAY_NAMES, 

348 LITERAL_LIST_OBJECT_NAMES, 

349 str, 

350 ] = "pd", 

351) -> Optional[Union[psDataFrame, pdDataFrame, npArray, list]]: 

352 """ 

353 !!! note "Summary" 

354 Convert a PySpark DataFrame to the desired return type. 

355 

356 ???+ abstract "Details" 

357 This function converts a PySpark DataFrame to one of the supported return types, including: 

358 

359 PySpark DataFrame: 

360 

361 <div class="mdx-four-columns" markdown> 

362 

363 - `#!py "spark.DataFrame"` 

364 - `#!py "pyspark.DataFrame"` 

365 - `#!py "pyspark"` 

366 - `#!py "spark"` 

367 - `#!py "ps.DataFrame"` 

368 - `#!py "ps.df"` 

369 - `#!py "psdf"` 

370 - `#!py "psDataFrame"` 

371 - `#!py "psDF"` 

372 - `#!py "ps"` 

373 

374 </div> 

375 

376 Pandas DataFrame: 

377 

378 <div class="mdx-four-columns" markdown> 

379 

380 - `#!py "pandas.DataFrame"` 

381 - `#!py "pandas"` 

382 - `#!py "pd.DataFrame"` 

383 - `#!py "pd.df"` 

384 - `#!py "pddf"` 

385 - `#!py "pdDataFrame"` 

386 - `#!py "pdDF"` 

387 - `#!py "pd"` 

388 

389 </div> 

390 

391 NumPy array: 

392 

393 <div class="mdx-four-columns" markdown> 

394 

395 - `#!py "numpy.array"` 

396 - `#!py "np.array"` 

397 - `#!py "np"` 

398 - `#!py "numpy"` 

399 - `#!py "nparr"` 

400 - `#!py "npa"` 

401 - `#!py "np.arr"` 

402 - `#!py "np.a"` 

403 

404 </div> 

405 

406 Python list: 

407 

408 <div class="mdx-four-columns" markdown> 

409 

410 - `#!py "list"` 

411 - `#!py "lst"` 

412 - `#!py "l"` 

413 - `#!py "flat_list"` 

414 - `#!py "flatten_list"` 

415 

416 </div> 
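
Since the aliases within each group above are interchangeable, a quick check like the following would hold, because each Pandas alias maps to the same conversion (a sketch only, assuming `df` is a small PySpark `DataFrame` and `pandas` is imported as `pd`):

```{.py .python linenums="1" title="Sketch"}
>>> all(
...     isinstance(convert_dataframe(df, alias), pd.DataFrame)
...     for alias in ("pd", "pandas", "pd.DataFrame")
... )
True
```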

417 

418 Params: 

419 dataframe (psDataFrame): 

420 The PySpark DataFrame to be converted. 

421 return_type (Union[LITERAL_LIST_OBJECT_NAMES, LITERAL_PANDAS_DATAFRAME_NAMES, LITERAL_PYSPARK_DATAFRAME_NAMES, LITERAL_NUMPY_ARRAY_NAMES, str], optional): 

422 The desired return type.<br> 

423 Options: 

424 

425 - `#!py "ps"`: Return the PySpark DataFrame. 

426 - `#!py "pd"`: Return a Pandas DataFrame. 

427 - `#!py "np"`: Return a NumPy array. 

428 - `#!py "list"`: Return a Python list. 

429 - `#!py "flat_list"`: Return a flat Python list (1D). 

430 

431 Defaults to `#!py "pd"` (Pandas DataFrame). 

432 

433 Raises: 

434 TypeError: 

435 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

436 ValueError: 

437 If any of the values parsed to `return_type` are not valid options. 

438 

439 Returns: 

440 (Optional[Union[psDataFrame, pdDataFrame, npArray, list]]): 

441 The converted data in the specified return type. 

442 

443 ???+ example "Examples" 

444 

445 ```{.py .python linenums="1" title="Set up"} 

446 >>> # Imports 

447 >>> import pandas as pd 

448 >>> from pyspark.sql import SparkSession 

449 >>> from toolbox_pyspark.cleaning import convert_dataframe 

450 >>> 

451 >>> # Instantiate Spark 

452 >>> spark = SparkSession.builder.getOrCreate() 

453 >>> 

454 >>> # Create data 

455 >>> df = spark.createDataFrame( 

456 ... pd.DataFrame( 

457 ... { 

458 ... "a": [0, 1, 2, 3], 

459 ... "b": ["a", "b", "c", "d"], 

460 ... } 

461 ... ) 

462 ... ) 

463 >>> 

464 >>> # Check 

465 >>> df.show() 

466 ``` 

467 <div class="result" markdown> 

468 ```{.txt .text title="Terminal"} 

469 +---+---+ 

470 | a | b | 

471 +---+---+ 

472 | 0 | a | 

473 | 1 | b | 

474 | 2 | c | 

475 | 3 | d | 

476 +---+---+ 

477 ``` 

</div> 

478 

479 ```{.py .python linenums="1" title="Example 1: Convert to PySpark"} 

480 >>> new_df = convert_dataframe(df, "ps") 

481 >>> print(type(new_df)) 

482 >>> new_df.show() 

483 ``` 

484 <div class="result" markdown> 

485 ```{.txt .text title="Terminal"} 

486 <class 'pyspark.sql.dataframe.DataFrame'> 

487 ``` 

488 ```{.txt .text title="Terminal"} 

489 +---+---+ 

490 | a | b | 

491 +---+---+ 

492 | 0 | a | 

493 | 1 | b | 

494 | 2 | c | 

495 | 3 | d | 

496 +---+---+ 

497 ``` 

498 !!! success "Conclusion: Successfully converted to PySpark." 

499 </div> 

500 

501 ```{.py .python linenums="1" title="Example 2: Convert to Pandas"} 

502 >>> new_df = convert_dataframe(df, "pd") 

503 >>> print(type(new_df)) 

504 >>> print(new_df) 

505 ``` 

506 <div class="result" markdown> 

507 ```{.txt .text title="Terminal"} 

508 <class 'pandas.core.frame.DataFrame'> 

509 ``` 

510 ```{.txt .text title="Terminal"} 

511 a b 

512 0 0 a 

513 1 1 b 

514 2 2 c 

515 3 3 d 

516 ``` 

517 !!! success "Conclusion: Successfully converted to Pandas." 

518 </div> 

519 

520 ```{.py .python linenums="1" title="Example 3: Convert to Numpy"} 

521 >>> new_df = convert_dataframe(df, "np") 

522 >>> print(type(new_df)) 

523 >>> print(new_df) 

524 ``` 

525 <div class="result" markdown> 

526 ```{.txt .text title="Terminal"} 

527 <class 'numpy.ndarray'> 

528 ``` 

529 ```{.txt .text title="Terminal"} 

530 [[0 "a"] 

531 [1 "b"] 

532 [2 "c"] 

533 [3 "d"]] 

534 ``` 

535 !!! success "Conclusion: Successfully converted to Numpy." 

536 </div> 

537 

538 ```{.py .python linenums="1" title="Example 4: List"} 

539 >>> new_df = convert_dataframe(df, "list") 

540 >>> print(type(new_df)) 

541 >>> print(new_df) 

542 ``` 

543 <div class="result" markdown> 

544 ```{.txt .text title="Terminal"} 

545 <class 'list'> 

546 ``` 

547 ```{.txt .text title="Terminal"} 

548 [ 

549 [0, "a"], 

550 [1, "b"], 

551 [2, "c"], 

552 [3, "d"], 

553 ] 

554 ``` 

555 !!! success "Conclusion: Successfully converted to List." 

556 </div> 

557 

558 ```{.py .python linenums="1" title="Example 5: Convert to single column as list"} 

559 >>> new_df = convert_dataframe(df.select("b"), "flat_list") 

560 >>> print(type(new_df)) 

561 >>> print(new_df) 

562 ``` 

563 <div class="result" markdown> 

564 ```{.txt .text title="Terminal"} 

565 <class 'list'> 

566 ``` 

567 ```{.txt .text title="Terminal"} 

568 ["a", "b", "c", "d"] 

569 ``` 

570 !!! success "Conclusion: Successfully converted to flat List." 

571 </div> 

572 

573 ```{.py .python linenums="1" title="Example 6: Invalid return type"} 

574 >>> convert_dataframe(df, "invalid") 

575 ``` 

576 <div class="result" markdown> 

577 ```{.txt .text title="Terminal"} 

578 ValueError: Unknown return type: 'invalid'. 

579 Must be one of: ['pd', 'ps', 'np', 'list']. 

580 For more info, check the `constants` module. 

581 ``` 

582 !!! failure "Conclusion: Invalid return type." 

583 </div> 

584 

585 ??? tip "See Also" 

586 - [`toolbox_pyspark.constants`][toolbox_pyspark.constants] 

587 """ 

588 if return_type in VALID_PYSPARK_DATAFRAME_NAMES: 

589 return dataframe 

590 elif return_type in VALID_PANDAS_DATAFRAME_NAMES: 

591 return dataframe.toPandas() 

592 elif return_type in VALID_NUMPY_ARRAY_NAMES: 

593 return dataframe.toPandas().values # type:ignore 

594 elif return_type in VALID_LIST_OBJECT_NAMES: 

595 if "flat" in return_type: 

596 return flatten(dataframe.toPandas().values.tolist()) # type:ignore 

597 else: 

598 return dataframe.toPandas().values.tolist() # type:ignore 

599 else: 

600 raise ValueError( 

601 f"Unknown return type: '{return_type}'.\n" 

602 f"Must be one of: {['pd', 'ps', 'np', 'list']}.\n" 

603 f"For more info, check the `constants` module." 

604 ) 

605 

606 

607@typechecked 

608def update_nullability( 

609 dataframe: psDataFrame, 

610 columns: Optional[Union[str, str_collection]] = None, 

611 nullable: bool = True, 

612) -> psDataFrame: 

613 """ 

614 !!! note "Summary" 

615 Update the nullability of specified columns in a PySpark DataFrame. 

616 

617 ???+ abstract "Details" 

618 This function updates the nullability of the specified columns in a PySpark DataFrame. If no columns are specified, it updates the nullability of all columns. 

619 

620 Params: 

621 dataframe (psDataFrame): 

622 The input PySpark DataFrame. 

623 columns (Optional[Union[str, str_collection]], optional): 

624 The columns for which to update nullability. If not provided, all columns will be updated.<br> 

625 Defaults to `#!py None`. 

626 nullable (bool, optional): 

627 Whether to set the columns as nullable or not.<br> 

628 Defaults to `#!py True`. 

629 

630 Raises: 

631 TypeError: 

632 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

633 ColumnDoesNotExistError: 

634 If any of the `#!py columns` do not exist within `#!py dataframe.columns`. 

635 

636 Returns: 

637 (psDataFrame): 

638 The updated DataFrame with the specified columns' nullability updated. 

639 

640 ???+ example "Examples" 

641 

642 ```{.py .python linenums="1" title="Set up"} 

643 >>> # Imports 

644 >>> import pandas as pd 

645 >>> from pyspark.sql import SparkSession 

646 >>> from toolbox_pyspark.cleaning import update_nullability 

647 >>> 

648 >>> # Instantiate Spark 

649 >>> spark = SparkSession.builder.getOrCreate() 

650 >>> 

651 >>> # Create data 

652 >>> df = spark.createDataFrame( 

653 ... pd.DataFrame( 

654 ... { 

655 ... "a": [1, 2, 3, 4], 

656 ... "b": ["a", "b", "c", "d"], 

657 ... "c": [1.1, 2.2, 3.3, 4.4], 

658 ... } 

659 ... ) 

660 ... ) 

661 >>> 

662 >>> # Check 

663 >>> df.show() 

664 >>> print(df.schema) 

665 >>> df.printSchema() 

666 ``` 

667 <div class="result" markdown> 

668 ```{.txt .text title="Terminal"} 

669 +---+---+-----+ 

670 | a | b | c | 

671 +---+---+-----+ 

672 | 1 | a | 1.1 | 

673 | 2 | b | 2.2 | 

674 | 3 | c | 3.3 | 

675 | 4 | d | 4.4 | 

676 +---+---+-----+ 

677 ``` 

678 ```{.sh .shell title="Terminal"} 

679 StructType( 

680 [ 

681 StructField("a", LongType(), True), 

682 StructField("b", StringType(), True), 

683 StructField("c", DoubleType(), True), 

684 ] 

685 ) 

686 ``` 

687 ```{.txt .text title="Terminal"} 

688 root 

689 |-- a: long (nullable = true) 

690 |-- b: string (nullable = true) 

691 |-- c: double (nullable = true) 

692 ``` 

693 </div> 

694 

695 ```{.py .python linenums="1" title="Example 1: Update nullability of all columns"} 

696 >>> new_df = update_nullability(df, nullable=False) 

697 >>> new_df.printSchema() 

698 ``` 

699 <div class="result" markdown> 

700 ```{.txt .text title="Terminal"} 

701 root 

702 |-- a: long (nullable = false) 

703 |-- b: string (nullable = false) 

704 |-- c: double (nullable = false) 

705 ``` 

706 !!! success "Conclusion: Successfully updated nullability of all columns." 

707 </div> 

708 

709 ```{.py .python linenums="1" title="Example 2: Update nullability of specific columns"} 

710 >>> new_df = update_nullability(df, columns=["a", "c"], nullable=False) 

711 >>> new_df.printSchema() 

712 ``` 

713 <div class="result" markdown> 

714 ```{.txt .text title="Terminal"} 

715 root 

716 |-- a: long (nullable = false) 

717 |-- b: string (nullable = true) 

718 |-- c: double (nullable = false) 

719 ``` 

720 !!! success "Conclusion: Successfully updated nullability of specific columns." 

721 </div> 

722 

723 ```{.py .python linenums="1" title="Example 3: Column does not exist"} 

724 >>> update_nullability(df, columns="d", nullable=False) 

725 ``` 

726 <div class="result" markdown> 

727 ```{.txt .text title="Terminal"} 

728 ColumnDoesNotExistError: Column 'd' does not exist in the DataFrame. 

729 Try one of: ["a", "b", "c"] 

730 ``` 

731 !!! failure "Conclusion: Column does not exist." 

732 </div> 

733 

734 ??? success "Credit" 

735 All credit goes to: https://stackoverflow.com/questions/46072411/can-i-change-the-nullability-of-a-column-in-my-spark-dataframe#answer-51821437. 

736 

737 ??? tip "See Also" 

738 - [`toolbox_pyspark.checks.assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists] 

739 """ 

740 columns = get_columns(dataframe, columns) 

741 assert_columns_exists(dataframe=dataframe, columns=columns) 

742 schema: T.StructType = dataframe.schema 

743 for struct_field in schema: 

744 if struct_field.name in columns: 

745 struct_field.nullable = nullable 

746 return dataframe.sparkSession.createDataFrame(data=dataframe.rdd, schema=schema) 

747 

748 

749# ---------------------------------------------------------------------------- # 

750# Trimming #### 

751# ---------------------------------------------------------------------------- # 

752 

753 

754@typechecked 

755def trim_spaces_from_column( 

756 dataframe: psDataFrame, 

757 column: str, 

758) -> psDataFrame: 

759 """ 

760 !!! note "Summary" 

761 For a given column, trim all of the excess white spaces from it. 

762 

763 Params: 

764 dataframe (psDataFrame): 

765 The DataFrame to update. 

766 column (str): 

767 The column to clean. 

768 

769 Raises: 

770 TypeError: 

771 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

772 ColumnDoesNotExistError: 

773 If the `#!py column` does not exist within `#!py dataframe.columns`. 

774 

775 Returns: 

776 (psDataFrame): 

777 The updated DataFrame. 

778 

779 ???+ example "Examples" 

780 

781 ```{.py .python linenums="1" title="Set up"} 

782 >>> # Imports 

783 >>> import pandas as pd 

784 >>> from pyspark.sql import SparkSession 

785 >>> from toolbox_pyspark.cleaning import trim_spaces_from_column 

786 >>> 

787 >>> # Instantiate Spark 

788 >>> spark = SparkSession.builder.getOrCreate() 

789 >>> 

790 >>> # Create data 

791 >>> df = spark.createDataFrame( 

792 ... pd.DataFrame( 

793 ... { 

794 ... "a": [1, 2, 3, 4], 

795 ... "b": ["a", "b", "c", "d"], 

796 ... "c": ["1 ", "1 ", "1 ", "1 "], 

797 ... "d": [" 2", " 2", " 2", " 2"], 

798 ... "e": [" 3 ", " 3 ", " 3 ", " 3 "], 

799 ... } 

800 ... ) 

801 ... ) 

802 >>> 

803 >>> # Check 

805 >>> df.show() 

806 ``` 

807 <div class="result" markdown> 

808 ```{.txt .text title="Terminal"} 

809 +---+---+------+------+---------+ 

810 | a | b | c | d | e | 

811 +---+---+------+------+---------+ 

812 | 1 | a | 1 | 2 | 3 | 

813 | 2 | b | 1 | 2 | 3 | 

814 | 3 | c | 1 | 2 | 3 | 

815 | 4 | d | 1 | 2 | 3 | 

816 +---+---+------+------+---------+ 

817 ``` 

818 </div> 

819 

820 ```{.py .python linenums="1" title="Example 1: Trim column"} 

821 >>> trim_spaces_from_column(df, "c").show() 

822 ``` 

823 <div class="result" markdown> 

824 ```{.txt .text title="Terminal"} 

825 +---+---+---+------+---------+ 

826 | a | b | c | d | e | 

827 +---+---+---+------+---------+ 

828 | 1 | a | 1 | 2 | 3 | 

829 | 2 | b | 1 | 2 | 3 | 

830 | 3 | c | 1 | 2 | 3 | 

831 | 4 | d | 1 | 2 | 3 | 

832 +---+---+---+------+---------+ 

833 ``` 

834 !!! success "Conclusion: Successfully trimmed the `c` column." 

835 </div> 

836 

837 ```{.py .python linenums="1" title="Example 2: Invalid column"} 

838 >>> trim_spaces_from_column(df, "f") 

839 ``` 

840 <div class="result" markdown> 

841 ```{.txt .text title="Terminal"} 

842 ColumnDoesNotExistError: Column 'f' does not exist in the DataFrame. 

843 Try one of: ["a", "b", "c", "d", "e"] 

844 ``` 

845 !!! failure "Conclusion: Column does not exist." 

846 </div> 

847 

848 ??? info "Notes" 

849 

850 ???+ info "Justification" 

851 - The main reason for this function is that when the data was exported from the Legacy WMS's, there's a _whole bunch_ of trailing spaces in the data fields. My theory is that this is because of the data type in the source system. That is, if it's originally stored as 'char' type, then it will maintain the data length. This issue doesn't seem to be affecting the `varchar` fields. Nonetheless, this function will strip the white spaces from the data; thus reducing the total size of the data stored therein. 

852 - The reason why it is necessary to write this out as a custom function, instead of using the [`F.trim()`][trim] function from the PySpark library directly, is due to the deficiencies of the Java [`trim()`](https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#trim) function. More specifically, there are 13 different whitespace characters available in our ASCII character set, and the Java function only cleans about 6 of these. Therefore, we define this function, which iterates through all 13 whitespace characters, formats them into a regular expression, and then passes that to the [`F.regexp_replace()`][regexp_replace] function to replace them with an empty string (`""`). As a result, all 13 characters will be replaced, and the strings will be cleaned and trimmed ready for further processing. 

853 

854 [trim]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.trim.html 

855 [regexp_replace]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.regexp_replace.html 

856 

857 ???+ info "Regex definition: `^[...]+|[...]+$`" 

858 - 1st Alternative: '^[...]+' 

859 - '^' asserts position at start of a line 

860 - Match a single character present in the list below '[...]' 

861 - '+' matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy) 

862 - matches a single character in the list ' ' (case sensitive) 

863 - matches the character ' ' with index 160 (A0 or 240) literally (case sensitive) 

864 - matches the character ' ' with index 32 (20 or 40) literally (case sensitive) 

865 - ... (repeat for all whitespace characters) 

866 - 2nd Alternative: '[...]+$' 

867 - Match a single character present in the list below '[...]' 

868 - '+' matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy) 

869 - matches a single character in the list ' ' (case sensitive) 

870 - matches the character ' ' with index 160 (A0 or 240) literally (case sensitive) 

871 - matches the character ' ' with index 32 (20 or 40) literally (case sensitive) 

872 - ... (repeat for all whitespace characters) 

873 - '$' asserts position at the end of a line 
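
Putting the pieces above together, a minimal standalone sketch of the pattern (using plain `#!py re` and only two of the whitespace characters, `#!py chr(32)` and `#!py chr(160)`, for brevity):

```{.py .python linenums="1" title="Sketch"}
>>> import re
>>> space_chars = [chr(32), chr(160)]  # the real function uses all whitespace characters
>>> regexp = f"^[{''.join(space_chars)}]+|[{''.join(space_chars)}]+$"
>>> re.sub(regexp, "", "  value  ")
'value'
```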

874 

875 ??? tip "See Also" 

876 - [`trim_spaces_from_columns()`][toolbox_pyspark.cleaning.trim_spaces_from_columns] 

877 - [`ALL_WHITESPACE_CHARACTERS`][toolbox_pyspark.constants.ALL_WHITESPACE_CHARACTERS] 

878 """ 

879 assert_column_exists(dataframe=dataframe, column=column, match_case=True) 

880 space_chars: str_list = [chr(char.ascii) for char in WHITESPACES] 

881 regexp: str = f"^[{''.join(space_chars)}]+|[{''.join(space_chars)}]+$" 

882 return dataframe.withColumn(column, F.regexp_replace(column, regexp, "")) 

883 

884 

885@typechecked 

886def trim_spaces_from_columns( 

887 dataframe: psDataFrame, 

888 columns: Optional[Union[str, str_collection]] = None, 

889) -> psDataFrame: 

890 """ 

891 !!! note "Summary" 

892 For a given list of columns, trim all of the excess white spaces from them. 

893 

894 Params: 

895 dataframe (psDataFrame): 

896 The DataFrame to be updated. 

897 columns (Optional[Union[str, str_collection]], optional): 

898 The list of columns to be updated. 

899 Must be valid columns on `dataframe`. 

900 If given as a string, will be executed as a single column (i.e. a one-element list). 

901 If not given, will apply to all columns in `dataframe` which have the data-type `string`. 

902 It is also possible to parse the values `#!py "all"` or `#!py "all_string"`, which will also apply this function to all columns in `dataframe` which have the data-type `string`.<br> 

903 Defaults to `#!py None`. 
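
Raises:
    TypeError:
        If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
    ColumnDoesNotExistError:
        If any of the `#!py columns` do not exist within `#!py dataframe.columns`.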

904 

905 Returns: 

906 (psDataFrame): 

907 The updated DataFrame. 

908 

909 ???+ example "Examples" 

910 

911 ```{.py .python linenums="1" title="Set up"} 

912 >>> # Imports 

913 >>> import pandas as pd 

914 >>> from pyspark.sql import SparkSession 

915 >>> from toolbox_pyspark.cleaning import trim_spaces_from_columns 

916 >>> 

917 >>> # Instantiate Spark 

918 >>> spark = SparkSession.builder.getOrCreate() 

919 >>> 

920 >>> # Create data 

921 >>> df = spark.createDataFrame( 

922 ... pd.DataFrame( 

923 ... { 

924 ... "a": [1, 2, 3, 4], 

925 ... "b": ["a", "b", "c", "d"], 

926 ... "c": ["1 ", "1 ", "1 ", "1 "], 

927 ... "d": [" 2", " 2", " 2", " 2"], 

928 ... "e": [" 3 ", " 3 ", " 3 ", " 3 "], 

929 ... } 

930 ... ) 

931 ... ) 

932 >>> 

933 >>> # Check 

935 >>> df.show() 

936 ``` 

937 <div class="result" markdown> 

938 ```{.txt .text title="Terminal"} 

939 +---+---+------+------+---------+ 

940 | a | b | c | d | e | 

941 +---+---+------+------+---------+ 

942 | 1 | a | 1 | 2 | 3 | 

943 | 2 | b | 1 | 2 | 3 | 

944 | 3 | c | 1 | 2 | 3 | 

945 | 4 | d | 1 | 2 | 3 | 

946 +---+---+------+------+---------+ 

947 ``` 

948 </div> 

949 

950 ```{.py .python linenums="1" title="Example 1: One column as list"} 

951 >>> trim_spaces_from_columns(df, ["c"]).show() 

952 ``` 

953 <div class="result" markdown> 

954 ```{.txt .text title="Terminal"} 

955 +---+---+---+------+---------+ 

956 | a | b | c | d | e | 

957 +---+---+---+------+---------+ 

958 | 1 | a | 1 | 2 | 3 | 

959 | 2 | b | 1 | 2 | 3 | 

960 | 3 | c | 1 | 2 | 3 | 

961 | 4 | d | 1 | 2 | 3 | 

962 +---+---+---+------+---------+ 

963 ``` 

964 !!! success "Conclusion: Successfully trimmed the `c` column." 

965 </div> 

966 

967 ```{.py .python linenums="1" title="Example 2: Single column as string"} 

968 >>> trim_spaces_from_columns(df, "d").show() 

969 ``` 

970 <div class="result" markdown> 

971 ```{.txt .text title="Terminal"} 

972 +---+---+------+---+---------+ 

973 | a | b | c | d | e | 

974 +---+---+------+---+---------+ 

975 | 1 | a | 1 | 2 | 3 | 

976 | 2 | b | 1 | 2 | 3 | 

977 | 3 | c | 1 | 2 | 3 | 

978 | 4 | d | 1 | 2 | 3 | 

979 +---+---+------+---+---------+ 

980 ``` 

981 !!! success "Conclusion: Successfully trimmed the `d` column." 

982 </div> 

983 

984 ```{.py .python linenums="1" title="Example 3: Multiple columns"} 

985 >>> trim_spaces_from_columns(df, ["c", "d"]).show() 

986 ``` 

987 <div class="result" markdown> 

988 ```{.txt .text title="Terminal"} 

989 +---+---+---+---+---------+ 

990 | a | b | c | d | e | 

991 +---+---+---+---+---------+ 

992 | 1 | a | 1 | 2 | 3 | 

993 | 2 | b | 1 | 2 | 3 | 

994 | 3 | c | 1 | 2 | 3 | 

995 | 4 | d | 1 | 2 | 3 | 

996 +---+---+---+---+---------+ 

997 ``` 

998 !!! success "Conclusion: Successfully trimmed the `c` and `d` columns." 

999 </div> 

1000 

1001 ```{.py .python linenums="1" title="Example 4: All columns"} 

1002 >>> trim_spaces_from_columns(df, "all").show() 

1003 ``` 

1004 <div class="result" markdown> 

1005 ```{.txt .text title="Terminal"} 

1006 +---+---+---+---+---+ 

1007 | a | b | c | d | e | 

1008 +---+---+---+---+---+ 

1009 | 1 | a | 1 | 2 | 3 | 

1010 | 2 | b | 1 | 2 | 3 | 

1011 | 3 | c | 1 | 2 | 3 | 

1012 | 4 | d | 1 | 2 | 3 | 

1013 +---+---+---+---+---+ 

1014 ``` 

1015 !!! success "Conclusion: Successfully trimmed all columns." 

1016 </div> 

1017 

1018 ```{.py .python linenums="1" title="Example 5: Default config"} 

1019 >>> trim_spaces_from_columns(df).show() 

1020 ``` 

1021 <div class="result" markdown> 

1022 ```{.txt .text title="Terminal"} 

1023 +---+---+---+---+---+ 

1024 | a | b | c | d | e | 

1025 +---+---+---+---+---+ 

1026 | 1 | a | 1 | 2 | 3 | 

1027 | 2 | b | 1 | 2 | 3 | 

1028 | 3 | c | 1 | 2 | 3 | 

1029 | 4 | d | 1 | 2 | 3 | 

1030 +---+---+---+---+---+ 

1031 ``` 

1032 !!! success "Conclusion: Successfully trimmed all columns." 

1033 </div> 

1034 

1035 ```{.py .python linenums="1" title="Example 6: Invalid column"} 

1036 >>> trim_spaces_from_columns(df, ["f"]) 

1037 ``` 

1038 <div class="result" markdown> 

1039 ```{.txt .text title="Terminal"} 

1040 ColumnDoesNotExistError: Columns ['f'] do not exist in the DataFrame. 

1041 Try one of: ["a", "b", "c", "d", "e"] 

1042 ``` 

1043 !!! failure "Conclusion: Columns do not exist." 

1044 </div> 

1045 

1046 ???+ info "Notes" 

1047 

1048 ???+ info "Justification" 

1049 - The main reason for this function is that when the data was exported from the Legacy WMS's, there's a _whole bunch_ of trailing spaces in the data fields. My theory is that this is because of the data type in the source system. That is, if it's originally stored as 'char' type, then it will maintain the data length. This issue doesn't seem to be affecting the `varchar` fields. Nonetheless, this function will strip the white spaces from the data; thus reducing the total size of the data stored therein. 

1050 - The reason why it is necessary to write this out as a custom function, instead of using the [`F.trim()`][trim] function from the PySpark library directly, is due to the deficiencies of the Java [`trim()`](https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#trim) function. More specifically, there are 13 different whitespace characters available in our ASCII character set, and the Java function only cleans about 6 of these. Therefore, we define this function, which iterates through all 13 whitespace characters, formats them into a regular expression, and then passes that to the [`F.regexp_replace()`][regexp_replace] function to replace them with an empty string (`""`). As a result, all 13 characters will be replaced, and the strings will be cleaned and trimmed ready for further processing. 

1051 - The reason why this function exists as a standalone, and does not call [`trim_spaces_from_column()`][toolbox_pyspark.cleaning.trim_spaces_from_column] from within a loop, is because [`trim_spaces_from_column()`][toolbox_pyspark.cleaning.trim_spaces_from_column] utilises the [`.withColumn()`][withColumn] method to implement the [`F.regexp_replace()`][regexp_replace] function on columns individually. When implemented iteratively, this process creates huge DAGs for the RDD, and blows out the complexity to a huge extent. This [`trim_spaces_from_columns()`][toolbox_pyspark.cleaning.trim_spaces_from_columns] function instead utilises the [`.withColumns()`][withColumns] method to implement the [`F.regexp_replace()`][regexp_replace] function over all columns at once. The [`.withColumns()`][withColumns] method projects the function down to the underlying dataset in one single execution, rather than a different execution per column. Therefore, it is simpler and more efficient (see the comparison sketch below). 

1052 

1053 [withColumn]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html 

1054 [withColumns]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumns.html 

1055 [trim]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.trim.html 

1056 [regexp_replace]: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.regexp_replace.html 
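
As a rough comparison of the two approaches described in the last point above (a sketch only, assuming Spark 3.3+ where `#!py .withColumns()` is available, hypothetical columns `#!py "c"` and `#!py "d"`, and `#!py regexp` built as described):

```{.py .python linenums="1" title="Sketch"}
>>> # Iterative approach: adds one projection to the plan per column
>>> df_loop = df
>>> for col in ["c", "d"]:
...     df_loop = df_loop.withColumn(col, F.regexp_replace(col, regexp, ""))
...
>>> # Single-projection approach used by this function
>>> df_once = df.withColumns({col: F.regexp_replace(col, regexp, "") for col in ["c", "d"]})
```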

1057 

1058 ???+ info "Regex definition: `^[...]+|[...]+$`" 

1059 - 1st Alternative: `^[...]+` 

1060 - `^` asserts position at start of a line 

1061 - Match a single character present in the list below `[...]` 

1062 - `+` matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy) 

1063 - matches a single character in the list ` ` (case sensitive) 

1064 - matches the character ` ` with index 160 (A0 or 240) literally (case sensitive) 

1065 - matches the character ` ` with index 32 (20 or 40) literally (case sensitive) 

1066 - ... (repeat for all whitespace characters) 

1067 - 2nd Alternative: `[...]+$` 

1068 - Match a single character present in the list below `[...]` 

1069 - `+` matches the previous token between one and unlimited times, as many times as possible, giving back as needed (greedy) 

1070 - matches a single character in the list ` ` (case sensitive) 

1071 - matches the character ` ` with index 160 (A0 or 240) literally (case sensitive) 

1072 - matches the character ` ` with index 32 (20 or 40) literally (case sensitive) 

1073 - ... (repeat for all whitespace characters) 

1074 

1075 ??? tip "See Also" 

1076 - [`trim_spaces_from_column()`][toolbox_pyspark.cleaning.trim_spaces_from_column] 

1077 - [`ALL_WHITESPACE_CHARACTERS`][toolbox_pyspark.constants.ALL_WHITESPACE_CHARACTERS] 

1078 """ 

1079 columns = get_columns(dataframe, columns) 

1080 assert_columns_exists(dataframe=dataframe, columns=columns, match_case=True) 

1081 space_chars: str_list = WHITESPACES.to_list("chr") # type:ignore 

1082 regexp: str = f"^[{''.join(space_chars)}]+|[{''.join(space_chars)}]+$" 

1083 cols_exprs: dict[str, Column] = {col: F.regexp_replace(col, regexp, "") for col in columns} 

1084 return dataframe.withColumns(cols_exprs) 

1085 

1086 

1087# ---------------------------------------------------------------------------- # 

1088# Applying functions #### 

1089# ---------------------------------------------------------------------------- # 

1090 

1091 

1092@typechecked 

1093def apply_function_to_column( 

1094 dataframe: psDataFrame, 

1095 column: str, 

1096 function: str = "upper", 

1097 *function_args, 

1098 **function_kwargs, 

1099) -> psDataFrame: 

1100 """ 

1101 !!! note "Summary" 

1102 Apply a given PySpark `function` to a single `column` on `dataframe`. 

1103 

1104 ???+ abstract "Details" 

1105 Under the hood, this function will simply call the [`.withColumn()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html) method to apply the function named in `function` from the PySpark [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) module. 

1106 ```py 

1107 return dataframe.withColumn(column, getattr(F, function)(column, *function_args, **function_kwargs)) 

1108 ``` 

1109 

1110 Params: 

1111 dataframe (psDataFrame): 

1112 The DataFrame to update. 

1113 column (str): 

1114 The column to update. 

1115 function (str, optional): 

1116 The function to execute. Must be a valid function from the PySpark [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) module.<br> 

1117 Defaults to `#!py "upper"`. 

1118 *function_args (Any, optional): 

1119 The arguments to push down to the underlying `function`. 

1120 **function_kwargs (Any, optional): 

1121 The keyword arguments to push down to the underlying `function`. 

1122 

1123 Returns: 

1124 (psDataFrame): 

1125 The updated DataFrame. 

1126 

1127 ???+ example "Examples" 

1128 

1129 ```{.py .python linenums="1" title="Set up"} 

1130 >>> # Imports 

1131 >>> import pandas as pd 

1132 >>> from pyspark.sql import SparkSession 

1133 >>> from toolbox_pyspark.cleaning import apply_function_to_column 

1134 >>> 

1135 >>> # Instantiate Spark 

1136 >>> spark = SparkSession.builder.getOrCreate() 

1137 >>> 

1138 >>> # Create data 

1139 >>> df = spark.createDataFrame( 

1140 ... pd.DataFrame( 

1141 ... { 

1142 ... "a": [0, 1, 2, 3], 

1143 ... "b": ["a", "b", "c", "d"], 

1144 ... "c": ["c", "c", "c", "c"], 

1145 ... "d": ["d", "d", "d", "d"], 

1146 ... } 

1147 ... ) 

1148 ... ) 

1149 >>> 

1150 >>> # Check 

1152 >>> df.show() 

1153 ``` 

1154 <div class="result" markdown> 

1155 ```{.txt .text title="Terminal"} 

1156 +---+---+---+---+ 

1157 | a | b | c | d | 

1158 +---+---+---+---+ 

1159 | 0 | a | c | d | 

1160 | 1 | b | c | d | 

1161 | 2 | c | c | d | 

1162 | 3 | d | c | d | 

1163 +---+---+---+---+ 

1164 ``` 

1165 </div> 

1166 

1167 ```{.py .python linenums="1" title="Example 1: Default params"} 

1168 >>> apply_function_to_column(df, "c").show() 

1169 ``` 

1170 <div class="result" markdown> 

1171 ```{.txt .text title="Terminal"} 

1172 +---+---+---+---+ 

1173 | a | b | c | d | 

1174 +---+---+---+---+ 

1175 | 0 | a | C | d | 

1176 | 1 | b | C | d | 

1177 | 2 | c | C | d | 

1178 | 3 | d | C | d | 

1179 +---+---+---+---+ 

1180 ``` 

1181 !!! success "Conclusion: Successfully applied the `upper` function to the `c` column." 

1182 </div> 

1183 

1184 ```{.py .python linenums="1" title="Example 2: Simple function"} 

1185 >>> apply_function_to_column(df, "c", "lower").show() 

1186 ``` 

1187 <div class="result" markdown> 

1188 ```{.txt .text title="Terminal"} 

1189 +---+---+---+---+ 

1190 | a | b | c | d | 

1191 +---+---+---+---+ 

1192 | 0 | a | c | d | 

1193 | 1 | b | c | d | 

1194 | 2 | c | c | d | 

1195 | 3 | d | c | d | 

1196 +---+---+---+---+ 

1197 ``` 

1198 !!! success "Conclusion: Successfully applied the `lower` function to the `c` column." 

1199 </div> 

1200 

1201 ```{.py .python linenums="1" title="Example 3: Complex function, using args"} 

1202 >>> apply_function_to_column(df, "d", "lpad", 5, "?").show() 

1203 ``` 

1204 <div class="result" markdown> 

1205 ```{.txt .text title="Terminal"} 

1206 +---+---+---+-------+ 

1207 | a | b | c | d | 

1208 +---+---+---+-------+ 

1209 | 0 | a | c | ????d | 

1210 | 1 | b | c | ????d | 

1211 | 2 | c | c | ????d | 

1212 | 3 | d | c | ????d | 

1213 +---+---+---+-------+ 

1214 ``` 

1215 !!! success "Conclusion: Successfully applied the `lpad` function to the `d` column." 

1216 </div> 

1217 

1218 ```{.py .python linenums="1" title="Example 4: Complex function, using kwargs"} 

1219 >>> new_df = apply_function_to_column( 

1220 ... dataframe=df, 

1221 ... column="d", 

1222 ... function="lpad", 

1223 ... len=5, 

1224 ... pad="?", 

1225 ... ).show() 

1226 ``` 

1227 <div class="result" markdown> 

1228 ```{.txt .text title="Terminal"} 

1229 +---+---+---+-------+ 

1230 | a | b | c | d | 

1231 +---+---+---+-------+ 

1232 | 0 | a | c | ????d | 

1233 | 1 | b | c | ????d | 

1234 | 2 | c | c | ????d | 

1235 | 3 | d | c | ????d | 

1236 +---+---+---+-------+ 

1237 ``` 

1238 !!! success "Conclusion: Successfully applied the `lpad` function to the `d` column." 

1239 </div> 

1240 

1241 ```{.py .python linenums="1" title="Example 5: Different complex function, using kwargs"} 

1242 >>> new_df = apply_function_to_column( 

1243 ... dataframe=df, 

1244 ... column="b", 

1245 ... function="regexp_replace", 

1246 ... pattern="c", 

1247 ... replacement="17", 

1248 ... ).show() 

1249 ``` 

1250 <div class="result" markdown> 

1251 ```{.txt .text title="Terminal"} 

1252 +---+----+---+---+ 

1253 | a | b | c | d | 

1254 +---+----+---+---+ 

1255 | 0 | a | c | d | 

1256 | 1 | b | c | d | 

1257 | 2 | 17 | c | d | 

1258 | 3 | d | c | d | 

1259 +---+----+---+---+ 

1260 ``` 

1261 !!! success "Conclusion: Successfully applied the `regexp_replace` function to the `b` column." 

1262 </div> 

1263 

1264 ```{.py .python linenums="1" title="Example 6: Part of pipe"} 

1265 >>> new_df = df.transform( 

1266 ... func=apply_function_to_column, 

1267 ... column="d", 

1268 ... function="lpad", 

1269 ... len=5, 

1270 ... pad="?", 

1271 ... ).show() 

1272 ``` 

1273 <div class="result" markdown> 

1274 ```{.txt .text title="Terminal"} 

1275 +---+---+---+-------+ 

1276 | a | b | c | d | 

1277 +---+---+---+-------+ 

1278 | 0 | a | c | ????d | 

1279 | 1 | b | c | ????d | 

1280 | 2 | c | c | ????d | 

1281 | 3 | d | c | ????d | 

1282 +---+---+---+-------+ 

1283 ``` 

1284 !!! success "Conclusion: Successfully applied the `lpad` function to the `d` column." 

1285 </div> 

1286 

1287 ```{.py .python linenums="1" title="Example 7: Column name in different case"} 

1288 >>> new_df = df.transform( 

1289 ... func=apply_function_to_column, 

1290 ... column="D", 

1291 ... function="upper", 

1292 ... ).show() 

1293 ``` 

1294 <div class="result" markdown> 

1295 ```{.txt .text title="Terminal"} 

1296 +---+---+---+---+ 

1297 | a | b | c | d | 

1298 +---+---+---+---+ 

1299 | 0 | a | c | D | 

1300 | 1 | b | c | D | 

1301 | 2 | c | c | D | 

1302 | 3 | d | c | D | 

1303 +---+---+---+---+ 

1304 ``` 

1305 !!! success "Conclusion: Successfully applied the `upper` function to the `D` column." 

1306 </div> 

1307 

1308 ```{.py .python linenums="1" title="Example 8: Invalid column"} 

1309 >>> apply_function_to_column(df, "f") 

1310 ``` 

1311 <div class="result" markdown> 

1312 ```{.txt .text title="Terminal"} 

1313 ColumnDoesNotExistError: Column 'f' does not exist in the DataFrame. 

1314 Try one of: ["a", "b", "c", "d"] 

1315 ``` 

1316 !!! failure "Conclusion: Column does not exist." 

1317 </div> 

1318 

1319 ??? info "Notes" 

1320 - We have to name the `function` parameter as the full name because when this function is executed as part of a chain (by using the PySpark [`.transform()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.transform.html) method), that one uses the `func` parameter. 

1321 

1322 ??? tip "See Also" 

1323 - [`apply_function_to_columns()`][toolbox_pyspark.cleaning.apply_function_to_columns] 

1324 """ 

1325 assert_column_exists(dataframe, column, False) 

1326 return dataframe.withColumn( 

1327 colName=column, 

1328 col=getattr(F, function)(column, *function_args, **function_kwargs), 

1329 ) 

1330 

1331 

1332@typechecked 

1333def apply_function_to_columns( 

1334 dataframe: psDataFrame, 

1335 columns: Union[str, str_collection], 

1336 function: str = "upper", 

1337 *function_args, 

1338 **function_kwargs, 

1339) -> psDataFrame: 

1340 """ 

1341 !!! note "Summary" 

1342 Apply a given PySpark `function` over multiple `columns` on a given `dataframe`. 

1343 

1344 ???+ abstract "Details" 

1345 Under the hood, this function will simply call the [`.withColumns()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumns.html) method to apply the function named in `function` from the PySpark [`functions`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) module. 

1346 ```py 

1347 return dataframe.withColumns( 

1348 {column: getattr(F, function)(column, *args, **kwargs) for column in columns} 

1349 ) 

1350 ``` 

1351 

1352 Params: 

1353 dataframe (psDataFrame): 

1354 The DataFrame to update. 

1355 columns (Union[str, str_collection]): 

1356 The columns to update. 

1357 function (str, optional): 

1358 The function to use.<br> 

1359 Defaults to `#!py "upper"`. 
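*function_args (Any, optional):
    The arguments to push down to the underlying `function`.
**function_kwargs (Any, optional):
    The keyword arguments to push down to the underlying `function`.

Raises:
    TypeError:
        If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
    ColumnDoesNotExistError:
        If any of the `#!py columns` do not exist within `#!py dataframe.columns`.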

1360 

1361 Returns: 

1362 (psDataFrame): 

1363 The updated DataFrame. 

1364 

1365 ???+ example "Examples" 

1366 

1367 ```{.py .python linenums="1" title="Set up"} 

1368 >>> # Imports 

1369 >>> import pandas as pd 

1370 >>> from pyspark.sql import SparkSession 

1371 >>> from toolbox_pyspark.cleaning import apply_function_to_columns 

1372 >>> 

1373 >>> # Instantiate Spark 

1374 >>> spark = SparkSession.builder.getOrCreate() 

1375 >>> 

1376 >>> # Create data 

1377 >>> df = spark.createDataFrame( 

1378 ... pd.DataFrame( 

1379 ... { 

1380 ... "a": [0, 1, 2, 3], 

1381 ... "b": ["a", "b", "c", "d"], 

1382 ... "c": ["c", "c", "c", "c"], 

1383 ... "d": ["d", "d", "d", "d"], 

1384 ... } 

1385 ... ) 

1386 ... ) 

1387 >>> 

1388 >>> # Check 

1390 >>> df.show() 

1391 ``` 

1392 <div class="result" markdown> 

1393 ```{.txt .text title="Terminal"} 

1394 +---+---+---+---+ 

1395 | a | b | c | d | 

1396 +---+---+---+---+ 

1397 | 0 | a | c | d | 

1398 | 1 | b | c | d | 

1399 | 2 | c | c | d | 

1400 | 3 | d | c | d | 

1401 +---+---+---+---+ 

1402 ``` 

1403 </div> 

1404 

1405 ```{.py .python linenums="1" title="Example 1: Default params"} 

1406 >>> apply_function_to_columns(df, ["b", "c"]).show() 

1407 ``` 

1408 <div class="result" markdown> 

1409 ```{.txt .text title="Terminal"} 

1410 +---+---+---+---+ 

1411 | a | b | c | d | 

1412 +---+---+---+---+ 

1413 | 0 | A | C | d | 

1414 | 1 | B | C | d | 

1415 | 2 | C | C | d | 

1416 | 3 | D | C | d | 

1417 +---+---+---+---+ 

1418 ``` 

1419 !!! success "Conclusion: Successfully applied the `upper` function to the `b` and `c` columns." 

1420 </div> 

1421 

1422 ```{.py .python linenums="1" title="Example 2: Simple function"} 

1423 >>> apply_function_to_columns(df, ["b", "c"], "lower").show() 

1424 ``` 

1425 <div class="result" markdown> 

1426 ```{.txt .text title="Terminal"} 

1427 +---+---+---+---+ 

1428 | a | b | c | d | 

1429 +---+---+---+---+ 

1430 | 0 | a | c | d | 

1431 | 1 | b | c | d | 

1432 | 2 | c | c | d | 

1433 | 3 | d | c | d | 

1434 +---+---+---+---+ 

1435 ``` 

1436 !!! success "Conclusion: Successfully applied the `lower` function to the `b` and `c` columns." 

1437 </div> 

1438 

1439 ```{.py .python linenums="1" title="Example 3: Complex function, with args"} 

1440 >>> apply_function_to_columns(df, ["b", "c", "d"], "lpad", 5, "?").show() 

1441 ``` 

1442 <div class="result" markdown> 

1443 ```{.txt .text title="Terminal"} 

1444 +---+-------+-------+-------+ 

1445 | a | b | c | d | 

1446 +---+-------+-------+-------+ 

1447 | 0 | ????a | ????c | ????d | 

1448 | 1 | ????b | ????c | ????d | 

1449 | 2 | ????c | ????c | ????d | 

1450 | 3 | ????d | ????c | ????d | 

1451 +---+-------+-------+-------+ 

1452 ``` 

1453 !!! success "Conclusion: Successfully applied the `lpad` function to the `b`, `c` and `d` columns." 

1454 </div> 

1455 

1456 ```{.py .python linenums="1" title="Example 4: Complex function, with kwargs"} 

1457 >>> apply_function_to_columns( 

1458 ... dataframe=df, 

1459 ... columns=["b", "c", "d"], 

1460 ... function="lpad", 

1461 ... len=5, 

1462 ... pad="?", 

1463 ... ).show() 

1464 ``` 

1465 <div class="result" markdown> 

1466 ```{.txt .text title="Terminal"} 

1467 +---+-------+-------+-------+ 

1468 | a | b | c | d | 

1469 +---+-------+-------+-------+ 

1470 | 0 | ????a | ????c | ????d | 

1471 | 1 | ????b | ????c | ????d | 

1472 | 2 | ????c | ????c | ????d | 

1473 | 3 | ????d | ????c | ????d | 

1474 +---+-------+-------+-------+ 

1475 ``` 

1476 !!! success "Conclusion: Successfully applied the `lpad` function to the `b`, `c` and `d` columns." 

1477 </div> 

1478 

1479 ```{.py .python linenums="1" title="Example 5: Different complex function, with kwargs"} 

1480 >>> apply_function_to_columns( 

1481 ... dataframe=df, 

1482 ... columns=["b", "c", "d"], 

1483 ... function="regexp_replace", 

1484 ... pattern="c", 

1485 ... replacement="17", 

1486 ... ).show() 

1487 ``` 

1488 <div class="result" markdown> 

1489 ```{.txt .text title="Terminal"} 

1490 +---+----+----+---+ 

1491 | a | b | c | d | 

1492 +---+----+----+---+ 

1493 | 0 | a | 17 | d | 

1494 | 1 | b | 17 | d | 

1495 | 2 | 17 | 17 | d | 

1496 | 3 | d | 17 | d | 

1497 +---+----+----+---+ 

1498 ``` 

1499 !!! success "Conclusion: Successfully applied the `regexp_replace` function to the `b`, `c` and `d` columns." 

1500 </div> 

1501 

1502 ```{.py .python linenums="1" title="Example 6: Part of pipe"} 

1503 >>> df.transform( 

1504 ... func=apply_function_to_columns, 

1505 ... columns=["b", "c", "d"], 

1506 ... function="lpad", 

1507 ... len=5, 

1508 ... pad="?", 

1509 ... ).show() 

1510 ``` 

1511 <div class="result" markdown> 

1512 ```{.txt .text title="Terminal"} 

1513 +---+-------+-------+-------+ 

1514 | a | b | c | d | 

1515 +---+-------+-------+-------+ 

1516 | 0 | ????a | ????c | ????d | 

1517 | 1 | ????b | ????c | ????d | 

1518 | 2 | ????c | ????c | ????d | 

1519 | 3 | ????d | ????c | ????d | 

1520 +---+-------+-------+-------+ 

1521 ``` 

1522 !!! success "Conclusion: Successfully applied the `lpad` function to the `b`, `c` and `d` columns." 

1523 </div> 

1524 

1525 ```{.py .python linenums="1" title="Example 7: Column name in different case"} 

1526 >>> apply_function_to_columns( 

1527 ... dataframe=df, 

1528 ... columns=["B", "c", "D"], 

1529 ... function="upper", 

1530 ... ).show() 

1531 ``` 

1532 <div class="result" markdown> 

1533 ```{.txt .text title="Terminal"} 

1534 +---+---+---+---+ 

1535 | a | b | c | d | 

1536 +---+---+---+---+ 

1537 | 0 | A | C | D | 

1538 | 1 | B | C | D | 

1539 | 2 | C | C | D | 

1540 | 3 | D | C | D | 

1541 +---+---+---+---+ 

1542 ``` 

1543 !!! success "Conclusion: Successfully applied the `upper` function to the `B`, `c` and `D` columns." 

1544 </div> 

1545 

1546 ```{.py .python linenums="1" title="Example 8: Invalid columns"} 

1547 >>> apply_function_to_columns(df, ["f"]) 

1548 ``` 

1549 <div class="result" markdown> 

1550 ```{.txt .text title="Terminal"} 

1551 ColumnDoesNotExistError: Columns ['f'] do not exist in the DataFrame. 

1552 Try one of: ["a", "b", "c", "d"] 

1553 ``` 

1554 !!! failure "Conclusion: Columns do not exist." 

1555 </div> 

1556 

1557 ??? info "Notes" 

1558 - We have to name the `function` parameter as the full name because when this function is executed as part of a chain (by using the PySpark [`.transform()`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.transform.html) method), that one uses the `func` parameter. 

1559 

1560 ??? tip "See Also" 

1561 - [`apply_function_to_column()`][toolbox_pyspark.cleaning.apply_function_to_column] 

1562 """ 

1563 columns = get_columns(dataframe, columns) 

1564 assert_columns_exists(dataframe, columns, False) 
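    # Look up the named function from `pyspark.sql.functions` (imported as `F`)
    # and apply it to every requested column in a single `withColumns` call.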

1565 return dataframe.withColumns( 

1566 { 

1567 column: getattr(F, function)(column, *function_args, **function_kwargs) 

1568 for column in columns 

1569 } 

1570 ) 

1571 

1572 

1573# ---------------------------------------------------------------------------- # 

1574# Clean across tables #### 

1575# ---------------------------------------------------------------------------- # 

1576 

1577 

1578@typechecked 

1579def drop_matching_rows( 

1580 left_table: psDataFrame, 

1581 right_table: psDataFrame, 

1582 on_keys: Union[str, str_collection], 

1583 join_type: VALID_PYAPARK_JOIN_TYPES = "left_anti", 

1584 where_clause: Optional[str] = None, 

1585) -> psDataFrame: 

1586 """ 

1587 !!! note "Summary" 

1588 This function is designed to _remove_ any rows from the `left_table` which _already exist_ on the `right_table`. That's why the `join_type` should always be `left_anti`.

1589 

1590 ???+ abstract "Details" 

1591 The intention behind this function originates from the `Accumulation` layer in the BigDaS environment. The process on this table layer is to only _insert_ rows from the `left_table` to the `right_table` which are **not already existing** on the `right_table`. We include the `where_clause` here so that we can control any updated rows. Specifically, we check the `editdatetime` field between the `left_table` and the `right_table`: any record on the `left_table` whose `editdatetime` value is _greater than_ the corresponding `editdatetime` value on the `right_table` will _remain_ on the `left_table`, and will later be _updated_ on the `right_table`.

1592 

1593 It's important to note that this function was created to handle the _same table_ as both the `left_table` and the `right_table`, with each existing in a different layer of the ADLS environment. Logically, it can be used for other purposes (it's generic enough); however, the intention was specifically for cleaning during the data pipeline processes.

1594 
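The following is a minimal, hypothetical sketch of that pattern (it is _not_ part of this function), assuming both tables share a `pk` key column and an `editdatetime` column:

```{.py .python linenums="1" title="Sketch: Accumulation-layer pattern"}
>>> # New rows: on `left_table` but not yet on `right_table`
>>> new_rows = left_table.join(right_table, on="pk", how="left_anti")
>>> # Updated rows: on both tables, but edited more recently on `left_table`
>>> updated_rows = (
...     left_table.alias("left")
...     .join(right_table.alias("right"), on="pk", how="inner")
...     .where("left.editdatetime > right.editdatetime")
...     .select("left.*")
... )
>>> rows_to_process = new_rows.unionByName(updated_rows)
```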

1595 Params: 

1596 left_table (psDataFrame): 

1597 The DataFrame _from which_ you will be deleting the records. 

1598 right_table (psDataFrame): 

1599 The DataFrame _against which_ to check for existing records. If any matching `on_keys` exist on both the `right_table` and the `left_table`, then those records will be deleted from the `left_table`.

1600 on_keys (Union[str, str_collection]): 

1601 The matching keys between the two tables. These keys (aka columns) must exist on both the `left_table` and the `right_table`.

1602 join_type (VALID_PYAPARK_JOIN_TYPES, optional): 

1603 The type of join to use for this process. For the best performance, keep it as the default value.<br> 

1604 Defaults to `#!py "left_anti"`. 

1605 where_clause (Optional[str], optional): 

1606 Any additional conditions to place on this join. Any records which **match** this condition will be **kept** on the `left_table`.<br> 

1607 Defaults to `#!py None`. 

1608 

1609 Returns: 

1610 (psDataFrame): 

1611 The `left_table` after its rows have been deleted and cleaned against the `right_table`.

1612 

1613 ???+ example "Examples" 

1614 

1615 ```{.py .python linenums="1" title="Set up"} 

1616 >>> # Imports 

1617 >>> import pandas as pd 

1618 >>> from pyspark.sql import SparkSession 

1619 >>> from toolbox_pyspark.cleaning import drop_matching_rows 

1620 >>> 

1621 >>> # Instantiate Spark 

1622 >>> spark = SparkSession.builder.getOrCreate() 

1623 >>> 

1624 >>> # Create data 

1625 >>> left = spark.createDataFrame( 

1626 ... pd.DataFrame( 

1627 ... { 

1628 ... "a": [1, 2, 3, 4],

1629 ... "b": ["a", "b", "c", "d"], 

1630 ... "c": [1, 1, 1, 1], 

1631 ... "d": ["2", "2", "2", "2"], 

1632 ... "n": ["a", "b", "c", "d"], 

1633 ... } 

1634 ... ) 

1635 ... ) 

1636 >>> right = left.where("a in (1, 2)")

1637 ```

1638 

1639 ```{.py .python linenums="1" title="Check"} 

1640 >>> left.show() 

1641 >>> right.show() 

1642 ``` 

1643 <div class="result" markdown> 

1644 ```{.txt .text title="Terminal"} 

1645 +---+---+---+---+---+ 

1646 | a | b | c | d | n | 

1647 +---+---+---+---+---+ 

1648 | 1 | a | 1 | 2 | a | 

1649 | 2 | b | 1 | 2 | b | 

1650 | 3 | c | 1 | 2 | c | 

1651 | 4 | d | 1 | 2 | d | 

1652 +---+---+---+---+---+ 

1653 ``` 

1654 ```{.txt .text title="Terminal"} 

1655 +---+---+---+---+---+ 

1656 | a | b | c | d | n | 

1657 +---+---+---+---+---+ 

1658 | 1 | a | 1 | 2 | a | 

1659 | 2 | b | 1 | 2 | b | 

1660 +---+---+---+---+---+ 

1661 ``` 

1662 </div> 

1663 

1664 ```{.py .python linenums="1" title="Example 1: Single column"} 

1665 >>> drop_matching_rows( 

1666 ... left_table=left, 

1667 ... right_table=right, 

1668 ... on_keys=["a"], 

1669 ... ).show() 

1670 ``` 

1671 <div class="result" markdown> 

1672 ```{.txt .text title="Terminal"} 

1673 +---+---+---+---+---+ 

1674 | a | b | c | d | n | 

1675 +---+---+---+---+---+ 

1676 | 3 | c | 1 | 2 | c | 

1677 | 4 | d | 1 | 2 | d | 

1678 +---+---+---+---+---+ 

1679 ``` 

1680 !!! success "Conclusion: Successfully removed the records from the `left_table` which exist on the `right_table`."

1681 </div> 

1682 

1683 ```{.py .python linenums="1" title="Example 2: Single column as string"} 

1684 >>> left.transform( 

1685 ... drop_matching_rows, 

1686 ... right_table=right, 

1687 ... on_keys="a", 

1688 ... ).show() 

1689 ``` 

1690 <div class="result" markdown> 

1691 ```{.txt .text title="Terminal"} 

1692 +---+---+---+---+---+ 

1693 | a | b | c | d | n | 

1694 +---+---+---+---+---+ 

1695 | 3 | c | 1 | 2 | c | 

1696 | 4 | d | 1 | 2 | d | 

1697 +---+---+---+---+---+ 

1698 ``` 

1699 !!! success "Conclusion: Successfully removed the records from the `left_table` which exist on the `right_table`."

1700 </div> 

1701 

1702 ```{.py .python linenums="1" title="Example 3: Multiple key columns"} 

1703 >>> drop_matching_rows( 

1704 ... left_table=left, 

1705 ... right_table=right, 

1706 ... on_keys=["a", "b"], 

1707 ... ).show() 

1708 ``` 

1709 <div class="result" markdown> 

1710 ```{.txt .text title="Terminal"} 

1711 +---+---+---+---+---+ 

1712 | a | b | c | d | n | 

1713 +---+---+---+---+---+ 

1714 | 3 | c | 1 | 2 | c | 

1715 | 4 | d | 1 | 2 | d | 

1716 +---+---+---+---+---+ 

1717 ``` 

1718 !!! success "Conclusion: Successfully removed the records from the `left_table` which exist on the `right_table`."

1719 </div> 

1720 

1721 ```{.py .python linenums="1" title="Example 4: Including `where` clause"} 

1722 >>> drop_matching_rows( 

1723 ... left_table=left, 

1724 ... right_table=right, 

1725 ... on_keys=["a"], 

1726 ... where_clause="n <> 'd'", 

1727 ... ).show() 

1728 ``` 

1729 <div class="result" markdown> 

1730 ```{.txt .text title="Terminal"} 

1731 +---+---+---+---+---+ 

1732 | a | b | c | d | n | 

1733 +---+---+---+---+---+ 

1734 | 3 | c | 1 | 2 | c | 

1735 +---+---+---+---+---+ 

1736 ``` 

1737 !!! success "Conclusion: Successfully removed the records from the `left_table` which exist on the `right_table`, keeping only the remaining records which match the `where` clause."

1738 </div> 

1739 

1740 ```{.py .python linenums="1" title="Example 5: Part of pipe"} 

1741 >>> left.transform( 

1742 ... func=drop_matching_rows, 

1743 ... right_table=right, 

1744 ... on_keys=["a"], 

1745 ... ).show() 

1746 ``` 

1747 <div class="result" markdown> 

1748 ```{.txt .text title="Terminal"} 

1749 +---+---+---+---+---+ 

1750 | a | b | c | d | n | 

1751 +---+---+---+---+---+ 

1752 | 3 | c | 1 | 2 | c | 

1753 | 4 | d | 1 | 2 | d | 

1754 +---+---+---+---+---+ 

1755 ``` 

1756 !!! success "Conclusion: Successfully removed the records from the `left_table` which exist on the `right_table`."

1757 </div> 

1758 

1759 ```{.py .python linenums="1" title="Example 6: Invalid column"} 

1760 >>> drop_matching_rows( 

1761 ... left_table=left, 

1762 ... right_table=right, 

1763 ... on_keys=["f"], 

1764 ... ) 

1765 ``` 

1766 <div class="result" markdown> 

1767 ```{.txt .text title="Terminal"} 

1768 ColumnDoesNotExistError: Columns ['f'] do not exist in the DataFrame. 

1769 Try one of: ["a", "b", "c", "d", "n"] 

1770 ``` 

1771 !!! failure "Conclusion: Columns do not exist." 

1772 </div> 

1773 

1774 ??? info "Notes" 

1775 - The `on_keys` parameter can be a single string or a collection of strings. This allows multiple columns to be used as the matching keys.

1776 - The `where_clause` parameter is optional. If specified, then only the remaining records which match the condition will be kept on the `left_table`; it is applied after the join. If not specified, then all records which exist on the `right_table` will be removed from the `left_table`.

1777 

1778 ??? tip "See Also" 

1779 - [`assert_columns_exists()`][toolbox_pyspark.checks.assert_columns_exists] 

1780 """ 

1781 on_keys = [on_keys] if is_type(on_keys, str) else on_keys 

1782 assert_columns_exists(left_table, on_keys, False) 

1783 assert_columns_exists(right_table, on_keys, False) 

1784 return ( 

1785 left_table.alias("left") 

1786 .join(right_table.alias("right"), on=on_keys, how=join_type) 
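        # "1=1" is an always-true condition: keep every remaining row when no `where_clause` is given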

1787 .where("1=1" if where_clause is None else where_clause) 

1788 .select([f"left.{col}" for col in left_table.columns]) 

1789 )