Coverage for src/toolbox_pyspark/io.py: 100%

96 statements  

coverage.py v7.6.10, created at 2025-01-25 23:08 +0000

1# ============================================================================ # 

2# # 

3# Title : IO # 

4# Purpose : Read and write tables to/from directories. # 

5# # 

6# ============================================================================ # 

7 

8 

9# ---------------------------------------------------------------------------- # 

10# # 

11# Overview #### 

12# # 

13# ---------------------------------------------------------------------------- # 

14 

15 

16# ---------------------------------------------------------------------------- # 

17# Description #### 

18# ---------------------------------------------------------------------------- # 

19 

20 

21""" 

22!!! note "Summary" 

23 The `io` module is used for reading and writing tables to/from directories. 
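
???+ example "Example"
    A minimal usage sketch (illustrative only; it assumes an active Spark session and a local `./data` directory that already contains `table.csv`):

    ```{.py .python linenums="1" title="Round-trip a table"}
    >>> from pyspark.sql import SparkSession
    >>> from toolbox_pyspark.io import read_from_path, write_to_path
    >>>
    >>> spark = SparkSession.builder.getOrCreate()
    >>>
    >>> # Read the CSV in to memory
    >>> df = read_from_path(
    ...     spark_session=spark,
    ...     name="table.csv",
    ...     path="./data",
    ...     data_format="csv",
    ...     read_options={"header": "true"},
    ... )
    >>>
    >>> # Write it back out as Parquet
    >>> write_to_path(
    ...     data_frame=df,
    ...     name="table.parquet",
    ...     path="./data",
    ...     data_format="parquet",
    ...     mode="overwrite",
    ... )
    ```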

24""" 

25 

26 

27# ---------------------------------------------------------------------------- # 

28# # 

29# Setup #### 

30# # 

31# ---------------------------------------------------------------------------- # 

32 

33 

34# ---------------------------------------------------------------------------- # 

35# Imports #### 

36# ---------------------------------------------------------------------------- # 

37 

38 

39# ## Python StdLib Imports ---- 

40from typing import Literal, Optional, get_args 

41 

42# ## Python Third Party Imports ---- 

43from pyspark.sql import DataFrame as psDataFrame, SparkSession 

44from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter 

45from toolbox_python.checkers import is_type 

46from toolbox_python.collection_types import ( 

47 str_collection, 

48 str_dict, 

49 str_list, 

50 str_tuple, 

51) 

52from typeguard import typechecked 

53 

54# ## Local First Party Imports ---- 

55from toolbox_pyspark.utils.exceptions import ValidationError 

56 

57 

58# ---------------------------------------------------------------------------- # 

59# Exports #### 

60# ---------------------------------------------------------------------------- # 

61 

62 

63__all__: str_list = [ 

64 "SPARK_FORMATS", 

65 "VALID_SPARK_FORMATS", 

66 "WRITE_MODES", 

67 "VALID_WRITE_MODES", 

68 "read_from_path", 

69 "write_to_path", 

70 "transfer_by_path", 

71 "read_from_table", 

72 "write_to_table", 

73 "transfer_by_table", 

74 "read", 

75 "write", 

76 "transfer", 

77 "load_from_path", 

78 "save_to_path", 

79 "load_from_table", 

80 "save_to_table", 

81 "load", 

82 "save", 

83] 

84 

85 

86## --------------------------------------------------------------------------- # 

87## Constants #### 

88## --------------------------------------------------------------------------- # 

89 

90 

91### Data formats ---- 

92SPARK_FORMATS = Literal[ 

93 # Built-in formats 

94 "parquet", 

95 "orc", 

96 "json", 

97 "csv", 

98 "text", 

99 "avro", 

100 # Database formats (requires JDBC drivers) 

101 "jdbc", 

102 "oracle", 

103 "mysql", 

104 "postgresql", 

105 "mssql", 

106 "db2", 

107 # Other formats (requires dependencies) 

108 "delta", # <-- Requires: `io.delta:delta-core` dependency and `delta-spark` package 

109 "xml", # <-- Requires: `com.databricks:spark-xml` dependency and `spark-xml` package 

110 "excel", # <-- Requires: `com.crealytics:spark-excel` dependency and `spark-excel` package 

111 "hive", # <-- Requires: Hive support 

112 "mongodb", # <-- Requires: `org.mongodb.spark:mongo-spark-connector` dependency and `mongo-spark-connector` package 

113 "cassandra", # <-- Requires: `com.datastax.spark:spark-cassandra-connector` dependency and `spark-cassandra-connector` package 

114 "elasticsearch", # <-- Requires: `org.elasticsearch:elasticsearch-hadoop` dependency and `elasticsearch-hadoop` package 

115] 

116""" 

117The valid formats that can be used to read/write data in Spark. 

118 

119PySpark's built-in data source formats: 

120 

121- `parquet` 

122- `orc` 

123- `json` 

124- `csv` 

125- `text` 

126- `avro` 

127 

128Database formats (with proper JDBC drivers): 

129 

130- `jdbc` 

131- `oracle` 

132- `mysql` 

133- `postgresql` 

134- `mssql` 

135- `db2` 

136 

137Other formats with additional dependencies: 

138 

139- `delta` (requires: `io.delta:delta-core` dependency and `delta-spark` package) 

140- `xml` (requires: `com.databricks:spark-xml` dependency and `spark-xml` package) 

141- `excel` (requires: `com.crealytics:spark-excel` dependency and `spark-excel` package) 

142- `hive` (requires: Hive support) 

143- `mongodb` (requires: `org.mongodb.spark:mongo-spark-connector` dependency and `mongo-spark-connector` package) 

144- `cassandra` (requires: `com.datastax.spark:spark-cassandra-connector` dependency and `spark-cassandra-connector` package) 

145- `elasticsearch` (requires: `org.elasticsearch:elasticsearch-hadoop` dependency and `elasticsearch-hadoop` package) 

146""" 

147 

148VALID_SPARK_FORMATS: str_tuple = get_args(SPARK_FORMATS) 

149""" 

150The valid formats that can be used to read/write data in Spark. 

151 

152PySpark's built-in data source formats: 

153 

154- `parquet` 

155- `orc` 

156- `json` 

157- `csv` 

158- `text` 

159- `avro` 

160 

161Database formats (with proper JDBC drivers): 

162 

163- `jdbc` 

164- `oracle` 

165- `mysql` 

166- `postgresql` 

167- `mssql` 

168- `db2` 

169 

170Other formats with additional dependencies: 

171 

172- `delta` (requires: `io.delta:delta-core` dependency and `delta-spark` package) 

173- `xml` (requires: `com.databricks:spark-xml` dependency and `spark-xml` package) 

174- `excel` (requires: `com.crealytics:spark-excel` dependency and `spark-excel` package) 

175- `hive` (requires: Hive support) 

176- `mongodb` (requires: `org.mongodb.spark:mongo-spark-connector` dependency and `mongo-spark-connector` package) 

177- `cassandra` (requires: `com.datastax.spark:spark-cassandra-connector` dependency and `spark-cassandra-connector` package) 

178- `elasticsearch` (requires: `org.elasticsearch:elasticsearch-hadoop` dependency and `elasticsearch-hadoop` package) 
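
A quick membership check against this tuple (illustrative sketch only) can be used to validate a format string before handing it to a reader or writer:

```{.py .python linenums="1" title="Example"}
>>> from toolbox_pyspark.io import VALID_SPARK_FORMATS
>>> "parquet" in VALID_SPARK_FORMATS
True
>>> "pickle" in VALID_SPARK_FORMATS
False
```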

179""" 

180 

181 

182### Write modes ---- 

183WRITE_MODES = Literal["append", "overwrite", "ignore", "error", "errorifexists"] 

184""" 

185The valid modes you can use for writing data frames: 

186 

187- `append` 

188- `overwrite` 

189- `ignore` 

190- `error` 

191- `errorifexists` 

192""" 

193 

194VALID_WRITE_MODES: str_tuple = get_args(WRITE_MODES) 

195""" 

196The valid modes you can use for writing data frames: 

197 

198- `append` 

199- `overwrite` 

200- `ignore` 

201- `error` 

202- `errorifexists` 
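
For example (an illustrative guard only, not something this module does for you), a caller might validate a user-supplied mode before writing:

```{.py .python linenums="1" title="Example"}
>>> from toolbox_pyspark.io import VALID_WRITE_MODES
>>> mode = "overwrite"
>>> if mode not in VALID_WRITE_MODES:
...     raise ValueError(f"Invalid write mode: {mode}")
```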

203""" 

204 

205 

206# ---------------------------------------------------------------------------- # 

207# # 

208# Path functions #### 

209# # 

210# ---------------------------------------------------------------------------- # 

211 

212 

213# ---------------------------------------------------------------------------- # 

214# Read #### 

215# ---------------------------------------------------------------------------- # 

216 

217 

218@typechecked 

219def read_from_path( 

220 spark_session: SparkSession, 

221 name: str, 

222 path: str, 

223 data_format: Optional[SPARK_FORMATS] = "parquet", 

224 read_options: Optional[str_dict] = None, 

225) -> psDataFrame: 

226 """ 

227 !!! note "Summary" 

228 Read an object from a given `path` in to memory as a `pyspark` dataframe. 

229 

230 Params: 

231 spark_session (SparkSession): 

232 The Spark session to use for the reading. 

233 name (str): 

234 The name of the table to read in. 

235 path (str): 

236 The path from which it will be read. 

237 data_format (Optional[SPARK_FORMATS], optional): 

238 The format of the object at location `path`.<br> 

239 Defaults to `#!py "parquet"`. 

240 read_options (Dict[str, str], optional): 

241 Any additional options to parse to the Spark reader.<br> 

242 Like, for example:<br> 

243 

244 - If the object is a CSV, you may want to define that it has a header row: `#!py {"header": "true"}`. 

245 - If the object is a Delta table, you may want to query a specific version: `#!py {"versionAsOf": "0"}`. 

246 

247 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br> 

248 Defaults to `#!py dict()`. 

249 

250 Raises: 

251 TypeError: 

252 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

253 

254 Returns: 

255 (psDataFrame): 

256 The loaded dataframe. 

257 

258 ???+ example "Examples" 

259 

260 ```{.py .python linenums="1" title="Set up"} 

261 >>> # Imports 

262 >>> import pandas as pd 

263 >>> from pyspark.sql import SparkSession 

264 >>> from toolbox_pyspark.io import read_from_path 

265 >>> 

266 >>> # Instantiate Spark 

267 >>> spark = SparkSession.builder.getOrCreate() 

268 >>> 

269 >>> # Create data 

270 >>> df = pd.DataFrame( 

271 ... { 

272 ... "a": [1, 2, 3, 4], 

273 ... "b": ["a", "b", "c", "d"], 

274 ... "c": [1, 1, 1, 1], 

275 ... "d": ["2", "2", "2", "2"], 

276 ... } 

277 ... ) 

278 >>> 

279 >>> # Write data 

280 >>> df.to_csv("./test/table.csv") 

281 >>> df.to_parquet("./test/table.parquet") 

282 ``` 

283 

284 ```{.py .python linenums="1" title="Check"} 

285 >>> import os 

286 >>> print(os.listdir("./test")) 

287 ``` 

288 <div class="result" markdown> 

289 ```{.sh .shell title="Terminal"} 

290 ["table.csv", "table.parquet"] 

291 ``` 

292 </div> 

293 

294 ```{.py .python linenums="1" title="Example 1: Read CSV"} 

295 >>> df_csv = read_from_path( 

296 ... name="table.csv", 

297 ... path="./test", 

298 ... spark_session=spark, 

299 ... data_format="csv", 

300 ... read_options={"header": "true"}, 

301 ... ) 

302 >>> 

303 >>> df_csv.show() 

304 ``` 

305 <div class="result" markdown> 

306 ```{.txt .text title="Terminal"} 

307 +---+---+---+---+ 

308 | a | b | c | d | 

309 +---+---+---+---+ 

310 | 1 | a | 1 | 2 | 

311 | 2 | b | 1 | 2 | 

312 | 3 | c | 1 | 2 | 

313 | 4 | d | 1 | 2 | 

314 +---+---+---+---+ 

315 ``` 

316 !!! success "Conclusion: Successfully read CSV." 

317 </div> 

318 

319 ```{.py .python linenums="1" title="Example 2: Read Parquet"} 

320 >>> df_parquet = read_from_path( 

321 ... name="table.parquet", 

322 ... path="./test", 

323 ... spark_session=spark, 

324 ... data_format="parquet", 

325 ... ) 

326 >>> 

327 >>> df_parquet.show() 

328 ``` 

329 <div class="result" markdown> 

330 ```{.txt .text title="Terminal"} 

331 +---+---+---+---+ 

332 | a | b | c | d | 

333 +---+---+---+---+ 

334 | 1 | a | 1 | 2 | 

335 | 2 | b | 1 | 2 | 

336 | 3 | c | 1 | 2 | 

337 | 4 | d | 1 | 2 | 

338 +---+---+---+---+ 

339 ``` 

340 !!! success "Conclusion: Successfully read Parquet." 

341 </div> 

342 

343 ```{.py .python linenums="1" title="Example 3: Invalid Path"} 

344 >>> df_invalid_path = read_from_path( 

345 ... name="invalid_table.csv", 

346 ... path="./invalid_path", 

347 ... spark_session=spark, 

348 ... data_format="csv", 

349 ... read_options={"header": "true"}, 

350 ... ) 

351 ``` 

352 <div class="result" markdown> 

353 ```{.txt .text title="Terminal"} 

354 Py4JJavaError: An error occurred while calling o45.load. 

355 ``` 

356 !!! failure "Conclusion: Failed to read from invalid path." 

357 </div> 

358 

359 ```{.py .python linenums="1" title="Example 4: Invalid Format"} 

360 >>> df_invalid_format = read_from_path( 

361 ... name="table.csv", 

362 ... path="./test", 

363 ... spark_session=spark, 

364 ... data_format="invalid_format", 

365 ... read_options={"header": "true"}, 

366 ... ) 

367 ``` 

368 <div class="result" markdown> 

369 ```{.txt .text title="Terminal"} 

370 Py4JJavaError: An error occurred while calling o45.load. 

371 ``` 

372 !!! failure "Conclusion: Failed to read due to invalid format." 

373 </div> 

374 

375 ??? tip "See Also" 

376 - [`load_from_path`][toolbox_pyspark.io.load_from_path] 

377 - [`read`][toolbox_pyspark.io.read] 

378 - [`load`][toolbox_pyspark.io.load] 

379 """ 

380 

381 # Set default options ---- 

382 read_options: str_dict = read_options or dict() 

383 data_format: str = data_format or "parquet" 

384 load_path: str = f"{path}{'/' if not path.endswith('/') else ''}{name}" 

385 

386 # Initialise reader (including data format) ---- 

387 reader: DataFrameReader = spark_session.read.format(data_format) 

388 

389 # Add options (if exists) ---- 

390 if read_options: 

391 reader.options(**read_options) 

392 

393 # Load DataFrame ---- 

394 return reader.load(load_path) 

395 

396 

397## --------------------------------------------------------------------------- # 

398## Write #### 

399## --------------------------------------------------------------------------- # 

400 

401 

402@typechecked 

403def write_to_path( 

404 data_frame: psDataFrame, 

405 name: str, 

406 path: str, 

407 data_format: Optional[SPARK_FORMATS] = "parquet", 

408 mode: Optional[WRITE_MODES] = None, 

409 write_options: Optional[str_dict] = None, 

410 partition_cols: Optional[str_collection] = None, 

411) -> None: 

412 """ 

413 !!! note "Summary" 

414 For a given `table`, write it out to a specified `path` with name `name` and format `format`. 

415 

416 Params: 

417 data_frame (psDataFrame): 

418 The DataFrame to be written. Must be a valid `pyspark` DataFrame ([`pyspark.sql.DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)). 

419 name (str): 

420 The name of the table where it will be written. 

421 path (str): 

422 The path location for where to save the table. 

423 data_format (Optional[SPARK_FORMATS], optional): 

424 The format that the `table` will be written to.<br> 

425 Defaults to `#!py "parquet"`. 

426 mode (Optional[WRITE_MODES], optional): 

427 The behaviour for when the data already exists.<br> 

428 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br> 

429 Defaults to `#!py None`. 

430 write_options (Dict[str, str], optional): 

431 Any additional settings to parse to the writer class.<br> 

432 Like, for example: 

433 

434 - If you are writing to a Delta object, and wanted to overwrite the schema: `#!py {"overwriteSchema": "true"}`. 

435 - If you're writing to a CSV file, and wanted to specify the header row: `#!py {"header": "true"}`. 

436 

437 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br> 

438 Defaults to `#!py dict()`. 

439 partition_cols (Optional[Union[str_collection, str]], optional): 

440 The column(s) that the table should partition by.<br> 

441 Defaults to `#!py None`. 

442 

443 Raises: 

444 TypeError: 

445 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

446 

447 Returns: 

448 (type(None)): 

449 Nothing is returned. 

450 

451 ???+ tip "Note" 

452 You know that this function is successful if the table exists at the specified location, and there are no errors thrown. 

453 

454 ???+ example "Examples" 

455 

456 ```{.py .python linenums="1" title="Set up"} 

457 >>> # Imports 

458 >>> import pandas as pd 

459 >>> from pyspark.sql import SparkSession 

460 >>> from toolbox_pyspark.io import write_to_path 

461 >>> from toolbox_pyspark.checks import table_exists 

462 >>> 

463 >>> # Instantiate Spark 

464 >>> spark = SparkSession.builder.getOrCreate() 

465 >>> 

466 >>> # Create data 

467 >>> df = spark.createDataFrame( 

468 ... pd.DataFrame( 

469 ... { 

470 ... "a": [1, 2, 3, 4], 

471 ... "b": ["a", "b", "c", "d"], 

472 ... "c": [1, 1, 1, 1], 

473 ... "d": ["2", "2", "2", "2"], 

474 ... } 

475 ... ) 

476 ... ) 

477 ``` 

478 

479 ```{.py .python linenums="1" title="Check"} 

480 >>> df.show() 

481 ``` 

482 <div class="result" markdown> 

483 ```{.txt .text title="Terminal"} 

484 +---+---+---+---+ 

485 | a | b | c | d | 

486 +---+---+---+---+ 

487 | 1 | a | 1 | 2 | 

488 | 2 | b | 1 | 2 | 

489 | 3 | c | 1 | 2 | 

490 | 4 | d | 1 | 2 | 

491 +---+---+---+---+ 

492 ``` 

493 </div> 

494 

495 ```{.py .python linenums="1" title="Example 1: Write to CSV"} 

496 >>> write_to_path( 

497 ... data_frame=df, 

498 ... name="df.csv", 

499 ... path="./test", 

500 ... data_format="csv", 

501 ... mode="overwrite", 

502 ... write_options={"header": "true"}, 

503 ... ) 

504 >>> 

505 >>> table_exists( 

506 ... name="df.csv", 

507 ... path="./test", 

508 ... data_format="csv", 

509 ... spark_session=df.sparkSession, 

510 ... ) 

511 ``` 

512 <div class="result" markdown> 

513 ```{.sh .shell title="Terminal"} 

514 True 

515 ``` 

516 !!! success "Conclusion: Successfully written to CSV." 

517 </div> 

518 

519 ```{.py .python linenums="1" title="Example 2: Write to Parquet"} 

520 >>> write_to_path( 

521 ... data_frame=df, 

522 ... name="df.parquet", 

523 ... path="./test", 

524 ... data_format="parquet", 

525 ... mode="overwrite", 

526 ... ) 

527 >>> 

528 >>> table_exists( 

529 ... name="df.parquet", 

530 ... path="./test", 

531 ... data_format="parquet", 

532 ... spark_session=df.sparkSession, 

533 ... ) 

534 ``` 

535 <div class="result" markdown> 

536 ```{.sh .shell title="Terminal"} 

537 True 

538 ``` 

539 !!! success "Conclusion: Successfully written to Parquet." 

540 </div> 

541 

542 ```{.py .python linenums="1" title="Example 3: Invalid Path"} 

543 >>> write_to_path( 

544 ... data_frame=df, 

545 ... name="df.csv", 

546 ... path="./invalid_path", 

547 ... data_format="csv", 

548 ... mode="overwrite", 

549 ... write_options={"header": "true"}, 

550 ... ) 

551 ``` 

552 <div class="result" markdown> 

553 ```{.txt .text title="Terminal"} 

554 Py4JJavaError: An error occurred while calling o45.save. 

555 ``` 

556 !!! failure "Conclusion: Failed to write to invalid path." 

557 </div> 

558 

559 ```{.py .python linenums="1" title="Example 4: Invalid Format"} 

560 >>> write_to_path( 

561 ... data_frame=df, 

562 ... name="df.csv", 

563 ... path="./test", 

564 ... data_format="invalid_format", 

565 ... mode="overwrite", 

566 ... write_options={"header": "true"}, 

567 ... ) 

568 ``` 

569 <div class="result" markdown> 

570 ```{.txt .text title="Terminal"} 

571 Py4JJavaError: An error occurred while calling o45.save. 

572 ``` 

573 !!! failure "Conclusion: Failed to write due to invalid format." 

574 </div> 

575 

576 ??? tip "See Also" 

577 - [`save_to_path`][toolbox_pyspark.io.save_to_path] 

578 - [`write`][toolbox_pyspark.io.write] 

579 - [`save`][toolbox_pyspark.io.save] 

580 """ 

581 

582 # Set default options ---- 

583 write_options: str_dict = write_options or dict() 

584 data_format: str = data_format or "parquet" 

585 write_path: str = f"{path}{'/' if not path.endswith('/') else ''}{name}" 

586 

587 # Initialise writer (including data format) ---- 

588 writer: DataFrameWriter = data_frame.write.mode(mode).format(data_format) 

589 

590 # Add options (if exists) ---- 

591 if write_options: 

592 writer.options(**write_options) 

593 

594 # Add partition (if exists) ---- 

595 if partition_cols is not None: 

596 partition_cols = [partition_cols] if is_type(partition_cols, str) else partition_cols 

597 writer = writer.partitionBy(list(partition_cols)) 

598 

599 # Write table ---- 

600 writer.save(write_path) 

601 

602 

603## --------------------------------------------------------------------------- # 

604## Transfer #### 

605## --------------------------------------------------------------------------- # 

606 

607 

608@typechecked 

609def transfer_by_path( 

610 spark_session: SparkSession, 

611 from_table_path: str, 

612 from_table_name: str, 

613 to_table_path: str, 

614 to_table_name: str, 

615 from_table_format: Optional[SPARK_FORMATS] = "parquet", 

616 from_table_options: Optional[str_dict] = None, 

617 to_table_format: Optional[SPARK_FORMATS] = "parquet", 

618 to_table_mode: Optional[WRITE_MODES] = None, 

619 to_table_options: Optional[str_dict] = None, 

620 to_table_partition_cols: Optional[str_collection] = None, 

621) -> None: 

622 """ 

623 !!! note "Summary" 

624 Copy a table from one location to another. 

625 

626 ???+ abstract "Details" 

627 This is a blind transfer. There is no validation, no alteration, no adjustments made at all. Simply read directly from one location and move immediately to another location straight away. 

628 

629 Params: 

630 spark_session (SparkSession): 

631 The spark session to use for the transfer. Necessary in order to instantiate the reading process. 

632 from_table_path (str): 

633 The path from which the table will be read. 

634 from_table_name (str): 

635 The name of the table to be read. 

636 to_table_path (str): 

637 The location where to save the table to. 

638 to_table_name (str): 

639 The name of the table where it will be saved. 

640 from_table_format (Optional[SPARK_FORMATS], optional): 

641 The format of the data at the reading location. 

642 to_table_format (Optional[SPARK_FORMATS], optional): 

643 The format of the saved table. 

644 from_table_options (Dict[str, str], optional): 

645 Any additional options to parse to the Spark reader.<br> 

646 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br> 

647 Defaults to `#!py dict()`. 

648 to_table_mode (Optional[WRITE_MODES], optional): 

649 The behaviour for when the data already exists.<br> 

650 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br> 

651 Defaults to `#!py None`. 

652 to_table_options (Dict[str, str], optional): 

653 Any additional settings to parse to the writer class.<br> 

654 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br> 

655 Defaults to `#!py dict()`. 

656 to_table_partition_cols (Optional[Union[str_collection, str]], optional): 

657 The column(s) that the table should partition by.<br> 

658 Defaults to `#!py None`. 

659 

660 Raises: 

661 TypeError: 

662 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

663 

664 Returns: 

665 (type(None)): 

666 Nothing is returned. 

667 

668 ???+ tip "Note" 

669 You know that this function is successful if the table exists at the specified location, and there are no errors thrown. 

670 

671 ???+ example "Examples" 

672 

673 ```{.py .python linenums="1" title="Set up"} 

674 >>> # Imports 

675 >>> import pandas as pd 

676 >>> from pyspark.sql import SparkSession 

677 >>> from toolbox_pyspark.io import transfer_by_path 

678 >>> from toolbox_pyspark.checks import table_exists 

679 >>> 

680 >>> # Instantiate Spark 

681 >>> spark = SparkSession.builder.getOrCreate() 

682 >>> 

683 >>> # Create data 

684 >>> df = pd.DataFrame( 

685 ... { 

686 ... "a": [1, 2, 3, 4], 

687 ... "b": ["a", "b", "c", "d"], 

688 ... "c": [1, 1, 1, 1], 

689 ... "d": ["2", "2", "2", "2"], 

690 ... } 

691 ... ) 

692 >>> df.to_csv("./test/table.csv") 

693 >>> df.to_parquet("./test/table.parquet") 

694 ``` 

695 

696 ```{.py .python linenums="1" title="Check"} 

697 >>> import os 

698 >>> print(os.listdir("./test")) 

699 ``` 

700 <div class="result" markdown> 

701 ```{.sh .shell title="Terminal"} 

702 ["table.csv", "table.parquet"] 

703 ``` 

704 </div> 

705 

706 ```{.py .python linenums="1" title="Example 1: Transfer CSV"} 

707 >>> transfer_by_path( 

708 ... spark_session=spark, 

709 ... from_table_path="./test", 

710 ... from_table_name="table.csv", 

711 ... from_table_format="csv", 

712 ... to_table_path="./other", 

713 ... to_table_name="table.csv", 

714 ... to_table_format="csv", 

715 ... from_table_options={"header": "true"}, 

716 ... to_table_mode="overwrite", 

717 ... to_table_options={"header": "true"}, 

718 ... ) 

719 >>> 

720 >>> table_exists( 

721 ... name="table.csv", 

722 ... path="./other", 

723 ... data_format="csv", 

724 ... spark_session=spark, 

725 ... ) 

726 ``` 

727 <div class="result" markdown> 

728 ```{.sh .shell title="Terminal"} 

729 True 

730 ``` 

731 !!! success "Conclusion: Successfully transferred CSV to CSV." 

732 </div> 

733 

734 ```{.py .python linenums="1" title="Example 2: Transfer Parquet"} 

735 >>> transfer_by_path( 

736 ... spark_session=spark, 

737 ... from_table_path="./test", 

738 ... from_table_name="table.parquet", 

739 ... from_table_format="parquet", 

740 ... to_table_path="./other", 

741 ... to_table_name="table.parquet", 

742 ... to_table_format="parquet", 

743 ... to_table_mode="overwrite", 

744 ... to_table_options={"overwriteSchema": "true"}, 

745 ... ) 

746 >>> 

747 >>> table_exists( 

748 ... name="table.parquet", 

749 ... path="./other", 

750 ... data_format="parquet", 

751 ... spark_session=spark, 

752 ... ) 

753 ``` 

754 <div class="result" markdown> 

755 ```{.sh .shell title="Terminal"} 

756 True 

757 ``` 

758 !!! success "Conclusion: Successfully transferred Parquet to Parquet." 

759 </div> 

760 

761 ```{.py .python linenums="1" title="Example 3: Transfer CSV to Parquet"} 

762 >>> transfer_by_path( 

763 ... spark_session=spark, 

764 ... from_table_path="./test", 

765 ... from_table_name="table.csv", 

766 ... from_table_format="csv", 

767 ... to_table_path="./other", 

768 ... to_table_name="table.parquet", 

769 ... to_table_format="parquet", 

770 ... to_table_mode="overwrite", 

771 ... to_table_options={"overwriteSchema": "true"}, 

772 ... ) 

773 >>> 

774 >>> table_exists( 

775 ... name="table.parquet", 

776 ... path="./other", 

777 ... data_format="parquet", 

778 ... spark_session=spark, 

779 ... ) 

780 ``` 

781 <div class="result" markdown> 

782 ```{.sh .shell title="Terminal"} 

783 True 

784 ``` 

785 !!! success "Conclusion: Successfully transferred CSV to Parquet." 

786 </div> 

787 

788 ```{.py .python linenums="1" title="Example 4: Invalid Source Path"} 

789 >>> transfer_by_path( 

790 ... spark_session=spark, 

791 ... from_table_path="./invalid_path", 

792 ... from_table_name="table.csv", 

793 ... from_table_format="csv", 

794 ... to_table_path="./other", 

795 ... to_table_name="table.csv", 

796 ... to_table_format="csv", 

797 ... from_table_options={"header": "true"}, 

798 ... to_table_mode="overwrite", 

799 ... to_table_options={"header": "true"}, 

800 ... ) 

801 ``` 

802 <div class="result" markdown> 

803 ```{.txt .text title="Terminal"} 

804 Py4JJavaError: An error occurred while calling o45.load. 

805 ``` 

806 !!! failure "Conclusion: Failed to transfer due to invalid source path." 

807 </div> 

808 

809 ```{.py .python linenums="1" title="Example 5: Invalid Target Format"} 

810 >>> transfer_by_path( 

811 ... spark_session=spark, 

812 ... from_table_path="./test", 

813 ... from_table_name="table.csv", 

814 ... from_table_format="csv", 

815 ... to_table_path="./other", 

816 ... to_table_name="table.csv", 

817 ... to_table_format="invalid_format", 

818 ... from_table_options={"header": "true"}, 

819 ... to_table_mode="overwrite", 

820 ... to_table_options={"header": "true"}, 

821 ... ) 

822 ``` 

823 <div class="result" markdown> 

824 ```{.txt .text title="Terminal"} 

825 Py4JJavaError: An error occurred while calling o45.save. 

826 ``` 

827 !!! failure "Conclusion: Failed to transfer due to invalid target format." 

828 </div> 

829 

830 ??? tip "See Also" 

831 - [`transfer`][toolbox_pyspark.io.transfer] 

832 """ 

833 

834 # Read from source ---- 

835 from_table: psDataFrame = read_from_path( 

836 name=from_table_name, 

837 path=from_table_path, 

838 spark_session=spark_session, 

839 data_format=from_table_format, 

840 read_options=from_table_options, 

841 ) 

842 

843 # Write to target ---- 

844 write_to_path( 

845 data_frame=from_table, 

846 name=to_table_name, 

847 path=to_table_path, 

848 data_format=to_table_format, 

849 mode=to_table_mode, 

850 write_options=to_table_options, 

851 partition_cols=to_table_partition_cols, 

852 ) 

853 

854 

855# ---------------------------------------------------------------------------- # 

856# # 

857# Table functions #### 

858# # 

859# ---------------------------------------------------------------------------- # 

860 

861 

862def _validate_table_name(table: str) -> None: 

863 if "/" in table: 

864 raise ValidationError(f"Invalid table. Cannot contain `/`: `{table}`.") 

865 if len(table.split(".")) != 2: 

866 raise ValidationError( 

867 f"Invalid table. Should be in the format `schema.table`: `{table}`." 

868 ) 
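
# Illustrative behaviour of the helper above (informational comment only):
#   _validate_table_name("prod.orders")      -> returns None (passes)
#   _validate_table_name("prod/orders")      -> raises ValidationError (contains `/`)
#   _validate_table_name("src.prod.orders")  -> raises ValidationError (not `schema.table`)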

869 

870 

871## --------------------------------------------------------------------------- # 

872## Read #### 

873## --------------------------------------------------------------------------- # 

874 

875 

876@typechecked 

877def read_from_table( 

878 spark_session: SparkSession, 

879 name: str, 

880 schema: Optional[str] = None, 

881 data_format: Optional[SPARK_FORMATS] = "parquet", 

882 read_options: Optional[str_dict] = None, 

883) -> psDataFrame: 

884 """ 

885 !!! note "Summary" 

886 Read a table from a given `schema` and `name` into memory as a `pyspark` dataframe. 

887 

888 ???+ abstract "Details" 

889 - If `schema` is `#!py None`, then we would expect the `name` to contain both the schema and the table name together, like `schema.name`; for example, `production.orders`. 

890 - Else, if `schema` is not `#!py None`, then we would expect the `schema` to (quite logically) contain the name of the schema, and the `name` to contain the name of the table. 

891 

892 Params: 

893 spark_session (SparkSession): 

894 The Spark session to use for the reading. 

895 name (str): 

896 The name of the table to read in. 

897 schema (Optional[str], optional): 

898 The schema of the table to read in.<br> 

899 Defaults to `#!py None`. 

900 data_format (Optional[SPARK_FORMATS], optional): 

901 The format of the table.<br> 

902 Defaults to `#!py "parquet"`. 

903 read_options (Dict[str, str], optional): 

904 Any additional options to parse to the Spark reader.<br> 

905 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br> 

906 Defaults to `#!py dict()`. 

907 

908 Raises: 

909 TypeError: 

910 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

911 ValidationError: 

912 If `name` contains `/`, or is structured with three elements like: `source.schema.table`. 

913 

914 Returns: 

915 (psDataFrame): 

916 The loaded dataframe. 

917 

918 ???+ example "Examples" 

919 

920 ```{.py .python linenums="1" title="Set up"} 

921 >>> # Imports 

922 >>> import pandas as pd 

923 >>> from pyspark.sql import SparkSession 

924 >>> from toolbox_pyspark.io import read_from_table 

925 >>> 

926 >>> # Instantiate Spark 

927 >>> spark = SparkSession.builder.getOrCreate() 

928 >>> 

929 >>> # Create data 

930 >>> df = pd.DataFrame( 

931 ... { 

932 ... "a": [1, 2, 3, 4], 

933 ... "b": ["a", "b", "c", "d"], 

934 ... "c": [1, 1, 1, 1], 

935 ... "d": ["2", "2", "2", "2"], 

936 ... } 

937 ... ) 

938 >>> df.to_parquet("./test/table.parquet") 

939 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table") 

940 ``` 

941 

942 ```{.py .python linenums="1" title="Example 1: Read Table"} 

943 >>> df_table = read_from_table( 

944 ... name="test_table", 

945 ... spark_session=spark, 

946 ... data_format="parquet", 

947 ... ) 

948 >>> 

949 >>> df_table.show() 

950 ``` 

951 <div class="result" markdown> 

952 ```{.txt .text title="Terminal"} 

953 +---+---+---+---+ 

954 | a | b | c | d | 

955 +---+---+---+---+ 

956 | 1 | a | 1 | 2 | 

957 | 2 | b | 1 | 2 | 

958 | 3 | c | 1 | 2 | 

959 | 4 | d | 1 | 2 | 

960 +---+---+---+---+ 

961 ``` 

962 !!! success "Conclusion: Successfully read table." 

963 </div> 

964 

965 ```{.py .python linenums="1" title="Example 2: Invalid table structure"} 

966 >>> df_table = read_from_table( 

967 ... name="schema.test_table", 

968 ... schema="source", 

969 ... spark_session=spark, 

970 ... data_format="parquet", 

971 ... ) 

972 ``` 

973 <div class="result" markdown> 

974 ```{.txt .text title="Terminal"} 

975 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`. 

976 ``` 

977 !!! failure "Conclusion: Failed to read from table due to invalid table structure." 

978 </div> 

979 

980 ??? tip "See Also" 

981 - [`load_from_table`][toolbox_pyspark.io.load_from_table] 

982 - [`read`][toolbox_pyspark.io.read] 

983 - [`load`][toolbox_pyspark.io.load] 

984 """ 

985 

986 # Set default options ---- 

987 data_format: str = data_format or "parquet" 

988 table: str = name if not schema else f"{schema}.{name}" 

989 

990 # Validate that `table` is in the correct format ---- 

991 _validate_table_name(table) 

992 

993 # Initialise reader (including data format) ---- 

994 reader: DataFrameReader = spark_session.read.format(data_format) 

995 

996 # Add options (if exists) ---- 

997 if read_options: 

998 reader.options(**read_options) 

999 

1000 # Load DataFrame ---- 

1001 return reader.table(table) 

1002 

1003 

1004## --------------------------------------------------------------------------- # 

1005## Write #### 

1006## --------------------------------------------------------------------------- # 

1007 

1008 

1009@typechecked 

1010def write_to_table( 

1011 data_frame: psDataFrame, 

1012 name: str, 

1013 schema: Optional[str] = None, 

1014 data_format: Optional[SPARK_FORMATS] = "parquet", 

1015 mode: Optional[WRITE_MODES] = None, 

1016 write_options: Optional[str_dict] = None, 

1017 partition_cols: Optional[str_collection] = None, 

1018) -> None: 

1019 """ 

1020 !!! note "Summary" 

1021 For a given `data_frame`, write it out to a specified `schema` and `name` with format `data_format`. 

1022 

1023 ???+ abstract "Details" 

1024 - If `schema` is `#!py None`, then we would expect the `name` to contain both the schema and the table name together, like `schema.name`; for example, `production.orders`. 

1025 - Else, if `schema` is not `#!py None`, then we would expect the `schema` to (quite logically) contain the name of the schema, and the `name` to contain the name of the table. 

1026 

1027 Params: 

1028 data_frame (psDataFrame): 

1029 The DataFrame to be written. Must be a valid `pyspark` DataFrame ([`pyspark.sql.DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)). 

1030 name (str): 

1031 The name of the table where it will be written. 

1032 schema (Optional[str], optional): 

1033 The schema of the table where it will be written.<br> 

1034 Defaults to `#!py None`. 

1035 data_format (Optional[SPARK_FORMATS], optional): 

1036 The format that the `data_frame` will be written to.<br> 

1037 Defaults to `#!py "parquet"`. 

1038 mode (Optional[WRITE_MODES], optional): 

1039 The behaviour for when the data already exists.<br> 

1040 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br> 

1041 Defaults to `#!py None`. 

1042 write_options (Dict[str, str], optional): 

1043 Any additional settings to parse to the writer class.<br> 

1044 Like, for example: 

1045 

1046 - If you are writing to a Delta object, and wanted to overwrite the schema: `#!py {"overwriteSchema": "true"}`. 

1047 - If you're writing to a CSV file, and wanted to specify the header row: `#!py {"header": "true"}`. 

1048 

1049 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br> 

1050 Defaults to `#!py dict()`. 

1051 partition_cols (Optional[Union[str_collection, str]], optional): 

1052 The column(s) that the table should partition by.<br> 

1053 Defaults to `#!py None`. 

1054 

1055 Raises: 

1056 TypeError: 

1057 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

1058 ValidationError: 

1059 If `name` contains `/`, or is structured with three elements like: `source.schema.table`. 

1060 

1061 Returns: 

1062 (type(None)): 

1063 Nothing is returned. 

1064 

1065 ???+ tip "Note" 

1066 You know that this function is successful if the table exists at the specified location, and there are no errors thrown. 

1067 

1068 ???+ example "Examples" 

1069 

1070 ```{.py .python linenums="1" title="Set up"} 

1071 >>> # Imports 

1072 >>> import pandas as pd 

1073 >>> from pyspark.sql import SparkSession 

1074 >>> from toolbox_pyspark.io import write_to_table 

1075 >>> from toolbox_pyspark.checks import table_exists 

1076 >>> 

1077 >>> # Instantiate Spark 

1078 >>> spark = SparkSession.builder.getOrCreate() 

1079 >>> 

1080 >>> # Create data 

1081 >>> df = spark.createDataFrame( 

1082 ... pd.DataFrame( 

1083 ... { 

1084 ... "a": [1, 2, 3, 4], 

1085 ... "b": ["a", "b", "c", "d"], 

1086 ... "c": [1, 1, 1, 1], 

1087 ... "d": ["2", "2", "2", "2"], 

1088 ... } 

1089 ... ) 

1090 ... ) 

1091 ``` 

1092 

1093 ```{.py .python linenums="1" title="Check"} 

1094 >>> df.show() 

1095 ``` 

1096 <div class="result" markdown> 

1097 ```{.txt .text title="Terminal"} 

1098 +---+---+---+---+ 

1099 | a | b | c | d | 

1100 +---+---+---+---+ 

1101 | 1 | a | 1 | 2 | 

1102 | 2 | b | 1 | 2 | 

1103 | 3 | c | 1 | 2 | 

1104 | 4 | d | 1 | 2 | 

1105 +---+---+---+---+ 

1106 ``` 

1107 </div> 

1108 

1109 ```{.py .python linenums="1" title="Example 1: Write to Table"} 

1110 >>> write_to_table( 

1111 ... data_frame=df, 

1112 ... name="test_table", 

1113 ... schema="default", 

1114 ... data_format="parquet", 

1115 ... mode="overwrite", 

1116 ... ) 

1117 >>> 

1118 >>> table_exists( 

1119 ... name="test_table", 

1120 ... schema="default", 

1121 ... data_format="parquet", 

1122 ... spark_session=df.sparkSession, 

1123 ... ) 

1124 ``` 

1125 <div class="result" markdown> 

1126 ```{.sh .shell title="Terminal"} 

1127 True 

1128 ``` 

1129 !!! success "Conclusion: Successfully written to table." 

1130 </div> 

1131 

1132 ```{.py .python linenums="1" title="Example 2: Invalid table structure"} 

1133 >>> write_to_table( 

1134 ... data_frame=df, 

1135 ... name="schema.test_table", 

1136 ... schema="source", 

1137 ... data_format="parquet", 

1138 ... mode="overwrite", 

1139 ... ) 

1140 ``` 

1141 <div class="result" markdown> 

1142 ```{.txt .text title="Terminal"} 

1143 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`. 

1144 ``` 

1145 !!! failure "Conclusion: Failed to write to table due to invalid table structure." 

1146 </div> 

1147 

1148 ??? tip "See Also" 

1149 - [`save_to_table`][toolbox_pyspark.io.save_to_table] 

1150 - [`write`][toolbox_pyspark.io.write] 

1151 - [`save`][toolbox_pyspark.io.save] 

1152 """ 

1153 

1154 # Set default options ---- 

1155 write_options: str_dict = write_options or dict() 

1156 data_format: str = data_format or "parquet" 

1157 table: str = name if not schema else f"{schema}.{name}" 

1158 

1159 # Validate that `table` is in the correct format ---- 

1160 _validate_table_name(table) 

1161 

1162 # Initialise writer (including data format) ---- 

1163 writer: DataFrameWriter = data_frame.write.mode(mode).format(data_format) 

1164 

1165 # Add options (if exists) ---- 

1166 if write_options: 

1167 writer.options(**write_options) 

1168 

1169 # Add partition (if exists) ---- 

1170 if partition_cols is not None: 

1171 partition_cols = [partition_cols] if is_type(partition_cols, str) else partition_cols 

1172 writer = writer.partitionBy(list(partition_cols)) 

1173 

1174 # Write table ---- 

1175 writer.saveAsTable(table) 

1176 

1177 

1178## --------------------------------------------------------------------------- # 

1179## Transfer #### 

1180## --------------------------------------------------------------------------- # 

1181 

1182 

1183@typechecked 

1184def transfer_by_table( 

1185 spark_session: SparkSession, 

1186 from_table_name: str, 

1187 to_table_name: str, 

1188 from_table_schema: Optional[str] = None, 

1189 from_table_format: Optional[SPARK_FORMATS] = "parquet", 

1190 from_table_options: Optional[str_dict] = None, 

1191 to_table_schema: Optional[str] = None, 

1192 to_table_format: Optional[SPARK_FORMATS] = "parquet", 

1193 to_table_mode: Optional[WRITE_MODES] = None, 

1194 to_table_options: Optional[str_dict] = None, 

1195 to_table_partition_cols: Optional[str_collection] = None, 

1196) -> None: 

1197 """ 

1198 !!! note "Summary" 

1199 Copy a table from one schema and name to another schema and name. 

1200 

1201 ???+ abstract "Details" 

1202 This is a blind transfer. There is no validation, no alteration, no adjustments made at all. Simply read directly from one table and move immediately to another table straight away. 

1203 

1204 Params: 

1205 spark_session (SparkSession): 

1206 The spark session to use for the transfer. Necessary in order to instantiate the reading process. 

1207 from_table_name (str): 

1208 The name of the table to be read. 

1209 to_table_name (str): 

1210 The name of the table where it will be saved. 

1211 from_table_schema (Optional[str], optional): 

1212 The schema of the table to be read.<br> 

1213 Defaults to `#!py None`. 

1214 from_table_format (Optional[SPARK_FORMATS], optional): 

1215 The format of the data at the reading location.<br> 

1216 Defaults to `#!py "parquet"`. 

1217 from_table_options (Dict[str, str], optional): 

1218 Any additional options to parse to the Spark reader.<br> 

1219 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br> 

1220 Defaults to `#!py dict()`. 

1221 to_table_schema (Optional[str], optional): 

1222 The schema of the table where it will be saved.<br> 

1223 Defaults to `#!py None`. 

1224 to_table_format (Optional[SPARK_FORMATS], optional): 

1225 The format of the saved table.<br> 

1226 Defaults to `#!py "parquet"`. 

1227 to_table_mode (Optional[WRITE_MODES], optional): 

1228 The behaviour for when the data already exists.<br> 

1229 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br> 

1230 Defaults to `#!py None`. 

1231 to_table_options (Dict[str, str], optional): 

1232 Any additional settings to parse to the writer class.<br> 

1233 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br> 

1234 Defaults to `#!py dict()`. 

1235 to_table_partition_cols (Optional[Union[str_collection, str]], optional): 

1236 The column(s) that the table should partition by.<br> 

1237 Defaults to `#!py None`. 

1238 

1239 Raises: 

1240 TypeError: 

1241 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

1242 

1243 Returns: 

1244 (type(None)): 

1245 Nothing is returned. 

1246 

1247 ???+ tip "Note" 

1248 You know that this function is successful if the table exists at the specified location, and there are no errors thrown. 

1249 

1250 ???+ example "Examples" 

1251 

1252 ```{.py .python linenums="1" title="Set up"} 

1253 >>> # Imports 

1254 >>> import pandas as pd 

1255 >>> from pyspark.sql import SparkSession 

1256 >>> from toolbox_pyspark.io import transfer_by_table 

1257 >>> from toolbox_pyspark.checks import table_exists 

1258 >>> 

1259 >>> # Instantiate Spark 

1260 >>> spark = SparkSession.builder.getOrCreate() 

1261 >>> 

1262 >>> # Create data 

1263 >>> df = pd.DataFrame( 

1264 ... { 

1265 ... "a": [1, 2, 3, 4], 

1266 ... "b": ["a", "b", "c", "d"], 

1267 ... "c": [1, 1, 1, 1], 

1268 ... "d": ["2", "2", "2", "2"], 

1269 ... } 

1270 ... ) 

1271 >>> df.to_parquet("./test/table.parquet") 

1272 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table") 

1273 ``` 

1274 

1275 ```{.py .python linenums="1" title="Example 1: Transfer Table"} 

1276 >>> transfer_by_table( 

1277 ... spark_session=spark, 

1278 ... from_table_name="test_table", 

1279 ... from_table_schema="default", 

1280 ... from_table_format="parquet", 

1281 ... to_table_name="new_table", 

1282 ... to_table_schema="default", 

1283 ... to_table_format="parquet", 

1284 ... to_table_mode="overwrite", 

1285 ... ) 

1286 >>> 

1287 >>> table_exists( 

1288 ... name="new_table", 

1289 ... schema="default", 

1290 ... data_format="parquet", 

1291 ... spark_session=spark, 

1292 ... ) 

1293 ``` 

1294 <div class="result" markdown> 

1295 ```{.sh .shell title="Terminal"} 

1296 True 

1297 ``` 

1298 !!! success "Conclusion: Successfully transferred table." 

1299 </div> 

1300 

1301 ```{.py .python linenums="1" title="Example 2: Invalid table structure"} 

1302 >>> transfer_by_table( 

1303 ... spark_session=spark, 

1304 ... from_table_name="schema.test_table", 

1305 ... from_table_schema="source", 

1306 ... from_table_format="parquet", 

1307 ... to_table_name="new_table", 

1308 ... to_table_schema="default", 

1309 ... to_table_format="parquet", 

1310 ... to_table_mode="overwrite", 

1311 ... ) 

1312 ``` 

1313 <div class="result" markdown> 

1314 ```{.txt .text title="Terminal"} 

1315 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`. 

1316 ``` 

1317 !!! failure "Conclusion: Failed to transfer table due to invalid table structure." 

1318 </div> 

1319 

1320 ??? tip "See Also" 

1321 - [`transfer`][toolbox_pyspark.io.transfer] 

1322 """ 

1323 

1324 # Read from source ---- 

1325 source_table: psDataFrame = read_from_table( 

1326 name=from_table_name, 

1327 schema=from_table_schema, 

1328 spark_session=spark_session, 

1329 data_format=from_table_format, 

1330 read_options=from_table_options, 

1331 ) 

1332 

1333 # Write to target ---- 

1334 write_to_table( 

1335 data_frame=source_table, 

1336 name=to_table_name, 

1337 schema=to_table_schema, 

1338 data_format=to_table_format, 

1339 mode=to_table_mode, 

1340 write_options=to_table_options, 

1341 partition_cols=to_table_partition_cols, 

1342 ) 

1343 

1344 

1345# ---------------------------------------------------------------------------- # 

1346# # 

1347# Combined Functions #### 

1348# # 

1349# ---------------------------------------------------------------------------- # 

1350 

1351 

1352## --------------------------------------------------------------------------- # 

1353## Read #### 

1354## --------------------------------------------------------------------------- # 

1355 

1356 

1357@typechecked 

1358def read( 

1359 spark_session: SparkSession, 

1360 name: str, 

1361 method: Literal["table", "path"], 

1362 path: Optional[str] = None, 

1363 schema: Optional[str] = None, 

1364 data_format: Optional[SPARK_FORMATS] = "parquet", 

1365 read_options: Optional[str_dict] = None, 

1366) -> psDataFrame: 

1367 """ 

1368 !!! note "Summary" 

1369 Read a table or file from a given `path` or `schema` and `name` into memory as a `pyspark` dataframe. 

1370 

1371 ???+ abstract "Details" 

1372 This function serves as a unified interface for reading data into a `pyspark` dataframe. Depending on the `method` parameter, it will either read from a file path or a table. 

1373 

1374 - If `method` is `#!py "path"`, the function will use the `read_from_path` function to read the data from the specified `path` and `name`. 

1375 - If `method` is `#!py "table"`, the function will use the `read_from_table` function to read the data from the specified `schema` and `name`. 

1376 

1377 Params: 

1378 spark_session (SparkSession): 

1379 The Spark session to use for the reading. 

1380 name (str): 

1381 The name of the table or file to read in. 

1382 method (Literal["table", "path"]): 

1383 The method to use for reading the data. Either `#!py "table"` or `#!py "path"`. 

1384 path (Optional[str], optional): 

1385 The path from which the file will be read. Required if `method` is `#!py "path"`.<br> 

1386 Defaults to `#!py None`. 

1387 schema (Optional[str], optional): 

1388 The schema of the table to read in. Required if `method` is `#!py "table"`.<br> 

1389 Defaults to `#!py None`. 

1390 data_format (Optional[SPARK_FORMATS], optional): 

1391 The format of the data.<br> 

1392 Defaults to `#!py "parquet"`. 

1393 read_options (Dict[str, str], optional): 

1394 Any additional options to parse to the Spark reader.<br> 

1395 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br> 

1396 Defaults to `#!py dict()`. 

1397 

1398 Raises: 

1399 TypeError: 

1400 If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

1401 ValidationError: 

1402 If `name` contains `/`, or is structured with three elements like: `source.schema.table`. 

1403 

1404 Returns: 

1405 (psDataFrame): 

1406 The loaded dataframe. 

1407 

1408 ???+ example "Examples" 

1409 

1410 ```{.py .python linenums="1" title="Set up"} 

1411 >>> # Imports 

1412 >>> import pandas as pd 

1413 >>> from pyspark.sql import SparkSession 

1414 >>> from toolbox_pyspark.io import read 

1415 >>> 

1416 >>> # Instantiate Spark 

1417 >>> spark = SparkSession.builder.getOrCreate() 

1418 >>> 

1419 >>> # Create data 

1420 >>> df = pd.DataFrame( 

1421 ... { 

1422 ... "a": [1, 2, 3, 4], 

1423 ... "b": ["a", "b", "c", "d"], 

1424 ... "c": [1, 1, 1, 1], 

1425 ... "d": ["2", "2", "2", "2"], 

1426 ... } 

1427 ... ) 

1428 >>> df.to_csv("./test/table.csv") 

1429 >>> df.to_parquet("./test/table.parquet") 

1430 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table") 

1431 ``` 

1432 

1433 ```{.py .python linenums="1" title="Example 1: Read from Path"} 

1434 >>> df_path = read( 

1435 ... spark_session=spark, 

1436 ... name="table.csv", 

1437 ... method="path", 

1438 ... path="./test", 

1439 ... data_format="csv", 

1440 ... read_options={"header": "true"}, 

1441 ... ) 

1442 >>> 

1443 >>> df_path.show() 

1444 ``` 

1445 <div class="result" markdown> 

1446 ```{.txt .text title="Terminal"} 

1447 +---+---+---+---+ 

1448 | a | b | c | d | 

1449 +---+---+---+---+ 

1450 | 1 | a | 1 | 2 | 

1451 | 2 | b | 1 | 2 | 

1452 | 3 | c | 1 | 2 | 

1453 | 4 | d | 1 | 2 | 

1454 +---+---+---+---+ 

1455 ``` 

1456 !!! success "Conclusion: Successfully read from path." 

1457 </div> 

1458 

1459 ```{.py .python linenums="1" title="Example 2: Read from Table"} 

1460 >>> df_table = read( 

1461 ... spark_session=spark, 

1462 ... name="test_table", 

1463 ... method="table", 

1464 ... schema="default", 

1465 ... data_format="parquet", 

1466 ... ) 

1467 >>> 

1468 >>> df_table.show() 

1469 ``` 

1470 <div class="result" markdown> 

1471 ```{.txt .text title="Terminal"} 

1472 +---+---+---+---+ 

1473 | a | b | c | d | 

1474 +---+---+---+---+ 

1475 | 1 | a | 1 | 2 | 

1476 | 2 | b | 1 | 2 | 

1477 | 3 | c | 1 | 2 | 

1478 | 4 | d | 1 | 2 | 

1479 +---+---+---+---+ 

1480 ``` 

1481 !!! success "Conclusion: Successfully read from table." 

1482 </div> 

1483 

1484 ```{.py .python linenums="1" title="Example 3: Invalid Path"} 

1485 >>> df_invalid_path = read( 

1486 ... spark_session=spark, 

1487 ... name="invalid_table.csv", 

1488 ... method="path", 

1489 ... path="./invalid_path", 

1490 ... data_format="csv", 

1491 ... read_options={"header": "true"}, 

1492 ... ) 

1493 ``` 

1494 <div class="result" markdown> 

1495 ```{.txt .text title="Terminal"} 

1496 Py4JJavaError: An error occurred while calling o45.load. 

1497 ``` 

1498 !!! failure "Conclusion: Failed to read from invalid path." 

1499 </div> 

1500 

1501 ```{.py .python linenums="1" title="Example 4: Invalid Table Structure"} 

1502 >>> df_invalid_table = read( 

1503 ... spark_session=spark, 

1504 ... name="schema.invalid_table", 

1505 ... method="table", 

1506 ... schema="source", 

1507 ... data_format="parquet", 

1508 ... ) 

1509 ``` 

1510 <div class="result" markdown> 

1511 ```{.txt .text title="Terminal"} 

1512 Invalid table. Should be in the format `schema.table`: `source.schema.invalid_table`. 

1513 ``` 

1514 !!! failure "Conclusion: Failed to read from invalid table structure." 

1515 </div> 

1516 

1517 ??? tip "See Also" 

1518 - [`read_from_path`][toolbox_pyspark.io.read_from_path] 

1519 - [`read_from_table`][toolbox_pyspark.io.read_from_table] 

1520 - [`load`][toolbox_pyspark.io.load] 

1521 """ 

1522 

1523 if method == "table": 

1524 return read_from_table( 

1525 spark_session=spark_session, 

1526 name=name, 

1527 schema=schema, 

1528 data_format=data_format, 

1529 read_options=read_options, 

1530 ) 

1531 if method == "path": 

1532 return read_from_path( 

1533 spark_session=spark_session, 

1534 name=name, 

1535 path=path, 

1536 data_format=data_format, 

1537 read_options=read_options, 

1538 ) 

1539 

1540 

1541## --------------------------------------------------------------------------- # 

1542## Write #### 

1543## --------------------------------------------------------------------------- # 

1544 

1545 

1546@typechecked 

1547def write( 

1548 data_frame: psDataFrame, 

1549 name: str, 

1550 method: Literal["table", "path"], 

1551 path: Optional[str] = None, 

1552 schema: Optional[str] = None, 

1553 data_format: Optional[SPARK_FORMATS] = "parquet", 

1554 mode: Optional[WRITE_MODES] = None, 

1555 write_options: Optional[str_dict] = None, 

1556 partition_cols: Optional[str_collection] = None, 

1557) -> None: 

1558 """ 

1559 !!! note "Summary" 

1560 Write a dataframe to a specified `path` or `schema` and `name` with format `data_format`. 

1561 

1562 ???+ abstract "Details" 

1563 This function serves as a unified interface for writing data from a `pyspark` dataframe. Depending on the `method` parameter, it will either write to a file path or a table. 

1564 

1565 - If `method` is `#!py "path"`, the function will use the `write_to_path` function to write the data to the specified `path` and `name`. 

1566 - If `method` is `#!py "table"`, the function will use the `write_to_table` function to write the data to the specified `schema` and `name`. 

1567 

1568 Params: 

1569 data_frame (psDataFrame): 

1570 The DataFrame to be written. Must be a valid `pyspark` DataFrame ([`pyspark.sql.DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)). 

1571 name (str): 

1572 The name of the table or file where it will be written. 

1573 method (Literal["table", "path"]): 

1574 The method to use for writing the data. Either `#!py "table"` or `#!py "path"`. 

1575 path (Optional[str], optional): 

1576 The path location for where to save the table. Required if `method` is `#!py "path"`.<br> 

1577 Defaults to `#!py None`. 

1578 schema (Optional[str], optional): 

1579 The schema of the table where it will be written. Required if `method` is `#!py "table"`.<br> 

1580 Defaults to `#!py None`. 

1581 data_format (Optional[SPARK_FORMATS], optional): 

1582 The format that the `data_frame` will be written to.<br> 

1583 Defaults to `#!py "parquet"`. 

1584 mode (Optional[WRITE_MODES], optional): 

1585 The behaviour for when the data already exists.<br> 

1586 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br> 

1587 Defaults to `#!py None`. 

1588        write_options (Optional[str_dict], optional): 

1589            Any additional settings to pass to the writer class.<br> 

1590 Like, for example: 

1591 

1592 - If you are writing to a Delta object, and wanted to overwrite the schema: `#!py {"overwriteSchema": "true"}`. 

1593 - If you're writing to a CSV file, and wanted to specify the header row: `#!py {"header": "true"}`. 

1594 

1595 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br> 

1596            Defaults to `#!py None`. 

1597        partition_cols (Optional[str_collection], optional): 

1598 The column(s) that the table should partition by.<br> 

1599 Defaults to `#!py None`. 

1600 

1601 Raises: 

1602 TypeError: 

1603            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

1604 ValidationError: 

1605 If `name` contains `/`, or is structured with three elements like: `source.schema.table`. 

1606 

1607 Returns: 

1608 (type(None)): 

1609 Nothing is returned. 

1610 

1611 ???+ tip "Note" 

1612 You know that this function is successful if the table or file exists at the specified location, and there are no errors thrown. 

1613 

1614 ???+ example "Examples" 

1615 

1616 ```{.py .python linenums="1" title="Set up"} 

1617 >>> # Imports 

1618 >>> import pandas as pd 

1619 >>> from pyspark.sql import SparkSession 

1620 >>> from toolbox_pyspark.io import write 

1621 >>> from toolbox_pyspark.checks import table_exists 

1622 >>> 

1623 >>> # Instantiate Spark 

1624 >>> spark = SparkSession.builder.getOrCreate() 

1625 >>> 

1626 >>> # Create data 

1627 >>> df = spark.createDataFrame( 

1628 ... pd.DataFrame( 

1629 ... { 

1630 ... "a": [1, 2, 3, 4], 

1631 ... "b": ["a", "b", "c", "d"], 

1632 ... "c": [1, 1, 1, 1], 

1633 ... "d": ["2", "2", "2", "2"], 

1634 ... } 

1635 ... ) 

1636 ... ) 

1637 ``` 

1638 

1639 ```{.py .python linenums="1" title="Check"} 

1640 >>> df.show() 

1641 ``` 

1642 <div class="result" markdown> 

1643 ```{.txt .text title="Terminal"} 

1644 +---+---+---+---+ 

1645 | a | b | c | d | 

1646 +---+---+---+---+ 

1647 | 1 | a | 1 | 2 | 

1648 | 2 | b | 1 | 2 | 

1649 | 3 | c | 1 | 2 | 

1650 | 4 | d | 1 | 2 | 

1651 +---+---+---+---+ 

1652 ``` 

1653 </div> 

1654 

1655 ```{.py .python linenums="1" title="Example 1: Write to Path"} 

1656 >>> write( 

1657 ... data_frame=df, 

1658 ... name="df.csv", 

1659 ... method="path", 

1660 ... path="./test", 

1661 ... data_format="csv", 

1662 ... mode="overwrite", 

1663 ... write_options={"header": "true"}, 

1664 ... ) 

1665 >>> 

1666 >>> table_exists( 

1667 ... name="df.csv", 

1668 ... path="./test", 

1669 ... data_format="csv", 

1670 ... spark_session=df.sparkSession, 

1671 ... ) 

1672 ``` 

1673 <div class="result" markdown> 

1674 ```{.sh .shell title="Terminal"} 

1675 True 

1676 ``` 

1677 !!! success "Conclusion: Successfully written to path." 

1678 </div> 

1679 

1680 ```{.py .python linenums="1" title="Example 2: Write to Table"} 

1681 >>> write( 

1682 ... data_frame=df, 

1683 ... name="test_table", 

1684 ... method="table", 

1685 ... schema="default", 

1686 ... data_format="parquet", 

1687 ... mode="overwrite", 

1688 ... ) 

1689 >>> 

1690 >>> table_exists( 

1691 ... name="test_table", 

1692 ... schema="default", 

1693 ... data_format="parquet", 

1694 ... spark_session=df.sparkSession, 

1695 ... ) 

1696 ``` 

1697 <div class="result" markdown> 

1698 ```{.sh .shell title="Terminal"} 

1699 True 

1700 ``` 

1701 !!! success "Conclusion: Successfully written to table." 

1702 </div> 

1703 

1704 ```{.py .python linenums="1" title="Example 3: Invalid Path"} 

1705 >>> write( 

1706 ... data_frame=df, 

1707 ... name="df.csv", 

1708 ... method="path", 

1709 ... path="./invalid_path", 

1710 ... data_format="csv", 

1711 ... mode="overwrite", 

1712 ... write_options={"header": "true"}, 

1713 ... ) 

1714 ``` 

1715 <div class="result" markdown> 

1716 ```{.txt .text title="Terminal"} 

1717 Py4JJavaError: An error occurred while calling o45.save. 

1718 ``` 

1719 !!! failure "Conclusion: Failed to write to invalid path." 

1720 </div> 

1721 

1722 ```{.py .python linenums="1" title="Example 4: Invalid Table Structure"} 

1723 >>> write( 

1724 ... data_frame=df, 

1725 ... name="schema.test_table", 

1726 ... method="table", 

1727 ... schema="source", 

1728 ... data_format="parquet", 

1729 ... mode="overwrite", 

1730 ... ) 

1731 ``` 

1732 <div class="result" markdown> 

1733 ```{.txt .text title="Terminal"} 

1734 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`. 

1735 ``` 

1736 !!! failure "Conclusion: Failed to write to table due to invalid table structure." 

1737 </div> 

1738 
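        For a partitioned write, `partition_cols` can be combined with either method. The following is an illustrative sketch, reusing the same `df` from the set-up above:

        ```{.py .python linenums="1" title="Example 5: Write to Path with Partitioning"}
        >>> write(
        ...     data_frame=df,
        ...     name="df_partitioned",
        ...     method="path",
        ...     path="./test",
        ...     data_format="parquet",
        ...     mode="overwrite",
        ...     partition_cols=["c"],
        ... )
        >>>
        >>> table_exists(
        ...     name="df_partitioned",
        ...     path="./test",
        ...     data_format="parquet",
        ...     spark_session=df.sparkSession,
        ... )
        ```
        <div class="result" markdown>
        ```{.sh .shell title="Terminal"}
        True
        ```
        !!! success "Conclusion: Successfully written to path, partitioned by column `c`."
        </div>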

1739 ??? tip "See Also" 

1740 - [`write_to_path`][toolbox_pyspark.io.write_to_path] 

1741 - [`write_to_table`][toolbox_pyspark.io.write_to_table] 

1742 - [`save`][toolbox_pyspark.io.save] 

1743 """ 

1744 
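    # Dispatch on `method`: write to a registered table (`schema`.`name`) via
    # `write_to_table`, or to a directory (`path`/`name`) via `write_to_path`.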

1745 if method == "table": 

1746 write_to_table( 

1747 data_frame=data_frame, 

1748 name=name, 

1749 schema=schema, 

1750 data_format=data_format, 

1751 mode=mode, 

1752 write_options=write_options, 

1753 partition_cols=partition_cols, 

1754 ) 

1755 if method == "path": 

1756 write_to_path( 

1757 data_frame=data_frame, 

1758 name=name, 

1759 path=path, 

1760 data_format=data_format, 

1761 mode=mode, 

1762 write_options=write_options, 

1763 partition_cols=partition_cols, 

1764 ) 

1765 

1766 

1767## --------------------------------------------------------------------------- # 

1768## Transfer #### 

1769## --------------------------------------------------------------------------- # 

1770 

1771 

1772@typechecked 

1773def transfer( 

1774 spark_session: SparkSession, 

1775 from_table_name: str, 

1776 to_table_name: str, 

1777 method: Literal["table", "path"], 

1778 from_table_path: Optional[str] = None, 

1779 from_table_schema: Optional[str] = None, 

1780 from_table_format: Optional[SPARK_FORMATS] = "parquet", 

1781 from_table_options: Optional[str_dict] = None, 

1782 to_table_path: Optional[str] = None, 

1783 to_table_schema: Optional[str] = None, 

1784 to_table_format: Optional[SPARK_FORMATS] = "parquet", 

1785 to_table_mode: Optional[WRITE_MODES] = None, 

1786 to_table_options: Optional[str_dict] = None, 

1787 to_partition_cols: Optional[str_collection] = None, 

1788) -> None: 

1789 """ 

1790 !!! note "Summary" 

1791 Transfer a table or file from one location to another. 

1792 

1793 ???+ abstract "Details" 

1794 This function serves as a unified interface for transferring data from one location to another. Depending on the `method` parameter, it will either transfer from a file path or a table. 

1795 

1796 - If `method` is `#!py "path"`, the function will use the `transfer_by_path` function to transfer the data from the specified `from_table_path` and `from_table_name` to the specified `to_table_path` and `to_table_name`. 

1797 - If `method` is `#!py "table"`, the function will use the `transfer_by_table` function to transfer the data from the specified `from_table_schema` and `from_table_name` to the specified `to_table_schema` and `to_table_name`. 

1798 

1799 Params: 

1800 spark_session (SparkSession): 

1801 The Spark session to use for the transfer. 

1802 from_table_name (str): 

1803 The name of the table or file to be transferred. 

1804 to_table_name (str): 

1805 The name of the table or file where it will be transferred. 

1806 method (Literal["table", "path"]): 

1807 The method to use for transferring the data. Either `#!py "table"` or `#!py "path"`. 

1808 from_table_path (Optional[str], optional): 

1809 The path from which the file will be transferred. Required if `method` is `#!py "path"`.<br> 

1810 Defaults to `#!py None`. 

1811 from_table_schema (Optional[str], optional): 

1812 The schema of the table to be transferred. Required if `method` is `#!py "table"`.<br> 

1813 Defaults to `#!py None`. 

1814 from_table_format (Optional[SPARK_FORMATS], optional): 

1815 The format of the data at the source location.<br> 

1816 Defaults to `#!py "parquet"`. 

1817        from_table_options (Optional[str_dict], optional): 

1818            Any additional options to pass to the Spark reader.<br> 

1819 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br> 

1820            Defaults to `#!py None`. 

1821 to_table_path (Optional[str], optional): 

1822 The path location for where to save the table. Required if `method` is `#!py "path"`.<br> 

1823 Defaults to `#!py None`. 

1824 to_table_schema (Optional[str], optional): 

1825 The schema of the table where it will be saved. Required if `method` is `#!py "table"`.<br> 

1826 Defaults to `#!py None`. 

1827 to_table_format (Optional[SPARK_FORMATS], optional): 

1828 The format of the saved table.<br> 

1829 Defaults to `#!py "parquet"`. 

1830 to_table_mode (Optional[WRITE_MODES], optional): 

1831 The behaviour for when the data already exists.<br> 

1832 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br> 

1833 Defaults to `#!py None`. 

1834        to_table_options (Optional[str_dict], optional): 

1835            Any additional settings to pass to the writer class.<br> 

1836 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br> 

1837            Defaults to `#!py None`. 

1838        to_partition_cols (Optional[str_collection], optional): 

1839 The column(s) that the table should partition by.<br> 

1840 Defaults to `#!py None`. 

1841 

1842 Raises: 

1843 TypeError: 

1844            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator. 

1845 ValidationError: 

1846 If `from_table_name` or `to_table_name` contains `/`, or is structured with three elements like: `source.schema.table`. 

1847 

1848 Returns: 

1849 (type(None)): 

1850 Nothing is returned. 

1851 

1852 ???+ tip "Note" 

1853 You know that this function is successful if the table or file exists at the specified location, and there are no errors thrown. 

1854 

1855 ???+ example "Examples" 

1856 

1857 ```{.py .python linenums="1" title="Set up"} 

1858 >>> # Imports 

1859 >>> import pandas as pd 

1860 >>> from pyspark.sql import SparkSession 

1861 >>> from toolbox_pyspark.io import transfer 

1862 >>> from toolbox_pyspark.checks import table_exists 

1863 >>> 

1864 >>> # Instantiate Spark 

1865 >>> spark = SparkSession.builder.getOrCreate() 

1866 >>> 

1867 >>> # Create data 

1868 >>> df = pd.DataFrame( 

1869 ... { 

1870 ... "a": [1, 2, 3, 4], 

1871 ... "b": ["a", "b", "c", "d"], 

1872 ... "c": [1, 1, 1, 1], 

1873 ... "d": ["2", "2", "2", "2"], 

1874 ... } 

1875 ... ) 

1876 >>> df.to_csv("./test/table.csv") 

1877 >>> df.to_parquet("./test/table.parquet") 

1878 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table") 

1879 ``` 

1880 

1881 ```{.py .python linenums="1" title="Example 1: Transfer from Path"} 

1882 >>> transfer( 

1883 ... spark_session=spark, 

1884 ... method="path", 

1885 ... from_table_name="table.csv", 

1886 ... from_table_path="./test", 

1887 ... from_table_format="csv", 

1888 ... from_table_options={"header": "true"}, 

1889 ... to_table_name="new_table.csv", 

1890 ... to_table_path="./other", 

1891 ... to_table_format="csv", 

1892 ... to_table_mode="overwrite", 

1893 ... to_table_options={"header": "true"}, 

1894 ... ) 

1895 >>> 

1896 >>> table_exists( 

1897 ... name="new_table.csv", 

1898 ... path="./other", 

1899 ... data_format="csv", 

1900 ... spark_session=spark, 

1901 ... ) 

1902 ``` 

1903 <div class="result" markdown> 

1904 ```{.sh .shell title="Terminal"} 

1905 True 

1906 ``` 

1907 !!! success "Conclusion: Successfully transferred from path." 

1908 </div> 

1909 

1910 ```{.py .python linenums="1" title="Example 2: Transfer from Table"} 

1911 >>> transfer( 

1912 ... spark_session=spark, 

1913 ... method="table", 

1914 ... from_table_name="test_table", 

1915 ... from_table_schema="default", 

1916 ... from_table_format="parquet", 

1917 ... to_table_name="new_table", 

1918 ... to_table_schema="default", 

1919 ... to_table_format="parquet", 

1920 ... to_table_mode="overwrite", 

1921 ... ) 

1922 >>> 

1923 >>> table_exists( 

1924 ... name="new_table", 

1925 ... schema="default", 

1926 ... data_format="parquet", 

1927 ... spark_session=spark, 

1928 ... ) 

1929 ``` 

1930 <div class="result" markdown> 

1931 ```{.sh .shell title="Terminal"} 

1932 True 

1933 ``` 

1934 !!! success "Conclusion: Successfully transferred from table." 

1935 </div> 

1936 

1937 ```{.py .python linenums="1" title="Example 3: Invalid Path"} 

1938 >>> transfer( 

1939 ... spark_session=spark, 

1940 ... method="path", 

1941 ... from_table_name="table.csv", 

1942 ... from_table_path="./invalid_path", 

1943 ... from_table_format="csv", 

1944 ... from_table_options={"header": "true"}, 

1945 ... to_table_name="new_table.csv", 

1946 ... to_table_path="./other", 

1947 ... to_table_format="csv", 

1948 ... to_table_mode="overwrite", 

1949 ... to_table_options={"header": "true"}, 

1950 ... ) 

1951 ``` 

1952 <div class="result" markdown> 

1953 ```{.txt .text title="Terminal"} 

1954 Py4JJavaError: An error occurred while calling o45.load. 

1955 ``` 

1956 !!! failure "Conclusion: Failed to transfer from invalid path." 

1957 </div> 

1958 

1959 ```{.py .python linenums="1" title="Example 4: Invalid Table Structure"} 

1960 >>> transfer( 

1961 ... spark_session=spark, 

1962 ... method="table", 

1963 ... from_table_name="schema.test_table", 

1964 ... from_table_schema="source", 

1965 ... from_table_format="parquet", 

1966 ... to_table_name="new_table", 

1967 ... to_table_schema="default", 

1968 ... to_table_format="parquet", 

1969 ... to_table_mode="overwrite", 

1970 ... ) 

1971 ``` 

1972 <div class="result" markdown> 

1973 ```{.txt .text title="Terminal"} 

1974 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`. 

1975 ``` 

1976 !!! failure "Conclusion: Failed to transfer from invalid table structure." 

1977 </div> 

1978 

1979 ??? tip "See Also" 

1980 - [`transfer_by_path`][toolbox_pyspark.io.transfer_by_path] 

1981 - [`transfer_by_table`][toolbox_pyspark.io.transfer_by_table] 

1982 """ 

1983 
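    # Dispatch on `method`: transfer between registered tables via `transfer_by_table`,
    # or between directory locations via `transfer_by_path`.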

1984 if method == "table": 

1985 transfer_by_table( 

1986 spark_session=spark_session, 

1987 from_table_name=from_table_name, 

1988 to_table_name=to_table_name, 

1989 from_table_schema=from_table_schema, 

1990 from_table_format=from_table_format, 

1991 from_table_options=from_table_options, 

1992 to_table_schema=to_table_schema, 

1993 to_table_format=to_table_format, 

1994 to_table_mode=to_table_mode, 

1995 to_table_options=to_table_options, 

1996 to_table_partition_cols=to_partition_cols, 

1997 ) 

1998 if method == "path": 

1999 transfer_by_path( 

2000 spark_session=spark_session, 

2001 from_table_path=from_table_path, 

2002 from_table_name=from_table_name, 

2003 from_table_format=from_table_format, 

2004 to_table_path=to_table_path, 

2005 to_table_name=to_table_name, 

2006 to_table_format=to_table_format, 

2007 from_table_options=from_table_options, 

2008 to_table_mode=to_table_mode, 

2009 to_table_options=to_table_options, 

2010 to_table_partition_cols=to_partition_cols, 

2011 ) 

2012 

2013 

2014# ---------------------------------------------------------------------------- # 

2015# # 

2016# Aliases #### 

2017# # 

2018# ---------------------------------------------------------------------------- # 

2019 

2020load_from_path = read_from_path 

2021save_to_path = write_to_path 

2022load_from_table = read_from_table 

2023save_to_table = write_to_table 

2024load = read 

2025save = write
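# The aliases above are plain name bindings, so `load`/`save` (and the other
# `load_*`/`save_*` names) accept exactly the same arguments as the `read`/`write`
# functions they point to. A minimal, illustrative sketch (not executed at import
# time; assumes an active Spark session `spark`, a dataframe `df`, and a local
# `./test` directory):
#
#     from toolbox_pyspark.io import load, save
#
#     save(
#         data_frame=df,
#         name="alias_demo.csv",
#         method="path",
#         path="./test",
#         data_format="csv",
#         mode="overwrite",
#         write_options={"header": "true"},
#     )
#     df_again = load(
#         spark_session=spark,
#         name="alias_demo.csv",
#         method="path",
#         path="./test",
#         data_format="csv",
#         read_options={"header": "true"},
#     )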