Coverage for src/toolbox_pyspark/keys.py: 100%

24 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-25 23:08 +0000

1# ============================================================================ # 

2# # 

3# Title : Keys # 

4# Purpose : Creating new columns to act as keys (primary and foreign), # 

5# to be used for joins with other tables, or to create # 

6# relationships within downstream applications, like PowerBI. # 

7# # 

8# ============================================================================ # 

9 

10 

11# ---------------------------------------------------------------------------- # 

12# # 

13# Overview #### 

14# # 

15# ---------------------------------------------------------------------------- # 

16 

17 

18# ---------------------------------------------------------------------------- # 

19# Description #### 

20# ---------------------------------------------------------------------------- # 

21 

22 

23""" 

24!!! note "Summary" 

25 The `keys` module is used for creating new columns to act as keys (primary and foreign), to be used for joins with other tables, or to create relationships within downstream applications, like PowerBI. 

26""" 

27 

28 

29# ---------------------------------------------------------------------------- # 

30# # 

31# Setup #### 

32# # 

33# ---------------------------------------------------------------------------- # 

34 

35 

36# ---------------------------------------------------------------------------- # 

37# Imports #### 

38# ---------------------------------------------------------------------------- # 

39 

40 

41# ## Python StdLib Imports ---- 

42from typing import Optional, Union 

43 

44# ## Python Third Party Imports ---- 

45from pyspark.sql import DataFrame as psDataFrame, functions as F 

46from toolbox_python.checkers import is_type 

47from toolbox_python.collection_types import str_collection, str_list 

48from typeguard import typechecked 

49 

50# ## Local First Party Imports ---- 

51from toolbox_pyspark.checks import assert_columns_exists 

52 

53 

54# ---------------------------------------------------------------------------- # 

55# Exports #### 

56# ---------------------------------------------------------------------------- # 

57 

58 

# Public API of the `keys` module: the names exported by `from ... import *`.
__all__: str_list = [
    "add_keys_from_columns",
    "add_key_from_columns",
]

60 

61 

62# ---------------------------------------------------------------------------- # 

63# # 

64# Functions #### 

65# # 

66# ---------------------------------------------------------------------------- # 

67 

68 

69# ---------------------------------------------------------------------------- # 

70# Add Keys #### 

71# ---------------------------------------------------------------------------- # 

72 

73 

@typechecked
def add_key_from_columns(
    dataframe: psDataFrame,
    columns: Union[str, str_collection],
    join_character: Optional[str] = "_",
    key_name: Optional[str] = None,
) -> psDataFrame:
    """
    !!! note "Summary"
        Append one new key column to `dataframe`, built by concatenating the values of the given `columns`.

    ???+ abstract "Details"
        The new column is a combined key. It is especially useful because PowerBI cannot handle joins on multiple columns.

    Params:
        dataframe (psDataFrame):
            The table to be updated.
        columns (Union[str, str_collection]):
            The columns to be combined.<br>
            A single `#!py str` is wrapped into a one-element list: `#!py [columns]`.
        join_character (Optional[str], optional):
            The separator placed between the column values.<br>
            If `#!py None` (or empty), the values are joined with no separator.<br>
            Defaults to `#!py "_"`.
        key_name (Optional[str], optional):
            The name of the new key column.<br>
            If not provided (or empty), the name is `key_` followed by the upper-cased column names joined with `_`.<br>
            Defaults to `#!py None`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`. The check is delegated to `assert_columns_exists()`.

    Returns:
        (psDataFrame):
            The updated `dataframe`.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | 1 | 2 |
        | 2 | b | 1 | 2 |
        | 3 | c | 1 | 2 |
        | 4 | d | 1 | 2 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> new_df = add_key_from_columns(df, ["a", "b"])
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---------+
        | a | b | c | d | key_A_B |
        +---+---+---+---+---------+
        | 1 | a | 1 | 2 | 1_a     |
        | 2 | b | 1 | 2 | 2_b     |
        | 3 | c | 1 | 2 | 3_c     |
        | 4 | d | 1 | 2 | 4_d     |
        +---+---+---+---+---------+
        ```
        !!! success "Conclusion: Successfully added new key column to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 2: Single column"}
        >>> new_df = add_key_from_columns(df, "a")
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+-------+
        | a | b | c | d | key_A |
        +---+---+---+---+-------+
        | 1 | a | 1 | 2 | 1     |
        | 2 | b | 1 | 2 | 2     |
        | 3 | c | 1 | 2 | 3     |
        | 4 | d | 1 | 2 | 4     |
        +---+---+---+---+-------+
        ```
        !!! success "Conclusion: Successfully added new key column to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 3: New name"}
        >>> new_df = add_key_from_columns(df, ["a", "b"], key_name="new_key")
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---------+
        | a | b | c | d | new_key |
        +---+---+---+---+---------+
        | 1 | a | 1 | 2 | 1_a     |
        | 2 | b | 1 | 2 | 2_b     |
        | 3 | c | 1 | 2 | 3_c     |
        | 4 | d | 1 | 2 | 4_d     |
        +---+---+---+---+---------+
        ```
        !!! success "Conclusion: Successfully added new key column to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 4: Raise error"}
        >>> new_df = add_key_from_columns(df, ["a", "x"])
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        Attribute Error: Columns ["x"] do not exist in "dataframe". Try one of: ["a", "b", "c", "d"].
        ```
        !!! failure "Conclusion: Invalid column selection."
        </div>
    """
    # Normalise a lone column name into a one-element list.
    if is_type(columns, str):
        column_names = [columns]
    else:
        column_names = columns

    # Fail early if any requested column is missing from the dataframe.
    assert_columns_exists(dataframe, column_names)

    # A `None` (or empty) separator means "join with nothing in between".
    separator = join_character if join_character else ""

    # Derive the default key name only when the caller did not supply one.
    if key_name:
        new_column_name = key_name
    else:
        upper_names = [name.upper() for name in column_names]
        new_column_name = "key_" + "_".join(upper_names)

    combined_key = F.concat_ws(separator, *column_names)
    return dataframe.withColumn(new_column_name, combined_key)

