Coverage for src/toolbox_pyspark/dimensions.py: 100%

56 statements  


# ============================================================================ #
#                                                                              #
#     Title   : Dimensions                                                     #
#     Purpose : Check the dimensions of `pyspark` `dataframe`s.                #
#                                                                              #
# ============================================================================ #


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Overview                                                              ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Description                                                              ####
# ---------------------------------------------------------------------------- #

21""" 

22!!! note "Summary" 

23 The `dimensions` module is used for checking the dimensions of `pyspark` `dataframe`'s. 

24""" 


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Setup                                                                 ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Imports                                                                  ####
# ---------------------------------------------------------------------------- #


# ## Python StdLib Imports ----
from copy import deepcopy
from typing import Dict, Optional, Union

# ## Python Third Party Imports ----
import numpy as np
from pandas import DataFrame as pdDataFrame
from pyspark.sql import DataFrame as psDataFrame, functions as F
from toolbox_python.checkers import is_type
from toolbox_python.collection_types import str_collection, str_list
from typeguard import typechecked

# ## Local First Party Imports ----
from toolbox_pyspark.checks import assert_column_exists, assert_columns_exists


# ---------------------------------------------------------------------------- #
#  Exports                                                                  ####
# ---------------------------------------------------------------------------- #


__all__: str_list = [
    "get_dims",
    "get_dims_of_tables",
    "make_dimension_table",
    "replace_columns_with_dimension_id",
]


# ---------------------------------------------------------------------------- #
#                                                                              #
#     Functions                                                             ####
#                                                                              #
# ---------------------------------------------------------------------------- #


# ---------------------------------------------------------------------------- #
#  Functions                                                                ####
# ---------------------------------------------------------------------------- #


@typechecked
def get_dims(
    dataframe: psDataFrame,
    use_names: bool = True,
    use_comma: bool = True,
) -> Union[dict[str, str], dict[str, int], tuple[str, str], tuple[int, int]]:
    """
    !!! note "Summary"
        Extract the dimensions of a given `dataframe`.

    Params:
        dataframe (psDataFrame):
            The table to check.
        use_names (bool, optional):
            Whether or not to add `names` to the returned object.<br>
            If `#!py True`, then will return a `#!py dict` with two keys only, for the number of `rows` and `cols`.<br>
            Defaults to `#!py True`.
        use_comma (bool, optional):
            Whether or not to format the values with a comma (`,`) as the thousands separator, in which case they are returned as strings.<br>
            Defaults to `#!py True`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (Union[dict[str, str], dict[str, int], tuple[str, str], tuple[int, int]]):
            The dimensions of the given `dataframe`.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.dimensions import get_dims
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame({
        ...         'a': range(5000),
        ...         'b': range(5000),
        ...     })
        ... )
        >>>
        >>> # Check
        >>> print(df.count())
        >>> print(len(df.columns))
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        5000
        ```
        ```{.txt .text title="Terminal"}
        2
        ```
        </div>

        ```{.py .python linenums="1" title="Names and commas"}
        >>> print(get_dims(dataframe=df, use_names=True, use_comma=True))
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        {"rows": "5,000", "cols": "2"}
        ```
        </div>

        ```{.py .python linenums="1" title="Names but no commas"}
        >>> print(get_dims(dataframe=df, use_names=True, use_comma=False))
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        {"rows": 5000, "cols": 2}
        ```
        </div>

        ```{.py .python linenums="1" title="Commas but no names"}
        >>> print(get_dims(dataframe=df, use_names=False, use_comma=True))
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        ("5,000", "2")
        ```
        </div>

        ```{.py .python linenums="1" title="Neither names nor commas"}
        >>> print(get_dims(dataframe=df, use_names=False, use_comma=False))
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        (5000, 2)
        ```
        </div>
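
        Since the un-named form is a plain `#!py tuple`, it can be unpacked directly. A minimal usage sketch (no behaviour beyond the calls above is assumed):

        ```{.py .python linenums="1" title="Unpacking the tuple form"}
        >>> # Unpack rows and cols as plain integers
        >>> n_rows, n_cols = get_dims(dataframe=df, use_names=False, use_comma=False)
        >>> print(f"{n_rows} rows x {n_cols} cols")
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        5000 rows x 2 cols
        ```
        </div>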

176 """ 

177 dims: tuple[int, int] = (dataframe.count(), len(dataframe.columns)) 

178 if use_names and use_comma: 

179 return {"rows": f"{dims[0]:,}", "cols": f"{dims[1]:,}"} 

180 elif use_names and not use_comma: 

181 return {"rows": dims[0], "cols": dims[1]} 

182 elif not use_names and use_comma: 

183 return (f"{dims[0]:,}", f"{dims[1]:,}") 

184 else: 

185 return dims 

186 

187 

@typechecked
def get_dims_of_tables(
    tables: str_list,
    scope: Optional[dict] = None,
    use_comma: bool = True,
) -> pdDataFrame:
    """
    !!! note "Summary"
        Take in a list of the names of some tables, and for each of them, check their dimensions.

    ???+ abstract "Details"
        By default, this function will check against the `#!py globals()` scope. So you need to be careful if you're dealing with massive amounts of data in memory.

    Params:
        tables (str_list):
            The list of the tables that will be checked.
        scope (dict, optional):
            This is the scope against which the tables will be checked.<br>
            If `#!py None`, then it will use the `#!py globals()` scope by default.<br>
            Defaults to `#!py None`.
        use_comma (bool, optional):
            Whether or not the dimensions from the tables should be formatted as strings with a comma as the thousands delimiter.<br>
            Defaults to `#!py True`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.

    Returns:
        (pdDataFrame):
            A `pandas` `dataframe` with four columns: `#!py ["table", "type", "rows", "cols"]`.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.dimensions import get_dims_of_tables, get_dims
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df1 = spark.createDataFrame(
        ...     pd.DataFrame({
        ...         'a': range(5000),
        ...         'b': range(5000),
        ...     })
        ... )
        >>> df2 = spark.createDataFrame(
        ...     pd.DataFrame({
        ...         'a': range(10000),
        ...         'b': range(10000),
        ...         'c': range(10000),
        ...     })
        ... )
        >>>
        >>> # Check
        >>> print(get_dims(df1))
        >>> print(get_dims(df2))
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        {"rows": "5,000", "cols": "2"}
        ```
        ```{.txt .text title="Terminal"}
        {"rows": "10,000", "cols": "3"}
        ```
        </div>

        ```{.py .python linenums="1" title="Basic usage"}
        >>> print(get_dims_of_tables(['df1', 'df2']))
        ```
        <div class="result" markdown>
        ```{.txt .text}
          table type    rows cols
        0   df1        5,000    2
        1   df2       10,000    3
        ```
        </div>

        ```{.py .python linenums="1" title="No commas"}
        >>> print(get_dims_of_tables(['df1', 'df2'], use_comma=False))
        ```
        <div class="result" markdown>
        ```{.txt .text}
          table type   rows cols
        0   df1        5000    2
        1   df2       10000    3
        ```
        </div>

        ```{.py .python linenums="1" title="Missing DF"}
        >>> display(get_dims_of_tables(['df1', 'df2', 'df3'], use_comma=False))
        ```
        <div class="result" markdown>
        ```{.txt .text}
          table type   rows cols
        0   df1        5000    2
        1   df2       10000    3
        2   df3         NaN  NaN
        ```
        </div>
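
        The `scope` can also be passed explicitly rather than relying on `#!py globals()`. A minimal sketch (the dict keys are assumed to match the variable names defined above):

        ```{.py .python linenums="1" title="Explicit scope"}
        >>> # Look tables up in an explicit dict instead of the global scope
        >>> print(get_dims_of_tables(['df1', 'df2'], scope={'df1': df1, 'df2': df2}))
        ```
        <div class="result" markdown>
        ```{.txt .text}
          table type    rows cols
        0   df1        5,000    2
        1   df2       10,000    3
        ```
        </div>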

    ??? info "Notes"
        - The first column of the returned table is the name of the table from the `scope` provided.
        - The second column of the returned table is the `type` of the table. That is, whether the table is one of `#!py ["prd", "arc", "acm"]`, which are the 'production', 'archive', and 'accumulation' categories. This is designated by the table name containing an underscore (`_`) and ending with a suffix of one of: `#!py "prd"`, `#!py "arc"`, or `#!py "acm"`; see the example after this list. If the table name does not contain this info, then the value in this second column will just be blank.
        - If one of the tables given in the `tables` list does not exist in the `scope`, then the values given in the `rows` and `cols` columns will either be the values: `#!py np.nan` or `#!py "Did not load"`.
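
    ???+ example "Suffix parsing"
        A minimal sketch of the naming convention (the `sales_prd` and `sales_arc` tables are hypothetical, assumed to exist in the current scope). The name is split on the last underscore, so the `table` column will contain `sales` and the `type` column will contain `prd` and `arc` respectively; the `rows` and `cols` values depend entirely on the data:

        ```{.py .python linenums="1" title="Tables with type suffixes"}
        >>> print(get_dims_of_tables(['sales_prd', 'sales_arc']))
        ```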

297 """ 

298 sizes: Dict[str, list] = { 

299 "table": list(), 

300 "type": list(), 

301 "rows": list(), 

302 "cols": list(), 

303 } 

304 rows: Union[str, int, float] 

305 cols: Union[str, int, float] 

306 for tbl, typ in [ 

307 ( 

308 table.rsplit("_", 1) 

309 if "_" in table and table.endswith(("acm", "arc", "prd")) 

310 else (table, "") 

311 ) 

312 for table in tables 

313 ]: 

314 try: 

315 tmp: psDataFrame = eval( 

316 f"{tbl}{f'_{typ}' if typ!='' else ''}", 

317 globals() if scope is None else scope, 

318 ) 

319 rows, cols = get_dims(tmp, use_names=False, use_comma=use_comma) 

320 except Exception: 

321 if use_comma: 

322 rows = cols = "Did not load" 

323 else: 

324 rows = cols = np.nan 

325 sizes["table"].append(tbl) 

326 sizes["type"].append(typ) 

327 sizes["rows"].append(rows) 

328 sizes["cols"].append(cols) 

329 return pdDataFrame(sizes) 

330 

331 

@typechecked
def make_dimension_table(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    index_prefix: str = "id",
) -> psDataFrame:
    """
    !!! note "Summary"
        Create a dimension table from the specified columns of a given `pyspark` dataframe.

    ???+ abstract "Details"
        This function will create a dimension table from the specified columns of a given `pyspark` dataframe. The dimension table will contain the unique values of the specified columns, along with an index column that will be used to replace the original columns in the original dataframe.

        The index column is named according to the `index_prefix` parameter. If only one column is specified, then the index column is named as the `index_prefix` followed by the name of that column, joined by an underscore (for example, `id_d`). If multiple columns are specified, then the index column is named by the `index_prefix` alone. The index column itself is created using the `#!py row_number()` window function over the specified columns.

        The dimension table is created by selecting the specified columns from the original dataframe, applying the `#!py distinct()` function to get the unique values, and finally applying the `#!py row_number()` window function to create the index column.
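
        The index is built internally with a SQL expression; an equivalent construction using the explicit `Window` API would look roughly like the following sketch (the column names `c`, `d`, and `id` are illustrative only, not the actual implementation):

        ```{.py .python linenums="1" title="Equivalent window construction"}
        >>> from pyspark.sql import Window, functions as F
        >>> dim = (
        ...     df.select("c", "d")
        ...     .distinct()
        ...     .withColumn("id", F.row_number().over(Window.orderBy("c", "d")))
        ...     .select("id", "c", "d")
        ... )
        ```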


    Params:
        dataframe (psDataFrame):
            The DataFrame to create the dimension table from.
        columns (Union[str, str_collection]):
            The column(s) to include in the dimension table.
        index_prefix (str, optional):
            The prefix to use for the index column.<br>
            Defaults to `#!py "id"`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the columns specified do not exist in the dataframe.

    Returns:
        (psDataFrame):
            The dimension table.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.dimensions import make_dimension_table
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 2, 2],
        ...             "d": ["a", "b", "b", "b"],
        ...             "e": ["x", "x", "y", "z"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---+
        | a | b | c | d | e |
        +---+---+---+---+---+
        | 1 | a | 1 | a | x |
        | 2 | b | 1 | b | x |
        | 3 | c | 2 | b | y |
        | 4 | d | 2 | b | z |
        +---+---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Create dimension table with single column"}
        >>> dim_table = make_dimension_table(df, "d")
        >>> dim_table.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +------+---+
        | id_d | d |
        +------+---+
        | 1 | a |
        | 2 | b |
        +------+---+
        ```
        !!! success "Conclusion: Successfully created dimension table with single column."
        </div>

        ```{.py .python linenums="1" title="Example 2: Create dimension table with multiple columns"}
        >>> dim_table = make_dimension_table(df, ["c", "d"])
        >>> dim_table.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +----+---+---+
        | id | c | d |
        +----+---+---+
        | 1 | 1 | a |
        | 2 | 1 | b |
        | 3 | 2 | b |
        +----+---+---+
        ```
        !!! success "Conclusion: Successfully created dimension table with multiple columns."
        </div>

        ```{.py .python linenums="1" title="Example 3: Use different prefix"}
        >>> dim_table = make_dimension_table(df, "e", "index")
        >>> dim_table.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---------+---+
        | index_e | e |
        +---------+---+
        | 1 | x |
        | 2 | y |
        | 3 | z |
        +---------+---+
        ```
        !!! success "Conclusion: Successfully created dimension table with different prefix."
        </div>

        ```{.py .python linenums="1" title="Example 4: Invalid column"}
        >>> dim_table = make_dimension_table(df, "123")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Column '123' does not exist in the DataFrame.
        ```
        !!! failure "Conclusion: Failed to create dimension table due to invalid column name."
        </div>

    ??? tip "See Also"
        - [`replace_columns_with_dimension_id`][toolbox_pyspark.dimensions.replace_columns_with_dimension_id]
    """
    columns = [columns] if is_type(columns, str) else columns
    assert_columns_exists(dataframe, columns)
    index_name: str = f"{index_prefix}_{columns[0]}" if len(columns) == 1 else index_prefix
    return (
        dataframe.select(*columns)
        .distinct()
        .withColumn(
            index_name,
            F.expr(f"row_number() over (order by {', '.join(columns)})").cast("int"),
        )
        .select(index_name, *columns)
    )


@typechecked
def replace_columns_with_dimension_id(
    fct_dataframe: psDataFrame,
    dim_dataframe: psDataFrame,
    cols_to_replace: Union[str, str_collection],
    dim_id_col: Optional[str] = None,
) -> psDataFrame:
    """
    !!! note "Summary"
        Replace the specified columns in a given `pyspark` dataframe with the corresponding dimension table IDs.

    ???+ abstract "Details"
        This function will replace the specified columns in a given `pyspark` dataframe with the corresponding dimension table IDs. The IDs are obtained by joining the dimension table to the original dataframe on the specified columns, after which the original columns are dropped from the original dataframe.

        The join is performed as a left join, so any rows in the original dataframe that do not have a corresponding row in the dimension table will have a `#!sql null` value for the dimension table ID. The resulting dataframe will have the same number of rows as the original dataframe, but with the specified columns replaced by the dimension table IDs.

    Params:
        fct_dataframe (psDataFrame):
            The DataFrame to replace the columns in.
        dim_dataframe (psDataFrame):
            The dimension table containing the IDs.
        cols_to_replace (Union[str, str_collection]):
            The column(s) to replace with the dimension table IDs.
        dim_id_col (str, optional):
            The name of the column in the dimension table containing the IDs.<br>
            If `#!py None`, then will use the first column of the dimension table.<br>
            Defaults to `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the columns specified do not exist in the dataframes.

    Returns:
        (psDataFrame):
            The DataFrame with the columns replaced by the dimension table IDs.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>> from toolbox_pyspark.dimensions import make_dimension_table, replace_columns_with_dimension_id
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 2, 2],
        ...             "d": ["a", "b", "b", "b"],
        ...             "e": ["x", "x", "y", "z"],
        ...         }
        ...     )
        ... )
        >>> dim_table1 = make_dimension_table(df, "d")
        >>> dim_table2 = make_dimension_table(df, "e")
        >>> dim_table3 = make_dimension_table(df, ("c", "d"))
        >>>
        >>> # Check
        >>> df.show()
        >>> dim_table1.show()
        >>> dim_table2.show()
        >>> dim_table3.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---+
        | a | b | c | d | e |
        +---+---+---+---+---+
        | 1 | a | 1 | a | x |
        | 2 | b | 1 | b | x |
        | 3 | c | 2 | b | y |
        | 4 | d | 2 | b | z |
        +---+---+---+---+---+
        ```
        ```{.txt .text title="Terminal"}
        +------+---+
        | id_d | d |
        +------+---+
        | 1 | a |
        | 2 | b |
        +------+---+
        ```
        ```{.txt .text title="Terminal"}
        +------+---+
        | id_e | e |
        +------+---+
        | 1 | x |
        | 2 | y |
        | 3 | z |
        +------+---+
        ```
        ```{.txt .text title="Terminal"}
        +----+---+---+
        | id | c | d |
        +----+---+---+
        | 1 | 1 | a |
        | 2 | 1 | b |
        | 3 | 2 | b |
        +----+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Replace single column with dimension ID"}
        >>> df_replaced = replace_columns_with_dimension_id(df, dim_table1, "d")
        >>> df_replaced.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+------+---+
        | a | b | c | id_d | e |
        +---+---+---+------+---+
        | 1 | a | 1 | 1 | x |
        | 2 | b | 1 | 2 | x |
        | 3 | c | 2 | 2 | y |
        | 4 | d | 2 | 2 | z |
        +---+---+---+------+---+
        ```
        !!! success "Conclusion: Successfully replaced single column with dimension ID."
        </div>

        ```{.py .python linenums="1" title="Example 2: Replace single column with dimension ID"}
        >>> df_replaced = replace_columns_with_dimension_id(df, dim_table2, "e")
        >>> df_replaced.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+------+
        | a | b | c | d | id_e |
        +---+---+---+---+------+
        | 1 | a | 1 | a | 1 |
        | 2 | b | 1 | b | 1 |
        | 3 | c | 2 | b | 2 |
        | 4 | d | 2 | b | 3 |
        +---+---+---+---+------+
        ```
        !!! success "Conclusion: Successfully replaced single column with dimension ID."
        </div>

        ```{.py .python linenums="1" title="Example 3: Replace multiple columns with dimension IDs"}
        >>> df_replaced_multi = replace_columns_with_dimension_id(df, dim_table3, ["c", "d"])
        >>> df_replaced_multi.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+----+---+
        | a | b | id | e |
        +---+---+----+---+
        | 1 | a | 1 | x |
        | 2 | b | 2 | x |
        | 3 | c | 3 | y |
        | 4 | d | 3 | z |
        +---+---+----+---+
        ```
        !!! success "Conclusion: Successfully replaced multiple columns with dimension IDs."
        </div>

        ```{.py .python linenums="1" title="Example 4: Invalid column"}
        >>> df_replaced = replace_columns_with_dimension_id(df, dim_table1, "123")
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        ColumnDoesNotExistError: Column '123' does not exist in the DataFrame.
        ```
        !!! failure "Conclusion: Failed to replace columns due to invalid column name."
        </div>
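
        The `dim_id_col` can also be given explicitly. A minimal sketch, equivalent to Example 1 because `id_d` is already the first column of `dim_table1`:

        ```{.py .python linenums="1" title="Example 5: Specify the ID column explicitly"}
        >>> df_replaced = replace_columns_with_dimension_id(df, dim_table1, "d", dim_id_col="id_d")
        >>> df_replaced.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+------+---+
        | a | b | c | id_d | e |
        +---+---+---+------+---+
        | 1 | a | 1 | 1 | x |
        | 2 | b | 1 | 2 | x |
        | 3 | c | 2 | 2 | y |
        | 4 | d | 2 | 2 | z |
        +---+---+---+------+---+
        ```
        !!! success "Conclusion: Same result as Example 1, with the ID column named explicitly."
        </div>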

    ??? tip "See Also"
        - [`make_dimension_table`][toolbox_pyspark.dimensions.make_dimension_table]
    """

    # Generate variables ----
    cols_to_replace: str_list = (
        [cols_to_replace] if is_type(cols_to_replace, str) else list(cols_to_replace)
    )
    fct_cols: str_list = fct_dataframe.columns
    dim_cols: str_list = dim_dataframe.columns
    dim_id_col = dim_id_col or dim_cols[0]

    # Check variables ----
    assert_columns_exists(fct_dataframe, cols_to_replace)
    assert_columns_exists(dim_dataframe, cols_to_replace)
    assert_column_exists(dim_dataframe, dim_id_col)

    # Perform the replacement ----
    # Place the dimension ID column at the position of the first replaced column,
    # then drop all of the replaced columns from the final selection.
    index_of_first_col: int = fct_cols.index(cols_to_replace[0])
    fct_new_cols: str_list = deepcopy(fct_cols)
    fct_new_cols = [
        *fct_new_cols[:index_of_first_col],
        dim_id_col,
        *fct_new_cols[index_of_first_col + 1 :],
    ]
    fct_removed_cols: str_list = [col for col in fct_new_cols if col not in cols_to_replace]

    # Return ----
    # Left join on all of the replaced columns, keep the dimension ID, then re-select
    return (
        fct_dataframe.alias("a")
        .join(
            other=dim_dataframe.alias("b"),
            on=[F.col(f"a.{col}") == F.col(f"b.{col}") for col in cols_to_replace],
            how="left",
        )
        .select("a.*", f"b.{dim_id_col}")
        .select(*fct_removed_cols)
    )

698 )