Coverage for src/toolbox_pyspark/io.py: 100%
96 statements
coverage.py v7.6.10, created at 2025-01-25 23:08 +0000
1# ============================================================================ #
2# #
3# Title : IO #
4# Purpose : Read and write tables to/from directories. #
5# #
6# ============================================================================ #
9# ---------------------------------------------------------------------------- #
10# #
11# Overview ####
12# #
13# ---------------------------------------------------------------------------- #
16# ---------------------------------------------------------------------------- #
17# Description ####
18# ---------------------------------------------------------------------------- #
21"""
22!!! note "Summary"
23 The `io` module is used for reading and writing tables to/from directories.
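
???+ example "Example"
    A minimal quick-start sketch (hedged; the `./data` directory and the sample dataframe are purely illustrative):

    ```{.py .python linenums="1" title="Round trip"}
    >>> from pyspark.sql import SparkSession
    >>> from toolbox_pyspark.io import read_from_path, write_to_path
    >>>
    >>> spark = SparkSession.builder.getOrCreate()
    >>> df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
    >>>
    >>> # Write out as CSV with a header row, then read it straight back in
    >>> write_to_path(df, name="demo.csv", path="./data", data_format="csv", mode="overwrite", write_options={"header": "true"})
    >>> df_again = read_from_path(spark, name="demo.csv", path="./data", data_format="csv", read_options={"header": "true"})
    ```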
24"""
27# ---------------------------------------------------------------------------- #
28# #
29# Setup ####
30# #
31# ---------------------------------------------------------------------------- #
34# ---------------------------------------------------------------------------- #
35# Imports ####
36# ---------------------------------------------------------------------------- #
39# ## Python StdLib Imports ----
40from typing import Literal, Optional, get_args
42# ## Python Third Party Imports ----
43from pyspark.sql import DataFrame as psDataFrame, SparkSession
44from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
45from toolbox_python.checkers import is_type
46from toolbox_python.collection_types import (
47 str_collection,
48 str_dict,
49 str_list,
50 str_tuple,
51)
52from typeguard import typechecked
54# ## Local First Party Imports ----
55from toolbox_pyspark.utils.exceptions import ValidationError
58# ---------------------------------------------------------------------------- #
59# Exports ####
60# ---------------------------------------------------------------------------- #
63__all__: str_list = [
64 "SPARK_FORMATS",
65 "VALID_SPARK_FORMATS",
66 "WRITE_MODES",
67 "VALID_WRITE_MODES",
68 "read_from_path",
69 "write_to_path",
70 "transfer_by_path",
71 "read_from_table",
72 "write_to_table",
73 "transfer_by_table",
74 "read",
75 "write",
76 "transfer",
77 "load_from_path",
78 "save_to_path",
79 "load_from_table",
80 "save_to_table",
81 "load",
82 "save",
83]
86## --------------------------------------------------------------------------- #
87## Constants ####
88## --------------------------------------------------------------------------- #
91### Data formats ----
92SPARK_FORMATS = Literal[
93 # Built-in formats
94 "parquet",
95 "orc",
96 "json",
97 "csv",
98 "text",
99 "avro",
100 # Database formats (requires JDBC drivers)
101 "jdbc",
102 "oracle",
103 "mysql",
104 "postgresql",
105 "mssql",
106 "db2",
107 # Other formats (requires dependencies)
108 "delta", # <-- Requires: `io.delta:delta-core` dependency and `delta-spark` package
109 "xml", # <-- Requires: `com.databricks:spark-xml` dependency and `spark-xml` package
110 "excel", # <-- Requires: `com.crealytics:spark-excel` dependency and `spark-excel` package
111 "hive", # <-- Requires: Hive support
112 "mongodb", # <-- Requires: `org.mongodb.spark:mongo-spark-connector` dependency and `mongo-spark-connector` package
113 "cassandra", # <-- Requires: `com.datastax.spark:spark-cassandra-connector` dependency and `spark-cassandra-connector` package
114 "elasticsearch", # <-- Requires: `org.elasticsearch:elasticsearch-hadoop` dependency and `elasticsearch-hadoop` package
115]
116"""
117The valid formats that can be used to read/write data in Spark.
119PySpark's built-in data source formats:
121- `parquet`
122- `orc`
123- `json`
124- `csv`
125- `text`
126- `avro`
128Database formats (with proper JDBC drivers):
130- `jdbc`
131- `oracle`
132- `mysql`
133- `postgresql`
134- `mssql`
135- `db2`
137Other formats with additional dependencies:
139- `delta` (requires: `io.delta:delta-core` dependency and `delta-spark` package)
140- `xml` (requires: `com.databricks:spark-xml` dependency and `spark-xml` package)
141- `excel` (requires: `com.crealytics:spark-excel` dependency and `spark-excel` package)
142- `hive` (requires: Hive support)
143- `mongodb` (requires: `org.mongodb.spark:mongo-spark-connector` dependency and `mongo-spark-connector` package)
144- `cassandra` (requires: `com.datastax.spark:spark-cassandra-connector` dependency and `spark-cassandra-connector` package)
145- `elasticsearch` (requires: `org.elasticsearch:elasticsearch-hadoop` dependency and `elasticsearch-hadoop` package)
146"""
148VALID_SPARK_FORMATS: str_tuple = get_args(SPARK_FORMATS)
149"""
150The valid formats that can be used to read/write data in Spark.
152PySpark's built-in data source formats:
154- `parquet`
155- `orc`
156- `json`
157- `csv`
158- `text`
159- `avro`
161Database formats (with proper JDBC drivers):
163- `jdbc`
164- `oracle`
165- `mysql`
166- `postgresql`
167- `mssql`
168- `db2`
170Other formats with additional dependencies:
172- `delta` (requires: `io.delta:delta-core` dependency and `delta-spark` package)
173- `xml` (requires: `com.databricks:spark-xml` dependency and `spark-xml` package)
174- `excel` (requires: `com.crealytics:spark-excel` dependency and `spark-excel` package)
175- `hive` (requires: Hive support)
176- `mongodb` (requires: `org.mongodb.spark:mongo-spark-connector` dependency and `mongo-spark-connector` package)
177- `cassandra` (requires: `com.datastax.spark:spark-cassandra-connector` dependency and `spark-cassandra-connector` package)
178- `elasticsearch` (requires: `org.elasticsearch:elasticsearch-hadoop` dependency and `elasticsearch-hadoop` package)
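
A small, hedged sketch of how this tuple can be used as a runtime guard before handing a format string to Spark (the `user_format` variable is purely illustrative):

```{.py .python linenums="1" title="Example"}
>>> from toolbox_pyspark.io import VALID_SPARK_FORMATS
>>>
>>> user_format = "parquet"
>>> if user_format not in VALID_SPARK_FORMATS:
...     raise ValueError(f"Invalid format: '{user_format}'. Must be one of: {VALID_SPARK_FORMATS}")
```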
179"""
182### Write modes ----
183WRITE_MODES = Literal["append", "overwrite", "ignore", "error", "errorifexists"]
184"""
185The valid modes you can use for writing data frames:
187- `append`
188- `overwrite`
189- `ignore`
190- `error`
191- `errorifexists`
192"""
194VALID_WRITE_MODES: str_tuple = get_args(WRITE_MODES)
195"""
196The valid modes you can use for writing data frames:
198- `append`
199- `overwrite`
200- `ignore`
201- `error`
202- `errorifexists`
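
As with the data formats above, this tuple allows a cheap membership check before writing (a hedged sketch; the `mode` variable is purely illustrative):

```{.py .python linenums="1" title="Example"}
>>> from toolbox_pyspark.io import VALID_WRITE_MODES
>>>
>>> mode = "overwrite"
>>> assert mode in VALID_WRITE_MODES, f"Invalid write mode: '{mode}'"
```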
203"""
206# ---------------------------------------------------------------------------- #
207# #
208# Path functions ####
209# #
210# ---------------------------------------------------------------------------- #
213# ---------------------------------------------------------------------------- #
214# Read ####
215# ---------------------------------------------------------------------------- #
218@typechecked
219def read_from_path(
220 spark_session: SparkSession,
221 name: str,
222 path: str,
223 data_format: Optional[SPARK_FORMATS] = "parquet",
224 read_options: Optional[str_dict] = None,
225) -> psDataFrame:
226 """
227 !!! note "Summary"
228 Read an object from a given `path` in to memory as a `pyspark` dataframe.
230 Params:
231 spark_session (SparkSession):
232 The Spark session to use for the reading.
233 name (str):
234 The name of the table to read in.
235 path (str):
236 The path from which it will be read.
237 data_format (Optional[SPARK_FORMATS], optional):
238 The format of the object at location `path`.<br>
239 Defaults to `#!py "parquet"`.
240 read_options (Dict[str, str], optional):
241 Any additional options to pass to the Spark reader.<br>
242 For example:<br>
244 - If the object is a CSV, you may want to define that it has a header row: `#!py {"header": "true"}`.
245 - If the object is a Delta table, you may want to query a specific version: `#!py {"versionAsOf": "0"}`.
247 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br>
248 Defaults to `#!py dict()`.
250 Raises:
251 TypeError:
252 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
254 Returns:
255 (psDataFrame):
256 The loaded dataframe.
258 ???+ example "Examples"
260 ```{.py .python linenums="1" title="Set up"}
261 >>> # Imports
262 >>> import pandas as pd
263 >>> from pyspark.sql import SparkSession
264 >>> from toolbox_pyspark.io import read_from_path
265 >>>
266 >>> # Instantiate Spark
267 >>> spark = SparkSession.builder.getOrCreate()
268 >>>
269 >>> # Create data
270 >>> df = pd.DataFrame(
271 ... {
272 ... "a": [1, 2, 3, 4],
273 ... "b": ["a", "b", "c", "d"],
274 ... "c": [1, 1, 1, 1],
275 ... "d": ["2", "2", "2", "2"],
276 ... }
277 ... )
278 >>>
279 >>> # Write data
280 >>> df.to_csv("./test/table.csv")
281 >>> df.to_parquet("./test/table.parquet")
282 ```
284 ```{.py .python linenums="1" title="Check"}
285 >>> import os
286 >>> print(os.listdir("./test"))
287 ```
288 <div class="result" markdown>
289 ```{.sh .shell title="Terminal"}
290 ["table.csv", "table.parquet"]
291 ```
292 </div>
294 ```{.py .python linenums="1" title="Example 1: Read CSV"}
295 >>> df_csv = read_from_path(
296 ... name="table.csv",
297 ... path="./test",
298 ... spark_session=spark,
299 ... data_format="csv",
300 ... read_options={"header": "true"},
301 ... )
302 >>>
303 >>> df_csv.show()
304 ```
305 <div class="result" markdown>
306 ```{.txt .text title="Terminal"}
307 +---+---+---+---+
308 | a | b | c | d |
309 +---+---+---+---+
310 | 1 | a | 1 | 2 |
311 | 2 | b | 1 | 2 |
312 | 3 | c | 1 | 2 |
313 | 4 | d | 1 | 2 |
314 +---+---+---+---+
315 ```
316 !!! success "Conclusion: Successfully read CSV."
317 </div>
319 ```{.py .python linenums="1" title="Example 2: Read Parquet"}
320 >>> df_parquet = read_from_path(
321 ... name="table.parquet",
322 ... path="./test",
323 ... spark_session=spark,
324 ... data_format="parquet",
325 ... )
326 >>>
327 >>> df_parquet.show()
328 ```
329 <div class="result" markdown>
330 ```{.txt .text title="Terminal"}
331 +---+---+---+---+
332 | a | b | c | d |
333 +---+---+---+---+
334 | 1 | a | 1 | 2 |
335 | 2 | b | 1 | 2 |
336 | 3 | c | 1 | 2 |
337 | 4 | d | 1 | 2 |
338 +---+---+---+---+
339 ```
340 !!! success "Conclusion: Successfully read Parquet."
341 </div>
343 ```{.py .python linenums="1" title="Example 3: Invalid Path"}
344 >>> df_invalid_path = read_from_path(
345 ... name="invalid_table.csv",
346 ... path="./invalid_path",
347 ... spark_session=spark,
348 ... data_format="csv",
349 ... read_options={"header": "true"},
350 ... )
351 ```
352 <div class="result" markdown>
353 ```{.txt .text title="Terminal"}
354 Py4JJavaError: An error occurred while calling o45.load.
355 ```
356 !!! failure "Conclusion: Failed to read from invalid path."
357 </div>
359 ```{.py .python linenums="1" title="Example 4: Invalid Format"}
360 >>> df_invalid_format = read_from_path(
361 ... name="table.csv",
362 ... path="./test",
363 ... spark_session=spark,
364 ... data_format="invalid_format",
365 ... read_options={"header": "true"},
366 ... )
367 ```
368 <div class="result" markdown>
369 ```{.txt .text title="Terminal"}
370 Py4JJavaError: An error occurred while calling o45.load.
371 ```
372 !!! failure "Conclusion: Failed to read due to invalid format."
373 </div>
375 ??? tip "See Also"
376 - [`load_from_path`][toolbox_pyspark.io.load_from_path]
377 - [`read`][toolbox_pyspark.io.read]
378 - [`load`][toolbox_pyspark.io.load]
379 """
381 # Set default options ----
382 read_options: str_dict = read_options or dict()
383 data_format: str = data_format or "parquet"
384 load_path: str = f"{path}{'/' if not path.endswith('/') else ''}{name}"
386 # Initialise reader (including data format) ----
387 reader: DataFrameReader = spark_session.read.format(data_format)
389 # Add options (if exists) ----
390 if read_options:
391 reader.options(**read_options)
393 # Load DataFrame ----
394 return reader.load(load_path)
397## --------------------------------------------------------------------------- #
398## Write ####
399## --------------------------------------------------------------------------- #
402@typechecked
403def write_to_path(
404 data_frame: psDataFrame,
405 name: str,
406 path: str,
407 data_format: Optional[SPARK_FORMATS] = "parquet",
408 mode: Optional[WRITE_MODES] = None,
409 write_options: Optional[str_dict] = None,
410 partition_cols: Optional[str_collection] = None,
411) -> None:
412 """
413 !!! note "Summary"
414 For a given `data_frame`, write it out to a specified `path` with name `name` and format `data_format`.
416 Params:
417 data_frame (psDataFrame):
418 The DataFrame to be written. Must be a valid `pyspark` DataFrame ([`pyspark.sql.DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)).
419 name (str):
420 The name of the table where it will be written.
421 path (str):
422 The path location for where to save the table.
423 data_format (Optional[SPARK_FORMATS], optional):
424 The format that the `table` will be written to.<br>
425 Defaults to `#!py "parquet"`.
426 mode (Optional[WRITE_MODES], optional):
427 The behaviour for when the data already exists.<br>
428 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br>
429 Defaults to `#!py None`.
430 write_options (Dict[str, str], optional):
431 Any additional settings to pass to the writer class.<br>
432 For example:
434 - If you are writing to a Delta object, and wanted to overwrite the schema: `#!py {"overwriteSchema": "true"}`.
435 - If you're writing to a CSV file, and wanted to specify the header row: `#!py {"header": "true"}`.
437 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br>
438 Defaults to `#!py dict()`.
439 partition_cols (Optional[Union[str_collection, str]], optional):
440 The column(s) that the table should partition by.<br>
441 Defaults to `#!py None`.
443 Raises:
444 TypeError:
445 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
447 Returns:
448 (type(None)):
449 Nothing is returned.
451 ???+ tip "Note"
452 You know that this function is successful if the table exists at the specified location, and there are no errors thrown.
454 ???+ example "Examples"
456 ```{.py .python linenums="1" title="Set up"}
457 >>> # Imports
458 >>> import pandas as pd
459 >>> from pyspark.sql import SparkSession
460 >>> from toolbox_pyspark.io import write_to_path
461 >>> from toolbox_pyspark.checks import table_exists
462 >>>
463 >>> # Instantiate Spark
464 >>> spark = SparkSession.builder.getOrCreate()
465 >>>
466 >>> # Create data
467 >>> df = spark.createDataFrame(
468 ... pd.DataFrame(
469 ... {
470 ... "a": [1, 2, 3, 4],
471 ... "b": ["a", "b", "c", "d"],
472 ... "c": [1, 1, 1, 1],
473 ... "d": ["2", "2", "2", "2"],
474 ... }
475 ... )
476 ... )
477 ```
479 ```{.py .python linenums="1" title="Check"}
480 >>> df.show()
481 ```
482 <div class="result" markdown>
483 ```{.txt .text title="Terminal"}
484 +---+---+---+---+
485 | a | b | c | d |
486 +---+---+---+---+
487 | 1 | a | 1 | 2 |
488 | 2 | b | 1 | 2 |
489 | 3 | c | 1 | 2 |
490 | 4 | d | 1 | 2 |
491 +---+---+---+---+
492 ```
493 </div>
495 ```{.py .python linenums="1" title="Example 1: Write to CSV"}
496 >>> write_to_path(
497 ... data_frame=df,
498 ... name="df.csv",
499 ... path="./test",
500 ... data_format="csv",
501 ... mode="overwrite",
502 ... write_options={"header": "true"},
503 ... )
504 >>>
505 >>> table_exists(
506 ... name="df.csv",
507 ... path="./test",
508 ... data_format="csv",
509 ... spark_session=df.sparkSession,
510 ... )
511 ```
512 <div class="result" markdown>
513 ```{.sh .shell title="Terminal"}
514 True
515 ```
516 !!! success "Conclusion: Successfully written to CSV."
517 </div>
519 ```{.py .python linenums="1" title="Example 2: Write to Parquet"}
520 >>> write_to_path(
521 ... data_frame=df,
522 ... name="df.parquet",
523 ... path="./test",
524 ... data_format="parquet",
525 ... mode="overwrite",
526 ... )
527 >>>
528 >>> table_exists(
529 ... name="df.parquet",
530 ... path="./test",
531 ... data_format="parquet",
532 ... spark_session=df.sparkSession,
533 ... )
534 ```
535 <div class="result" markdown>
536 ```{.sh .shell title="Terminal"}
537 True
538 ```
539 !!! success "Conclusion: Successfully written to Parquet."
540 </div>
542 ```{.py .python linenums="1" title="Example 3: Invalid Path"}
543 >>> write_to_path(
544 ... data_frame=df,
545 ... name="df.csv",
546 ... path="./invalid_path",
547 ... data_format="csv",
548 ... mode="overwrite",
549 ... write_options={"header": "true"},
550 ... )
551 ```
552 <div class="result" markdown>
553 ```{.txt .text title="Terminal"}
554 Py4JJavaError: An error occurred while calling o45.save.
555 ```
556 !!! failure "Conclusion: Failed to write to invalid path."
557 </div>
559 ```{.py .python linenums="1" title="Example 4: Invalid Format"}
560 >>> write_to_path(
561 ... data_frame=df,
562 ... name="df.csv",
563 ... path="./test",
564 ... data_format="invalid_format",
565 ... mode="overwrite",
566 ... write_options={"header": "true"},
567 ... )
568 ```
569 <div class="result" markdown>
570 ```{.txt .text title="Terminal"}
571 Py4JJavaError: An error occurred while calling o45.save.
572 ```
573 !!! failure "Conclusion: Failed to write due to invalid format."
574 </div>
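
        The `partition_cols` argument can also be used to write the output partitioned by one or more columns; a hedged sketch (the `c` column comes from the sample data above, and the partitioned subfolders such as `c=1/` are standard Spark behaviour):

        ```{.py .python linenums="1" title="Example 5: Write partitioned Parquet"}
        >>> write_to_path(
        ...     data_frame=df,
        ...     name="df_partitioned.parquet",
        ...     path="./test",
        ...     data_format="parquet",
        ...     mode="overwrite",
        ...     partition_cols=["c"],
        ... )
        ```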
576 ??? tip "See Also"
577 - [`save_to_path`][toolbox_pyspark.io.save_to_path]
578 - [`write`][toolbox_pyspark.io.write]
579 - [`save`][toolbox_pyspark.io.save]
580 """
582 # Set default options ----
583 write_options: str_dict = write_options or dict()
584 data_format: str = data_format or "parquet"
585 write_path: str = f"{path}{'/' if not path.endswith('/') else ''}{name}"
587 # Initialise writer (including data format) ----
588 writer: DataFrameWriter = data_frame.write.mode(mode).format(data_format)
590 # Add options (if exists) ----
591 if write_options:
592 writer.options(**write_options)
594 # Add partition (if exists) ----
595 if partition_cols is not None:
596 partition_cols = [partition_cols] if is_type(partition_cols, str) else partition_cols
597 writer = writer.partitionBy(list(partition_cols))
599 # Write table ----
600 writer.save(write_path)
603## --------------------------------------------------------------------------- #
604## Transfer ####
605## --------------------------------------------------------------------------- #
608@typechecked
609def transfer_by_path(
610 spark_session: SparkSession,
611 from_table_path: str,
612 from_table_name: str,
613 to_table_path: str,
614 to_table_name: str,
615 from_table_format: Optional[SPARK_FORMATS] = "parquet",
616 from_table_options: Optional[str_dict] = None,
617 to_table_format: Optional[SPARK_FORMATS] = "parquet",
618 to_table_mode: Optional[WRITE_MODES] = None,
619 to_table_options: Optional[str_dict] = None,
620 to_table_partition_cols: Optional[str_collection] = None,
621) -> None:
622 """
623 !!! note "Summary"
624 Copy a table from one location to another.
626 ???+ abstract "Details"
627 This is a blind transfer. There is no validation, no alteration, no adjustments made at all. Simply read directly from one location and move immediately to another location straight away.
629 Params:
630 spark_session (SparkSession):
631 The spark session to use for the transfer. Necessary in order to instantiate the reading process.
632 from_table_path (str):
633 The path from which the table will be read.
634 from_table_name (str):
635 The name of the table to be read.
636 to_table_path (str):
637 The location where to save the table to.
638 to_table_name (str):
639 The name of the table where it will be saved.
640 from_table_format (Optional[SPARK_FORMATS], optional):
641 The format of the data at the reading location.
642 to_table_format (Optional[SPARK_FORMATS], optional):
643 The format of the saved table.
644 from_table_options (Dict[str, str], optional):
645 Any additional options to pass to the Spark reader.<br>
646 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br>
647 Defaults to `#!py dict()`.
648 to_table_mode (Optional[WRITE_MODES], optional):
649 The behaviour for when the data already exists.<br>
650 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br>
651 Defaults to `#!py None`.
652 to_table_options (Dict[str, str], optional):
653 Any additional settings to parse to the writer class.<br>
654 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br>
655 Defaults to `#!py dict()`.
656 to_table_partition_cols (Optional[Union[str_collection, str]], optional):
657 The column(s) that the table should partition by.<br>
658 Defaults to `#!py None`.
660 Raises:
661 TypeError:
662 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
664 Returns:
665 (type(None)):
666 Nothing is returned.
668 ???+ tip "Note"
669 You know that this function is successful if the table exists at the specified location, and there are no errors thrown.
671 ???+ example "Examples"
673 ```{.py .python linenums="1" title="Set up"}
674 >>> # Imports
675 >>> import pandas as pd
676 >>> from pyspark.sql import SparkSession
677 >>> from toolbox_pyspark.io import transfer_by_path
678 >>> from toolbox_pyspark.checks import table_exists
679 >>>
680 >>> # Instantiate Spark
681 >>> spark = SparkSession.builder.getOrCreate()
682 >>>
683 >>> # Create data
684 >>> df = pd.DataFrame(
685 ... {
686 ... "a": [1, 2, 3, 4],
687 ... "b": ["a", "b", "c", "d"],
688 ... "c": [1, 1, 1, 1],
689 ... "d": ["2", "2", "2", "2"],
690 ... }
691 ... )
692 >>> df.to_csv("./test/table.csv")
693 >>> df.to_parquet("./test/table.parquet")
694 ```
696 ```{.py .python linenums="1" title="Check"}
697 >>> import os
698 >>> print(os.listdir("./test"))
699 ```
700 <div class="result" markdown>
701 ```{.sh .shell title="Terminal"}
702 ["table.csv", "table.parquet"]
703 ```
704 </div>
706 ```{.py .python linenums="1" title="Example 1: Transfer CSV"}
707 >>> transfer_by_path(
708 ... spark_session=spark,
709 ... from_table_path="./test",
710 ... from_table_name="table.csv",
711 ... from_table_format="csv",
712 ... to_table_path="./other",
713 ... to_table_name="table.csv",
714 ... to_table_format="csv",
715 ... from_table_options={"header": "true"},
716 ... to_table_mode="overwrite",
717 ... to_table_options={"header": "true"},
718 ... )
719 >>>
720 >>> table_exists(
721 ... name="table.csv",
722 ... path="./other",
723 ... data_format="csv",
724 ... spark_session=spark,
725 ... )
726 ```
727 <div class="result" markdown>
728 ```{.sh .shell title="Terminal"}
729 True
730 ```
731 !!! success "Conclusion: Successfully transferred CSV to CSV."
732 </div>
734 ```{.py .python linenums="1" title="Example 2: Transfer Parquet"}
735 >>> transfer_by_path(
736 ... spark_session=spark,
737 ... from_table_path="./test",
738 ... from_table_name="table.parquet",
739 ... from_table_format="parquet",
740 ... to_table_path="./other",
741 ... to_table_name="table.parquet",
742 ... to_table_format="parquet",
743 ... to_table_mode="overwrite",
744 ... to_table_options={"overwriteSchema": "true"},
745 ... )
746 >>>
747 >>> table_exists(
748 ... name="table.parquet",
749 ... path="./other",
750 ... data_format="parquet",
751 ... spark_session=spark,
752 ... )
753 ```
754 <div class="result" markdown>
755 ```{.sh .shell title="Terminal"}
756 True
757 ```
758 !!! success "Conclusion: Successfully transferred Parquet to Parquet."
759 </div>
761 ```{.py .python linenums="1" title="Example 3: Transfer CSV to Parquet"}
762 >>> transfer_by_path(
763 ... spark_session=spark,
764 ... from_table_path="./test",
765 ... from_table_name="table.csv",
766 ... from_table_format="csv",
767 ... to_table_path="./other",
768 ... to_table_name="table.parquet",
769 ... to_table_format="parquet",
770 ... to_table_mode="overwrite",
771 ... to_table_options={"overwriteSchema": "true"},
772 ... )
773 >>>
774 >>> table_exists(
775 ... name="table.parquet",
776 ... path="./other",
777 ... data_format="parquet",
778 ... spark_session=spark,
779 ... )
780 ```
781 <div class="result" markdown>
782 ```{.sh .shell title="Terminal"}
783 True
784 ```
785 !!! success "Conclusion: Successfully transferred CSV to Parquet."
786 </div>
788 ```{.py .python linenums="1" title="Example 4: Invalid Source Path"}
789 >>> transfer_by_path(
790 ... spark_session=spark,
791 ... from_table_path="./invalid_path",
792 ... from_table_name="table.csv",
793 ... from_table_format="csv",
794 ... to_table_path="./other",
795 ... to_table_name="table.csv",
796 ... to_table_format="csv",
797 ... from_table_options={"header": "true"},
798 ... to_table_mode="overwrite",
799 ... to_table_options={"header": "true"},
800 ... )
801 ```
802 <div class="result" markdown>
803 ```{.txt .text title="Terminal"}
804 Py4JJavaError: An error occurred while calling o45.load.
805 ```
806 !!! failure "Conclusion: Failed to transfer due to invalid source path."
807 </div>
809 ```{.py .python linenums="1" title="Example 5: Invalid Target Format"}
810 >>> transfer_by_path(
811 ... spark_session=spark,
812 ... from_table_path="./test",
813 ... from_table_name="table.csv",
814 ... from_table_format="csv",
815 ... to_table_path="./other",
816 ... to_table_name="table.csv",
817 ... to_table_format="invalid_format",
818 ... from_table_options={"header": "true"},
819 ... to_table_mode="overwrite",
820 ... to_table_options={"header": "true"},
821 ... )
822 ```
823 <div class="result" markdown>
824 ```{.txt .text title="Terminal"}
825 Py4JJavaError: An error occurred while calling o45.save.
826 ```
827 !!! failure "Conclusion: Failed to transfer due to invalid target format."
828 </div>
830 ??? tip "See Also"
831 - [`transfer`][toolbox_pyspark.io.transfer]
832 """
834 # Read from source ----
835 from_table: psDataFrame = read_from_path(
836 name=from_table_name,
837 path=from_table_path,
838 spark_session=spark_session,
839 data_format=from_table_format,
840 read_options=from_table_options,
841 )
843 # Write to target ----
844 write_to_path(
845 data_frame=from_table,
846 name=to_table_name,
847 path=to_table_path,
848 data_format=to_table_format,
849 mode=to_table_mode,
850 write_options=to_table_options,
851 partition_cols=to_table_partition_cols,
852 )
855# ---------------------------------------------------------------------------- #
856# #
857# Table functions ####
858# #
859# ---------------------------------------------------------------------------- #
862def _validate_table_name(table: str) -> None:
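    """
    !!! note "Summary"
        Check that a table reference is a two-part `schema.table` name containing no `/` characters, raising a `ValidationError` otherwise.

    For example, `#!py "prod.orders"` passes silently, while `#!py "orders"` or `#!py "prod/orders"` raise a `ValidationError`.
    """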
863 if "/" in table:
864 raise ValidationError(f"Invalid table. Cannot contain `/`: `{table}`.")
865 if len(table.split(".")) != 2:
866 raise ValidationError(
867 f"Invalid table. Should be in the format `schema.table`: `{table}`."
868 )
871## --------------------------------------------------------------------------- #
872## Read ####
873## --------------------------------------------------------------------------- #
876@typechecked
877def read_from_table(
878 spark_session: SparkSession,
879 name: str,
880 schema: Optional[str] = None,
881 data_format: Optional[SPARK_FORMATS] = "parquet",
882 read_options: Optional[str_dict] = None,
883) -> psDataFrame:
884 """
885 !!! note "Summary"
886 Read a table from a given `schema` and `name` into memory as a `pyspark` dataframe.
888 ???+ abstract "Details"
889 - If `schema` is `#!py None`, then we would expect the `name` to contain both the schema and the table name together, like `schema.name`; for example, `production.orders`.
890 - Else, if `schema` is not `#!py None`, then we would expect the `schema` to contain the name of the schema, and the `name` to contain the name of the table.
892 Params:
893 spark_session (SparkSession):
894 The Spark session to use for the reading.
895 name (str):
896 The name of the table to read in.
897 schema (Optional[str], optional):
898 The schema of the table to read in.<br>
899 Defaults to `#!py None`.
900 data_format (Optional[SPARK_FORMATS], optional):
901 The format of the table.<br>
902 Defaults to `#!py "parquet"`.
903 read_options (Dict[str, str], optional):
904 Any additional options to pass to the Spark reader.<br>
905 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br>
906 Defaults to `#!py dict()`.
908 Raises:
909 TypeError:
910 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
911 ValidationError:
912 If `name` contains `/`, or is structured with three elements like: `source.schema.table`.
914 Returns:
915 (psDataFrame):
916 The loaded dataframe.
918 ???+ example "Examples"
920 ```{.py .python linenums="1" title="Set up"}
921 >>> # Imports
922 >>> import pandas as pd
923 >>> from pyspark.sql import SparkSession
924 >>> from toolbox_pyspark.io import read_from_table
925 >>>
926 >>> # Instantiate Spark
927 >>> spark = SparkSession.builder.getOrCreate()
928 >>>
929 >>> # Create data
930 >>> df = pd.DataFrame(
931 ... {
932 ... "a": [1, 2, 3, 4],
933 ... "b": ["a", "b", "c", "d"],
934 ... "c": [1, 1, 1, 1],
935 ... "d": ["2", "2", "2", "2"],
936 ... }
937 ... )
938 >>> df.to_parquet("./test/table.parquet")
939 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table")
940 ```
942 ```{.py .python linenums="1" title="Example 1: Read Table"}
943 >>> df_table = read_from_table(
944 ... name="test_table",
945 ... spark_session=spark,
946 ... data_format="parquet",
947 ... )
948 >>>
949 >>> df_table.show()
950 ```
951 <div class="result" markdown>
952 ```{.txt .text title="Terminal"}
953 +---+---+---+---+
954 | a | b | c | d |
955 +---+---+---+---+
956 | 1 | a | 1 | 2 |
957 | 2 | b | 1 | 2 |
958 | 3 | c | 1 | 2 |
959 | 4 | d | 1 | 2 |
960 +---+---+---+---+
961 ```
962 !!! success "Conclusion: Successfully read table."
963 </div>
965 ```{.py .python linenums="1" title="Example 2: Invalid table structure"}
966 >>> df_table = read_from_table(
967 ... name="schema.test_table",
968 ... schema="source",
969 ... spark_session=spark,
970 ... data_format="parquet",
971 ... )
972 ```
973 <div class="result" markdown>
974 ```{.txt .text title="Terminal"}
975 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`.
976 ```
977 !!! failure "Conclusion: Failed to read from table due to invalid table structure."
978 </div>
980 ??? tip "See Also"
981 - [`load_from_table`][toolbox_pyspark.io.load_from_table]
982 - [`read`][toolbox_pyspark.io.read]
983 - [`load`][toolbox_pyspark.io.load]
984 """
986 # Set default options ----
987 data_format: str = data_format or "parquet"
988 table: str = name if not schema else f"{schema}.{name}"
990 # Validate that `table` is in the correct format ----
991 _validate_table_name(table)
993 # Initialise reader (including data format) ----
994 reader: DataFrameReader = spark_session.read.format(data_format)
996 # Add options (if exists) ----
997 if read_options:
998 reader.options(**read_options)
1000 # Load DataFrame ----
1001 return reader.table(table)
1004## --------------------------------------------------------------------------- #
1005## Write ####
1006## --------------------------------------------------------------------------- #
1009@typechecked
1010def write_to_table(
1011 data_frame: psDataFrame,
1012 name: str,
1013 schema: Optional[str] = None,
1014 data_format: Optional[SPARK_FORMATS] = "parquet",
1015 mode: Optional[WRITE_MODES] = None,
1016 write_options: Optional[str_dict] = None,
1017 partition_cols: Optional[str_collection] = None,
1018) -> None:
1019 """
1020 !!! note "Summary"
1021 For a given `data_frame`, write it out to a specified `schema` and `name` with format `data_format`.
1023 ???+ abstract "Details"
1024 - If `schema` is `#!py None`, then we would expect the `name` to contain both the schema and the table name together, like `schema.name`; for example, `production.orders`.
1025 - Else, if `schema` is not `#!py None`, then we would expect the `schema` to contain the name of the schema, and the `name` to contain the name of the table.
1027 Params:
1028 data_frame (psDataFrame):
1029 The DataFrame to be written. Must be a valid `pyspark` DataFrame ([`pyspark.sql.DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)).
1030 name (str):
1031 The name of the table where it will be written.
1032 schema (Optional[str], optional):
1033 The schema of the table where it will be written.<br>
1034 Defaults to `#!py None`.
1035 data_format (Optional[SPARK_FORMATS], optional):
1036 The format that the `data_frame` will be written to.<br>
1037 Defaults to `#!py "parquet"`.
1038 mode (Optional[WRITE_MODES], optional):
1039 The behaviour for when the data already exists.<br>
1040 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br>
1041 Defaults to `#!py None`.
1042 write_options (Dict[str, str], optional):
1043 Any additional settings to pass to the writer class.<br>
1044 For example:
1046 - If you are writing to a Delta object, and wanted to overwrite the schema: `#!py {"overwriteSchema": "true"}`.
1047 - If you're writing to a CSV file, and wanted to specify the header row: `#!py {"header": "true"}`.
1049 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br>
1050 Defaults to `#!py dict()`.
1051 partition_cols (Optional[Union[str_collection, str]], optional):
1052 The column(s) that the table should partition by.<br>
1053 Defaults to `#!py None`.
1055 Raises:
1056 TypeError:
1057 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
1058 ValidationError:
1059 If `name` contains `/`, or is structured with three elements like: `source.schema.table`.
1061 Returns:
1062 (type(None)):
1063 Nothing is returned.
1065 ???+ tip "Note"
1066 You know that this function is successful if the table exists at the specified location, and there are no errors thrown.
1068 ???+ example "Examples"
1070 ```{.py .python linenums="1" title="Set up"}
1071 >>> # Imports
1072 >>> import pandas as pd
1073 >>> from pyspark.sql import SparkSession
1074 >>> from toolbox_pyspark.io import write_to_table
1075 >>> from toolbox_pyspark.checks import table_exists
1076 >>>
1077 >>> # Instantiate Spark
1078 >>> spark = SparkSession.builder.getOrCreate()
1079 >>>
1080 >>> # Create data
1081 >>> df = spark.createDataFrame(
1082 ... pd.DataFrame(
1083 ... {
1084 ... "a": [1, 2, 3, 4],
1085 ... "b": ["a", "b", "c", "d"],
1086 ... "c": [1, 1, 1, 1],
1087 ... "d": ["2", "2", "2", "2"],
1088 ... }
1089 ... )
1090 ... )
1091 ```
1093 ```{.py .python linenums="1" title="Check"}
1094 >>> df.show()
1095 ```
1096 <div class="result" markdown>
1097 ```{.txt .text title="Terminal"}
1098 +---+---+---+---+
1099 | a | b | c | d |
1100 +---+---+---+---+
1101 | 1 | a | 1 | 2 |
1102 | 2 | b | 1 | 2 |
1103 | 3 | c | 1 | 2 |
1104 | 4 | d | 1 | 2 |
1105 +---+---+---+---+
1106 ```
1107 </div>
1109 ```{.py .python linenums="1" title="Example 1: Write to Table"}
1110 >>> write_to_table(
1111 ... data_frame=df,
1112 ... name="test_table",
1113 ... schema="default",
1114 ... data_format="parquet",
1115 ... mode="overwrite",
1116 ... )
1117 >>>
1118 >>> table_exists(
1119 ... name="test_table",
1120 ... schema="default",
1121 ... data_format="parquet",
1122 ... spark_session=df.sparkSession,
1123 ... )
1124 ```
1125 <div class="result" markdown>
1126 ```{.sh .shell title="Terminal"}
1127 True
1128 ```
1129 !!! success "Conclusion: Successfully written to table."
1130 </div>
1132 ```{.py .python linenums="1" title="Example 2: Invalid table structure"}
1133 >>> write_to_table(
1134 ... data_frame=df,
1135 ... name="schema.test_table",
1136 ... schema="source",
1137 ... data_format="parquet",
1138 ... mode="overwrite",
1139 ... )
1140 ```
1141 <div class="result" markdown>
1142 ```{.txt .text title="Terminal"}
1143 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`.
1144 ```
1145 !!! failure "Conclusion: Failed to write to table due to invalid table structure."
1146 </div>
1148 ??? tip "See Also"
1149 - [`save_to_table`][toolbox_pyspark.io.save_to_table]
1150 - [`write`][toolbox_pyspark.io.write]
1151 - [`save`][toolbox_pyspark.io.save]
1152 """
1154 # Set default options ----
1155 write_options: str_dict = write_options or dict()
1156 data_format: str = data_format or "parquet"
1157 table: str = name if not schema else f"{schema}.{name}"
1159 # Validate that `table` is in the correct format ----
1160 _validate_table_name(table)
1162 # Initialise writer (including data format) ----
1163 writer: DataFrameWriter = data_frame.write.mode(mode).format(data_format)
1165 # Add options (if exists) ----
1166 if write_options:
1167 writer.options(**write_options)
1169 # Add partition (if exists) ----
1170 if partition_cols is not None:
1171 partition_cols = [partition_cols] if is_type(partition_cols, str) else partition_cols
1172 writer = writer.partitionBy(list(partition_cols))
1174 # Write table ----
1175 writer.saveAsTable(table)
1178## --------------------------------------------------------------------------- #
1179## Transfer ####
1180## --------------------------------------------------------------------------- #
1183@typechecked
1184def transfer_by_table(
1185 spark_session: SparkSession,
1186 from_table_name: str,
1187 to_table_name: str,
1188 from_table_schema: Optional[str] = None,
1189 from_table_format: Optional[SPARK_FORMATS] = "parquet",
1190 from_table_options: Optional[str_dict] = None,
1191 to_table_schema: Optional[str] = None,
1192 to_table_format: Optional[SPARK_FORMATS] = "parquet",
1193 to_table_mode: Optional[WRITE_MODES] = None,
1194 to_table_options: Optional[str_dict] = None,
1195 to_table_partition_cols: Optional[str_collection] = None,
1196) -> None:
1197 """
1198 !!! note "Summary"
1199 Copy a table from one schema and name to another schema and name.
1201 ???+ abstract "Details"
1202 This is a blind transfer. There is no validation, no alteration, no adjustments made at all. Simply read directly from one table and move immediately to another table straight away.
1204 Params:
1205 spark_session (SparkSession):
1206 The spark session to use for the transfer. Necessary in order to instantiate the reading process.
1207 from_table_name (str):
1208 The name of the table to be read.
1209 to_table_name (str):
1210 The name of the table where it will be saved.
1211 from_table_schema (Optional[str], optional):
1212 The schema of the table to be read.<br>
1213 Defaults to `#!py None`.
1214 from_table_format (Optional[SPARK_FORMATS], optional):
1215 The format of the data at the reading location.<br>
1216 Defaults to `#!py "parquet"`.
1217 from_table_options (Dict[str, str], optional):
1218 Any additional options to pass to the Spark reader.<br>
1219 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br>
1220 Defaults to `#!py dict()`.
1221 to_table_schema (Optional[str], optional):
1222 The schema of the table where it will be saved.<br>
1223 Defaults to `#!py None`.
1224 to_table_format (Optional[SPARK_FORMATS], optional):
1225 The format of the saved table.<br>
1226 Defaults to `#!py "parquet"`.
1227 to_table_mode (Optional[WRITE_MODES], optional):
1228 The behaviour for when the data already exists.<br>
1229 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br>
1230 Defaults to `#!py None`.
1231 to_table_options (Dict[str, str], optional):
1232 Any additional settings to pass to the writer class.<br>
1233 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br>
1234 Defaults to `#!py dict()`.
1235 to_table_partition_cols (Optional[Union[str_collection, str]], optional):
1236 The column(s) that the table should partition by.<br>
1237 Defaults to `#!py None`.
1239 Raises:
1240 TypeError:
1241 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
1243 Returns:
1244 (type(None)):
1245 Nothing is returned.
1247 ???+ tip "Note"
1248 You know that this function is successful if the table exists at the specified location, and there are no errors thrown.
1250 ???+ example "Examples"
1252 ```{.py .python linenums="1" title="Set up"}
1253 >>> # Imports
1254 >>> import pandas as pd
1255 >>> from pyspark.sql import SparkSession
1256 >>> from toolbox_pyspark.io import transfer_by_table
1257 >>> from toolbox_pyspark.checks import table_exists
1258 >>>
1259 >>> # Instantiate Spark
1260 >>> spark = SparkSession.builder.getOrCreate()
1261 >>>
1262 >>> # Create data
1263 >>> df = pd.DataFrame(
1264 ... {
1265 ... "a": [1, 2, 3, 4],
1266 ... "b": ["a", "b", "c", "d"],
1267 ... "c": [1, 1, 1, 1],
1268 ... "d": ["2", "2", "2", "2"],
1269 ... }
1270 ... )
1271 >>> df.to_parquet("./test/table.parquet")
1272 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table")
1273 ```
1275 ```{.py .python linenums="1" title="Example 1: Transfer Table"}
1276 >>> transfer_by_table(
1277 ... spark_session=spark,
1278 ... from_table_name="test_table",
1279 ... from_table_schema="default",
1280 ... from_table_format="parquet",
1281 ... to_table_name="new_table",
1282 ... to_table_schema="default",
1283 ... to_table_format="parquet",
1284 ... to_table_mode="overwrite",
1285 ... )
1286 >>>
1287 >>> table_exists(
1288 ... name="new_table",
1289 ... schema="default",
1290 ... data_format="parquet",
1291 ... spark_session=spark,
1292 ... )
1293 ```
1294 <div class="result" markdown>
1295 ```{.sh .shell title="Terminal"}
1296 True
1297 ```
1298 !!! success "Conclusion: Successfully transferred table."
1299 </div>
1301 ```{.py .python linenums="1" title="Example 2: Invalid table structure"}
1302 >>> transfer_by_table(
1303 ... spark_session=spark,
1304 ... from_table_name="schema.test_table",
1305 ... from_table_schema="source",
1306 ... from_table_format="parquet",
1307 ... to_table_name="new_table",
1308 ... to_table_schema="default",
1309 ... to_table_format="parquet",
1310 ... to_table_mode="overwrite",
1311 ... )
1312 ```
1313 <div class="result" markdown>
1314 ```{.txt .text title="Terminal"}
1315 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`.
1316 ```
1317 !!! failure "Conclusion: Failed to transfer table due to invalid table structure."
1318 </div>
1320 ??? tip "See Also"
1321 - [`transfer`][toolbox_pyspark.io.transfer]
1322 """
1324 # Read from source ----
1325 source_table: psDataFrame = read_from_table(
1326 name=from_table_name,
1327 schema=from_table_schema,
1328 spark_session=spark_session,
1329 data_format=from_table_format,
1330 read_options=from_table_options,
1331 )
1333 # Write to target ----
1334 write_to_table(
1335 data_frame=source_table,
1336 name=to_table_name,
1337 schema=to_table_schema,
1338 data_format=to_table_format,
1339 mode=to_table_mode,
1340 write_options=to_table_options,
1341 partition_cols=to_table_partition_cols,
1342 )
1345# ---------------------------------------------------------------------------- #
1346# #
1347# Combined Functions ####
1348# #
1349# ---------------------------------------------------------------------------- #
1352## --------------------------------------------------------------------------- #
1353## Read ####
1354## --------------------------------------------------------------------------- #
1357@typechecked
1358def read(
1359 spark_session: SparkSession,
1360 name: str,
1361 method: Literal["table", "path"],
1362 path: Optional[str] = None,
1363 schema: Optional[str] = None,
1364 data_format: Optional[SPARK_FORMATS] = "parquet",
1365 read_options: Optional[str_dict] = None,
1366) -> psDataFrame:
1367 """
1368 !!! note "Summary"
1369 Read a table or file from a given `path` or `schema` and `name` into memory as a `pyspark` dataframe.
1371 ???+ abstract "Details"
1372 This function serves as a unified interface for reading data into a `pyspark` dataframe. Depending on the `method` parameter, it will either read from a file path or a table.
1374 - If `method` is `#!py "path"`, the function will use the `read_from_path` function to read the data from the specified `path` and `name`.
1375 - If `method` is `#!py "table"`, the function will use the `read_from_table` function to read the data from the specified `schema` and `name`.
1377 Params:
1378 spark_session (SparkSession):
1379 The Spark session to use for the reading.
1380 name (str):
1381 The name of the table or file to read in.
1382 method (Literal["table", "path"]):
1383 The method to use for reading the data. Either `#!py "table"` or `#!py "path"`.
1384 path (Optional[str], optional):
1385 The path from which the file will be read. Required if `method` is `#!py "path"`.<br>
1386 Defaults to `#!py None`.
1387 schema (Optional[str], optional):
1388 The schema of the table to read in. Required if `method` is `#!py "table"`.<br>
1389 Defaults to `#!py None`.
1390 data_format (Optional[SPARK_FORMATS], optional):
1391 The format of the data.<br>
1392 Defaults to `#!py "parquet"`.
1393 read_options (Dict[str, str], optional):
1394 Any additional options to pass to the Spark reader.<br>
1395 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br>
1396 Defaults to `#!py dict()`.
1398 Raises:
1399 TypeError:
1400 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
1401 ValidationError:
1402 If `name` contains `/`, or is structured with three elements like: `source.schema.table`.
1404 Returns:
1405 (psDataFrame):
1406 The loaded dataframe.
1408 ???+ example "Examples"
1410 ```{.py .python linenums="1" title="Set up"}
1411 >>> # Imports
1412 >>> import pandas as pd
1413 >>> from pyspark.sql import SparkSession
1414 >>> from toolbox_pyspark.io import read
1415 >>>
1416 >>> # Instantiate Spark
1417 >>> spark = SparkSession.builder.getOrCreate()
1418 >>>
1419 >>> # Create data
1420 >>> df = pd.DataFrame(
1421 ... {
1422 ... "a": [1, 2, 3, 4],
1423 ... "b": ["a", "b", "c", "d"],
1424 ... "c": [1, 1, 1, 1],
1425 ... "d": ["2", "2", "2", "2"],
1426 ... }
1427 ... )
1428 >>> df.to_csv("./test/table.csv")
1429 >>> df.to_parquet("./test/table.parquet")
1430 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table")
1431 ```
1433 ```{.py .python linenums="1" title="Example 1: Read from Path"}
1434 >>> df_path = read(
1435 ... spark_session=spark,
1436 ... name="table.csv",
1437 ... method="path",
1438 ... path="./test",
1439 ... data_format="csv",
1440 ... read_options={"header": "true"},
1441 ... )
1442 >>>
1443 >>> df_path.show()
1444 ```
1445 <div class="result" markdown>
1446 ```{.txt .text title="Terminal"}
1447 +---+---+---+---+
1448 | a | b | c | d |
1449 +---+---+---+---+
1450 | 1 | a | 1 | 2 |
1451 | 2 | b | 1 | 2 |
1452 | 3 | c | 1 | 2 |
1453 | 4 | d | 1 | 2 |
1454 +---+---+---+---+
1455 ```
1456 !!! success "Conclusion: Successfully read from path."
1457 </div>
1459 ```{.py .python linenums="1" title="Example 2: Read from Table"}
1460 >>> df_table = read(
1461 ... spark_session=spark,
1462 ... name="test_table",
1463 ... method="table",
1464 ... schema="default",
1465 ... data_format="parquet",
1466 ... )
1467 >>>
1468 >>> df_table.show()
1469 ```
1470 <div class="result" markdown>
1471 ```{.txt .text title="Terminal"}
1472 +---+---+---+---+
1473 | a | b | c | d |
1474 +---+---+---+---+
1475 | 1 | a | 1 | 2 |
1476 | 2 | b | 1 | 2 |
1477 | 3 | c | 1 | 2 |
1478 | 4 | d | 1 | 2 |
1479 +---+---+---+---+
1480 ```
1481 !!! success "Conclusion: Successfully read from table."
1482 </div>
1484 ```{.py .python linenums="1" title="Example 3: Invalid Path"}
1485 >>> df_invalid_path = read(
1486 ... spark_session=spark,
1487 ... name="invalid_table.csv",
1488 ... method="path",
1489 ... path="./invalid_path",
1490 ... data_format="csv",
1491 ... read_options={"header": "true"},
1492 ... )
1493 ```
1494 <div class="result" markdown>
1495 ```{.txt .text title="Terminal"}
1496 Py4JJavaError: An error occurred while calling o45.load.
1497 ```
1498 !!! failure "Conclusion: Failed to read from invalid path."
1499 </div>
1501 ```{.py .python linenums="1" title="Example 4: Invalid Table Structure"}
1502 >>> df_invalid_table = read(
1503 ... spark_session=spark,
1504 ... name="schema.invalid_table",
1505 ... method="table",
1506 ... schema="source",
1507 ... data_format="parquet",
1508 ... )
1509 ```
1510 <div class="result" markdown>
1511 ```{.txt .text title="Terminal"}
1512 Invalid table. Should be in the format `schema.table`: `source.schema.invalid_table`.
1513 ```
1514 !!! failure "Conclusion: Failed to read from invalid table structure."
1515 </div>
1517 ??? tip "See Also"
1518 - [`read_from_path`][toolbox_pyspark.io.read_from_path]
1519 - [`read_from_table`][toolbox_pyspark.io.read_from_table]
1520 - [`load`][toolbox_pyspark.io.load]
1521 """
1523 if method == "table":
1524 return read_from_table(
1525 spark_session=spark_session,
1526 name=name,
1527 schema=schema,
1528 data_format=data_format,
1529 read_options=read_options,
1530 )
1531 if method == "path":
1532 return read_from_path(
1533 spark_session=spark_session,
1534 name=name,
1535 path=path,
1536 data_format=data_format,
1537 read_options=read_options,
1538 )
1541## --------------------------------------------------------------------------- #
1542## Write ####
1543## --------------------------------------------------------------------------- #
1546@typechecked
1547def write(
1548 data_frame: psDataFrame,
1549 name: str,
1550 method: Literal["table", "path"],
1551 path: Optional[str] = None,
1552 schema: Optional[str] = None,
1553 data_format: Optional[SPARK_FORMATS] = "parquet",
1554 mode: Optional[WRITE_MODES] = None,
1555 write_options: Optional[str_dict] = None,
1556 partition_cols: Optional[str_collection] = None,
1557) -> None:
1558 """
1559 !!! note "Summary"
1560 Write a dataframe to a specified `path` or `schema` and `name` with format `data_format`.
1562 ???+ abstract "Details"
1563 This function serves as a unified interface for writing data from a `pyspark` dataframe. Depending on the `method` parameter, it will either write to a file path or a table.
1565 - If `method` is `#!py "path"`, the function will use the `write_to_path` function to write the data to the specified `path` and `name`.
1566 - If `method` is `#!py "table"`, the function will use the `write_to_table` function to write the data to the specified `schema` and `name`.
1568 Params:
1569 data_frame (psDataFrame):
1570 The DataFrame to be written. Must be a valid `pyspark` DataFrame ([`pyspark.sql.DataFrame`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)).
1571 name (str):
1572 The name of the table or file where it will be written.
1573 method (Literal["table", "path"]):
1574 The method to use for writing the data. Either `#!py "table"` or `#!py "path"`.
1575 path (Optional[str], optional):
1576 The path location for where to save the table. Required if `method` is `#!py "path"`.<br>
1577 Defaults to `#!py None`.
1578 schema (Optional[str], optional):
1579 The schema of the table where it will be written. Required if `method` is `#!py "table"`.<br>
1580 Defaults to `#!py None`.
1581 data_format (Optional[SPARK_FORMATS], optional):
1582 The format that the `data_frame` will be written to.<br>
1583 Defaults to `#!py "parquet"`.
1584 mode (Optional[WRITE_MODES], optional):
1585 The behaviour for when the data already exists.<br>
1586 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br>
1587 Defaults to `#!py None`.
1588 write_options (Dict[str, str], optional):
1589 Any additional settings to pass to the writer class.<br>
1590 For example:
1592 - If you are writing to a Delta object, and wanted to overwrite the schema: `#!py {"overwriteSchema": "true"}`.
1593 - If you're writing to a CSV file, and wanted to specify the header row: `#!py {"header": "true"}`.
1595 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br>
1596 Defaults to `#!py dict()`.
1597 partition_cols (Optional[Union[str_collection, str]], optional):
1598 The column(s) that the table should partition by.<br>
1599 Defaults to `#!py None`.
1601 Raises:
1602 TypeError:
1603 If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
1604 ValidationError:
1605 If `name` contains `/`, or is structured with three elements like: `source.schema.table`.
1607 Returns:
1608 (type(None)):
1609 Nothing is returned.
1611 ???+ tip "Note"
1612 You know that this function is successful if the table or file exists at the specified location, and there are no errors thrown.
1614 ???+ example "Examples"
1616 ```{.py .python linenums="1" title="Set up"}
1617 >>> # Imports
1618 >>> import pandas as pd
1619 >>> from pyspark.sql import SparkSession
1620 >>> from toolbox_pyspark.io import write
1621 >>> from toolbox_pyspark.checks import table_exists
1622 >>>
1623 >>> # Instantiate Spark
1624 >>> spark = SparkSession.builder.getOrCreate()
1625 >>>
1626 >>> # Create data
1627 >>> df = spark.createDataFrame(
1628 ... pd.DataFrame(
1629 ... {
1630 ... "a": [1, 2, 3, 4],
1631 ... "b": ["a", "b", "c", "d"],
1632 ... "c": [1, 1, 1, 1],
1633 ... "d": ["2", "2", "2", "2"],
1634 ... }
1635 ... )
1636 ... )
1637 ```
1639 ```{.py .python linenums="1" title="Check"}
1640 >>> df.show()
1641 ```
1642 <div class="result" markdown>
1643 ```{.txt .text title="Terminal"}
1644 +---+---+---+---+
1645 | a | b | c | d |
1646 +---+---+---+---+
1647 | 1 | a | 1 | 2 |
1648 | 2 | b | 1 | 2 |
1649 | 3 | c | 1 | 2 |
1650 | 4 | d | 1 | 2 |
1651 +---+---+---+---+
1652 ```
1653 </div>
1655 ```{.py .python linenums="1" title="Example 1: Write to Path"}
1656 >>> write(
1657 ... data_frame=df,
1658 ... name="df.csv",
1659 ... method="path",
1660 ... path="./test",
1661 ... data_format="csv",
1662 ... mode="overwrite",
1663 ... write_options={"header": "true"},
1664 ... )
1665 >>>
1666 >>> table_exists(
1667 ... name="df.csv",
1668 ... path="./test",
1669 ... data_format="csv",
1670 ... spark_session=df.sparkSession,
1671 ... )
1672 ```
1673 <div class="result" markdown>
1674 ```{.sh .shell title="Terminal"}
1675 True
1676 ```
1677 !!! success "Conclusion: Successfully written to path."
1678 </div>
1680 ```{.py .python linenums="1" title="Example 2: Write to Table"}
1681 >>> write(
1682 ... data_frame=df,
1683 ... name="test_table",
1684 ... method="table",
1685 ... schema="default",
1686 ... data_format="parquet",
1687 ... mode="overwrite",
1688 ... )
1689 >>>
1690 >>> table_exists(
1691 ... name="test_table",
1692 ... schema="default",
1693 ... data_format="parquet",
1694 ... spark_session=df.sparkSession,
1695 ... )
1696 ```
1697 <div class="result" markdown>
1698 ```{.sh .shell title="Terminal"}
1699 True
1700 ```
1701 !!! success "Conclusion: Successfully written to table."
1702 </div>
1704 ```{.py .python linenums="1" title="Example 3: Invalid Path"}
1705 >>> write(
1706 ... data_frame=df,
1707 ... name="df.csv",
1708 ... method="path",
1709 ... path="./invalid_path",
1710 ... data_format="csv",
1711 ... mode="overwrite",
1712 ... write_options={"header": "true"},
1713 ... )
1714 ```
1715 <div class="result" markdown>
1716 ```{.txt .text title="Terminal"}
1717 Py4JJavaError: An error occurred while calling o45.save.
1718 ```
1719 !!! failure "Conclusion: Failed to write to invalid path."
1720 </div>
1722 ```{.py .python linenums="1" title="Example 4: Invalid Table Structure"}
1723 >>> write(
1724 ... data_frame=df,
1725 ... name="schema.test_table",
1726 ... method="table",
1727 ... schema="source",
1728 ... data_format="parquet",
1729 ... mode="overwrite",
1730 ... )
1731 ```
1732 <div class="result" markdown>
1733 ```{.txt .text title="Terminal"}
1734 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`.
1735 ```
1736 !!! failure "Conclusion: Failed to write to table due to invalid table structure."
1737 </div>
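        The `partition_cols` parameter is not shown above, so the following is a minimal sketch (assuming the same `df` and the same writable `./test` directory as in the earlier examples) of how it could be used to partition the written data:

        ```{.py .python linenums="1" title="Example 5: Write with partition columns (sketch)"}
        >>> # Sketch only: partition the parquet output by column "c"
        >>> write(
        ...     data_frame=df,
        ...     name="df_partitioned.parquet",
        ...     method="path",
        ...     path="./test",
        ...     data_format="parquet",
        ...     mode="overwrite",
        ...     partition_cols=["c"],
        ... )
        ```
        If this succeeds, the output location should contain Hive-style sub-directories such as `c=1/`, and no errors are thrown.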
1739 ??? tip "See Also"
1740 - [`write_to_path`][toolbox_pyspark.io.write_to_path]
1741 - [`write_to_table`][toolbox_pyspark.io.write_to_table]
1742 - [`save`][toolbox_pyspark.io.save]
1743 """
1745 if method == "table":
1746 write_to_table(
1747 data_frame=data_frame,
1748 name=name,
1749 schema=schema,
1750 data_format=data_format,
1751 mode=mode,
1752 write_options=write_options,
1753 partition_cols=partition_cols,
1754 )
1755 if method == "path":
1756 write_to_path(
1757 data_frame=data_frame,
1758 name=name,
1759 path=path,
1760 data_format=data_format,
1761 mode=mode,
1762 write_options=write_options,
1763 partition_cols=partition_cols,
1764 )
1767## --------------------------------------------------------------------------- #
1768## Transfer ####
1769## --------------------------------------------------------------------------- #
1772@typechecked
1773def transfer(
1774 spark_session: SparkSession,
1775 from_table_name: str,
1776 to_table_name: str,
1777 method: Literal["table", "path"],
1778 from_table_path: Optional[str] = None,
1779 from_table_schema: Optional[str] = None,
1780 from_table_format: Optional[SPARK_FORMATS] = "parquet",
1781 from_table_options: Optional[str_dict] = None,
1782 to_table_path: Optional[str] = None,
1783 to_table_schema: Optional[str] = None,
1784 to_table_format: Optional[SPARK_FORMATS] = "parquet",
1785 to_table_mode: Optional[WRITE_MODES] = None,
1786 to_table_options: Optional[str_dict] = None,
1787 to_partition_cols: Optional[str_collection] = None,
1788) -> None:
1789 """
1790 !!! note "Summary"
1791 Transfer a table or file from one location to another.
1793 ???+ abstract "Details"
1794        This function serves as a unified interface for transferring data from one location to another. Depending on the `method` parameter, it will transfer either between file paths or between tables.
1796 - If `method` is `#!py "path"`, the function will use the `transfer_by_path` function to transfer the data from the specified `from_table_path` and `from_table_name` to the specified `to_table_path` and `to_table_name`.
1797 - If `method` is `#!py "table"`, the function will use the `transfer_by_table` function to transfer the data from the specified `from_table_schema` and `from_table_name` to the specified `to_table_schema` and `to_table_name`.
1799 Params:
1800 spark_session (SparkSession):
1801 The Spark session to use for the transfer.
1802 from_table_name (str):
1803 The name of the table or file to be transferred.
1804 to_table_name (str):
1805            The name of the table or file at the destination.
1806 method (Literal["table", "path"]):
1807 The method to use for transferring the data. Either `#!py "table"` or `#!py "path"`.
1808 from_table_path (Optional[str], optional):
1809 The path from which the file will be transferred. Required if `method` is `#!py "path"`.<br>
1810 Defaults to `#!py None`.
1811 from_table_schema (Optional[str], optional):
1812 The schema of the table to be transferred. Required if `method` is `#!py "table"`.<br>
1813 Defaults to `#!py None`.
1814 from_table_format (Optional[SPARK_FORMATS], optional):
1815 The format of the data at the source location.<br>
1816 Defaults to `#!py "parquet"`.
1817 from_table_options (Dict[str, str], optional):
1818            Any additional options to pass to the Spark reader.<br>
1819 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameReader.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.options.html).<br>
1820 Defaults to `#!py dict()`.
1821 to_table_path (Optional[str], optional):
1822 The path location for where to save the table. Required if `method` is `#!py "path"`.<br>
1823 Defaults to `#!py None`.
1824 to_table_schema (Optional[str], optional):
1825 The schema of the table where it will be saved. Required if `method` is `#!py "table"`.<br>
1826 Defaults to `#!py None`.
1827 to_table_format (Optional[SPARK_FORMATS], optional):
1828 The format of the saved table.<br>
1829 Defaults to `#!py "parquet"`.
1830 to_table_mode (Optional[WRITE_MODES], optional):
1831 The behaviour for when the data already exists.<br>
1832 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.mode`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.mode.html).<br>
1833 Defaults to `#!py None`.
1834 to_table_options (Dict[str, str], optional):
1835            Any additional settings to pass to the writer class.<br>
1836 For more info, check the `pyspark` docs: [`pyspark.sql.DataFrameWriter.options`](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.options.html).<br>
1837 Defaults to `#!py dict()`.
1838 to_partition_cols (Optional[Union[str_collection, str]], optional):
1839 The column(s) that the table should partition by.<br>
1840 Defaults to `#!py None`.
1842 Raises:
1843 TypeError:
1844            If any of the inputs passed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
1845 ValidationError:
1846 If `from_table_name` or `to_table_name` contains `/`, or is structured with three elements like: `source.schema.table`.
1848 Returns:
1849 (type(None)):
1850 Nothing is returned.
1852 ???+ tip "Note"
1853        You know that this function is successful if the table or file exists at the specified location, and no errors are thrown.
1855 ???+ example "Examples"
1857 ```{.py .python linenums="1" title="Set up"}
1858 >>> # Imports
1859 >>> import pandas as pd
1860 >>> from pyspark.sql import SparkSession
1861 >>> from toolbox_pyspark.io import transfer
1862 >>> from toolbox_pyspark.checks import table_exists
1863 >>>
1864 >>> # Instantiate Spark
1865 >>> spark = SparkSession.builder.getOrCreate()
1866 >>>
1867 >>> # Create data
1868 >>> df = pd.DataFrame(
1869 ... {
1870 ... "a": [1, 2, 3, 4],
1871 ... "b": ["a", "b", "c", "d"],
1872 ... "c": [1, 1, 1, 1],
1873 ... "d": ["2", "2", "2", "2"],
1874 ... }
1875 ... )
1876 >>> df.to_csv("./test/table.csv")
1877 >>> df.to_parquet("./test/table.parquet")
1878 >>> spark.read.parquet("./test/table.parquet").createOrReplaceTempView("test_table")
1879 ```
1881 ```{.py .python linenums="1" title="Example 1: Transfer from Path"}
1882 >>> transfer(
1883 ... spark_session=spark,
1884 ... method="path",
1885 ... from_table_name="table.csv",
1886 ... from_table_path="./test",
1887 ... from_table_format="csv",
1888 ... from_table_options={"header": "true"},
1889 ... to_table_name="new_table.csv",
1890 ... to_table_path="./other",
1891 ... to_table_format="csv",
1892 ... to_table_mode="overwrite",
1893 ... to_table_options={"header": "true"},
1894 ... )
1895 >>>
1896 >>> table_exists(
1897 ... name="new_table.csv",
1898 ... path="./other",
1899 ... data_format="csv",
1900 ... spark_session=spark,
1901 ... )
1902 ```
1903 <div class="result" markdown>
1904 ```{.sh .shell title="Terminal"}
1905 True
1906 ```
1907 !!! success "Conclusion: Successfully transferred from path."
1908 </div>
1910 ```{.py .python linenums="1" title="Example 2: Transfer from Table"}
1911 >>> transfer(
1912 ... spark_session=spark,
1913 ... method="table",
1914 ... from_table_name="test_table",
1915 ... from_table_schema="default",
1916 ... from_table_format="parquet",
1917 ... to_table_name="new_table",
1918 ... to_table_schema="default",
1919 ... to_table_format="parquet",
1920 ... to_table_mode="overwrite",
1921 ... )
1922 >>>
1923 >>> table_exists(
1924 ... name="new_table",
1925 ... schema="default",
1926 ... data_format="parquet",
1927 ... spark_session=spark,
1928 ... )
1929 ```
1930 <div class="result" markdown>
1931 ```{.sh .shell title="Terminal"}
1932 True
1933 ```
1934 !!! success "Conclusion: Successfully transferred from table."
1935 </div>
1937 ```{.py .python linenums="1" title="Example 3: Invalid Path"}
1938 >>> transfer(
1939 ... spark_session=spark,
1940 ... method="path",
1941 ... from_table_name="table.csv",
1942 ... from_table_path="./invalid_path",
1943 ... from_table_format="csv",
1944 ... from_table_options={"header": "true"},
1945 ... to_table_name="new_table.csv",
1946 ... to_table_path="./other",
1947 ... to_table_format="csv",
1948 ... to_table_mode="overwrite",
1949 ... to_table_options={"header": "true"},
1950 ... )
1951 ```
1952 <div class="result" markdown>
1953 ```{.txt .text title="Terminal"}
1954 Py4JJavaError: An error occurred while calling o45.load.
1955 ```
1956 !!! failure "Conclusion: Failed to transfer from invalid path."
1957 </div>
1959 ```{.py .python linenums="1" title="Example 4: Invalid Table Structure"}
1960 >>> transfer(
1961 ... spark_session=spark,
1962 ... method="table",
1963 ... from_table_name="schema.test_table",
1964 ... from_table_schema="source",
1965 ... from_table_format="parquet",
1966 ... to_table_name="new_table",
1967 ... to_table_schema="default",
1968 ... to_table_format="parquet",
1969 ... to_table_mode="overwrite",
1970 ... )
1971 ```
1972 <div class="result" markdown>
1973 ```{.txt .text title="Terminal"}
1974 Invalid table. Should be in the format `schema.table`: `source.schema.test_table`.
1975 ```
1976        !!! failure "Conclusion: Failed to transfer due to invalid table structure."
1977 </div>
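        The `to_partition_cols` parameter is not demonstrated above, so the following is a minimal sketch (assuming the same `./test/table.parquet` file created in the set-up, and a writable `./other` directory) of partitioning the destination data during a path transfer:

        ```{.py .python linenums="1" title="Example 5: Transfer with partition columns (sketch)"}
        >>> # Sketch only: partition the destination parquet data by column "a"
        >>> transfer(
        ...     spark_session=spark,
        ...     method="path",
        ...     from_table_name="table.parquet",
        ...     from_table_path="./test",
        ...     from_table_format="parquet",
        ...     to_table_name="new_table_partitioned.parquet",
        ...     to_table_path="./other",
        ...     to_table_format="parquet",
        ...     to_table_mode="overwrite",
        ...     to_partition_cols=["a"],
        ... )
        ```
        If this succeeds, the destination location should contain Hive-style sub-directories such as `a=1/`, and no errors are thrown.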
1979 ??? tip "See Also"
1980 - [`transfer_by_path`][toolbox_pyspark.io.transfer_by_path]
1981 - [`transfer_by_table`][toolbox_pyspark.io.transfer_by_table]
1982 """
1984 if method == "table":
1985 transfer_by_table(
1986 spark_session=spark_session,
1987 from_table_name=from_table_name,
1988 to_table_name=to_table_name,
1989 from_table_schema=from_table_schema,
1990 from_table_format=from_table_format,
1991 from_table_options=from_table_options,
1992 to_table_schema=to_table_schema,
1993 to_table_format=to_table_format,
1994 to_table_mode=to_table_mode,
1995 to_table_options=to_table_options,
1996 to_table_partition_cols=to_partition_cols,
1997 )
1998 if method == "path":
1999 transfer_by_path(
2000 spark_session=spark_session,
2001 from_table_path=from_table_path,
2002 from_table_name=from_table_name,
2003 from_table_format=from_table_format,
2004 to_table_path=to_table_path,
2005 to_table_name=to_table_name,
2006 to_table_format=to_table_format,
2007 from_table_options=from_table_options,
2008 to_table_mode=to_table_mode,
2009 to_table_options=to_table_options,
2010 to_table_partition_cols=to_partition_cols,
2011 )
2014# ---------------------------------------------------------------------------- #
2015# #
2016# Aliases ####
2017# #
2018# ---------------------------------------------------------------------------- #
2020load_from_path = read_from_path
2021save_to_path = write_to_path
2022load_from_table = read_from_table
2023save_to_table = write_to_table
2024load = read
2025save = write
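# The aliases above are plain assignments, so they accept exactly the same
# arguments as the functions they point to. For example (a sketch only,
# assuming a DataFrame `df` as in the docstring examples above), the
# following two calls are equivalent:
#
#     save(data_frame=df, name="df.csv", method="path", path="./test", data_format="csv")
#     write(data_frame=df, name="df.csv", method="path", path="./test", data_format="csv")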