223 

224 

@typechecked
def add_keys_from_columns(
    dataframe: psDataFrame,
    collection_of_columns: Union[
        tuple[Union[str, str_collection], ...],
        list[Union[str, str_collection]],
        dict[str, Union[str, str_collection]],
    ],
    join_character: Optional[str] = "_",
) -> psDataFrame:
    """
    !!! note "Summary"
        Add several new key columns in one call, each one built from its own group of existing columns.

    ???+ abstract "Details"
        Typical reasons to use this function:

        1. You want a single column to act as a combined key, derived from multiple other columns.
        1. You are working with PowerBI, which only allows relationships on one single column, not on a combination of columns.
        1. You are joining multiple tables, each on a different combination of columns, and want cleaner `pyspark` joins than `#!py list`'s of multiple `#!py F.col(...)` equality checks.

        Each group is handed to `add_key_from_columns()` in turn, and every call works on the result of the previous one.

    Params:
        dataframe (psDataFrame):
            The table to be updated.
        collection_of_columns (Union[tuple[Union[str, str_collection], ...], list[Union[str, str_collection]], dict[str, Union[str, str_collection]]]):
            The groups of columns to be combined together.<br>
            If it is a `#!py list` or `#!py tuple` of groups, each key name is derived from the names of the columns in that group.<br>
            If it is a `#!py dict`, each dictionary key is used as the name of the new key column, and its value is the group of columns to combine.
        join_character (Optional[str], optional):
            The separator placed between the column values.<br>
            If `#!py None` (or empty), the values are joined with no separator.<br>
            Defaults to `#!py "_"`.

    Raises:
        TypeError:
            If any of the inputs parsed to the parameters of this function are not the correct type. Uses the [`@typeguard.typechecked`](https://typeguard.readthedocs.io/en/stable/api.html#typeguard.typechecked) decorator.
        ColumnDoesNotExistError:
            If any of the `#!py columns` do not exist within `#!py dataframe.columns`.

    Returns:
        (psDataFrame):
            The updated `dataframe`.

    ???+ example "Examples"

        ```{.py .python linenums="1" title="Set up"}
        >>> # Imports
        >>> import pandas as pd
        >>> from pyspark.sql import SparkSession
        >>>
        >>> # Instantiate Spark
        >>> spark = SparkSession.builder.getOrCreate()
        >>>
        >>> # Create data
        >>> df = spark.createDataFrame(
        ...     pd.DataFrame(
        ...         {
        ...             "a": [1, 2, 3, 4],
        ...             "b": ["a", "b", "c", "d"],
        ...             "c": [1, 1, 1, 1],
        ...             "d": ["2", "2", "2", "2"],
        ...         }
        ...     )
        ... )
        >>>
        >>> # Check
        >>> df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+
        | a | b | c | d |
        +---+---+---+---+
        | 1 | a | 1 | 2 |
        | 2 | b | 1 | 2 |
        | 3 | c | 1 | 2 |
        | 4 | d | 1 | 2 |
        +---+---+---+---+
        ```
        </div>

        ```{.py .python linenums="1" title="Example 1: Basic usage"}
        >>> new_df = add_keys_from_columns(df, [["a", "b"], ["b", "c"]])
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+---------+---------+
        | a | b | c | d | key_A_B | key_B_C |
        +---+---+---+---+---------+---------+
        | 1 | a | 1 | 2 | 1_a     | a_1     |
        | 2 | b | 1 | 2 | 2_b     | b_1     |
        | 3 | c | 1 | 2 | 3_c     | c_1     |
        | 4 | d | 1 | 2 | 4_d     | d_1     |
        +---+---+---+---+---------+---------+
        ```
        !!! success "Conclusion: Successfully added two new key columns to DataFrame."
        </div>

        ```{.py .python linenums="1" title="Example 2: Created from dict"}
        >>> new_df = add_keys_from_columns(df, {"first": ["a", "b"], "second": ["b", "c"]})
        >>> new_df.show()
        ```
        <div class="result" markdown>
        ```{.txt .text title="Terminal"}
        +---+---+---+---+-------+--------+
        | a | b | c | d | first | second |
        +---+---+---+---+-------+--------+
        | 1 | a | 1 | 2 | 1_a   | a_1    |
        | 2 | b | 1 | 2 | 2_b   | b_1    |
        | 3 | c | 1 | 2 | 3_c   | c_1    |
        | 4 | d | 1 | 2 | 4_d   | d_1    |
        +---+---+---+---+-------+--------+
        ```
        !!! success "Conclusion: Successfully added two new key columns to DataFrame."
        </div>
    """
    # A `None` (or empty) separator means "join with nothing in between".
    separator = join_character or ""

    # Normalise the input into `(key_name, columns)` pairs. A `None` name
    # tells `add_key_from_columns()` to derive the name from the columns.
    if is_type(collection_of_columns, dict):
        named_groups = collection_of_columns.items()
    elif is_type(collection_of_columns, (tuple, list)):
        named_groups = [(None, group) for group in collection_of_columns]
    else:
        named_groups = []

    # Add the keys one at a time, each on top of the previous result.
    result = dataframe
    for name, group in named_groups:
        result = add_key_from_columns(result, group, separator, name)
    return result