Coverage for manila/tests/share/drivers/dell_emc/plugins/powerflex/test_object_manager.py: 100%

262 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2023 Dell Inc. or its subsidiaries. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from http import client as http_client 

17import json 

18import os 

19 

20import ddt 

21import requests_mock 

22 

23from manila import exception 

24from manila.share.drivers.dell_emc.plugins.powerflex import ( 

25 object_manager as manager 

26) 

27from manila import test 

28 

29 

30@ddt.ddt 

31class StorageObjectManagerTestCase(test.TestCase): 

32 def setUp(self): 

33 super(StorageObjectManagerTestCase, self).setUp() 

34 

35 self._mock_url = "https://192.168.0.110:443" 

36 self.manager = manager.StorageObjectManager( 

37 self._mock_url, username="admin", password="pwd", export_path=None 

38 ) 

39 self.mockup_file_base = os.path.join( 

40 os.path.dirname(os.path.realpath(__file__)), 'mockup') 

41 

42 @ddt.data(False, True) 

43 def test__get_headers(self, got_token): 

44 self.manager.got_token = got_token 

45 self.manager.rest_token = "token_str" 

46 self.assertEqual( 

47 self.manager._get_headers().get("Authorization") is not None, 

48 got_token, 

49 ) 

50 

51 def _getJsonFile(self, filename): 

52 f = open(os.path.join(self.mockup_file_base, filename)) 

53 data = json.load(f) 

54 f.close() 

55 return data 

56 

57 @requests_mock.mock() 

58 def test_get_nas_server_id(self, m): 

59 nas_server = "env8nasserver" 

60 self.assertEqual(0, len(m.request_history)) 

61 self._add_get_nas_server_id_response( 

62 m, nas_server, self._getJsonFile("get_nas_server_id_response.json") 

63 ) 

64 id = self.manager.get_nas_server_id(nas_server) 

65 self.assertEqual(id, "64132f37-d33e-9d4a-89ba-d625520a4779") 

66 

67 def _add_get_nas_server_id_response(self, m, nas_server, json_str): 

68 url = "{0}/rest/v1/nas-servers?select=id&name=eq.{1}".format( 

69 self._mock_url, nas_server 

70 ) 

71 m.get(url, status_code=200, json=json_str) 

72 

73 @requests_mock.mock() 

74 def test_create_filesystem(self, m): 

75 nas_server = "env8nasserver" 

76 self.assertEqual(0, len(m.request_history)) 

77 self._add_get_nas_server_id_response( 

78 m, nas_server, self._getJsonFile("get_nas_server_id_response.json") 

79 ) 

80 storage_pool_id = "8515fee00000000" 

81 self._add_create_filesystem_response( 

82 m, self._getJsonFile("create_filesystem_response.json") 

83 ) 

84 id = self.manager.create_filesystem( 

85 storage_pool_id, 

86 nas_server, 

87 name="Manila-filesystem", 

88 size=3221225472, 

89 ) 

90 self.assertEqual(id, "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3") 

91 

92 def _add_create_filesystem_response(self, m, json_str): 

93 url = "{0}/rest/v1/file-systems".format(self._mock_url) 

94 m.post(url, status_code=201, json=json_str) 

95 

96 @requests_mock.mock() 

97 def test_create_nfs_export(self, m): 

98 filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3" 

99 name = "Manila-UT-filesystem" 

100 self.assertEqual(0, len(m.request_history)) 

101 self._add_create_nfs_export_response( 

102 m, self._getJsonFile("create_nfs_export_response.json") 

103 ) 

104 id = self.manager.create_nfs_export(filesystem_id, name) 

105 self.assertEqual(id, "6433a2b2-6d60-f737-9f3b-2a50fb1ccff3") 

106 

107 def _add_create_nfs_export_response(self, m, json_str): 

108 url = "{0}/rest/v1/nfs-exports".format(self._mock_url) 

109 m.post(url, status_code=201, json=json_str) 

110 

111 @requests_mock.mock() 

112 def test_delete_filesystem(self, m): 

113 filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3" 

114 self.assertEqual(0, len(m.request_history)) 

115 self._add_delete_filesystem_response(m, filesystem_id) 

116 result = self.manager.delete_filesystem(filesystem_id) 

117 self.assertEqual(result, True) 

118 

119 def _add_delete_filesystem_response(self, m, filesystem_id): 

120 url = "{0}/rest/v1/file-systems/{1}".format( 

121 self._mock_url, filesystem_id 

122 ) 

123 m.delete(url, status_code=204) 

124 

125 @requests_mock.mock() 

126 def test_create_snapshot(self, m): 

127 name = "Manila-UT-filesystem-snap" 

128 filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3" 

129 self.assertEqual(0, len(m.request_history)) 

130 self._add_create_snapshot_response( 

131 m, 

132 filesystem_id, 

133 self._getJsonFile("create_nfs_snapshot_response.json"), 

134 ) 

135 result = self.manager.create_snapshot(name, filesystem_id) 

136 self.assertEqual(result, True) 

137 

138 def _add_create_snapshot_response(self, m, filesystem_id, json_str): 

139 url = "{0}/rest/v1/file-systems/{1}/snapshot".format( 

140 self._mock_url, filesystem_id 

141 ) 

142 m.post(url, status_code=201, json=json_str) 

143 

144 @requests_mock.mock() 

145 def test_get_nfs_export_name(self, m): 

146 export_id = "6433a2b2-6d60-f737-9f3b-2a50fb1ccff3" 

147 self.assertEqual(0, len(m.request_history)) 

148 self._add_get_nfs_export_name_response( 

149 m, 

150 export_id, 

151 self._getJsonFile("get_nfs_export_name_response.json"), 

152 ) 

153 name = self.manager.get_nfs_export_name(export_id) 

154 self.assertEqual(name, "Manila-UT-filesystem") 

155 

156 def _add_get_nfs_export_name_response(self, m, export_id, json_str): 

157 url = "{0}/rest/v1/nfs-exports/{1}?select=*".format( 

158 self._mock_url, export_id 

159 ) 

160 m.get(url, status_code=200, json=json_str) 

161 

162 @requests_mock.mock() 

163 def test_get_filesystem_id(self, m): 

164 name = "Manila-UT-filesystem" 

165 self.assertEqual(0, len(m.request_history)) 

166 self._add_get_filesystem_id_response( 

167 m, name, self._getJsonFile("get_fileystem_id_response.json") 

168 ) 

169 id = self.manager.get_filesystem_id(name) 

170 self.assertEqual(id, "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3") 

171 

172 def _add_get_filesystem_id_response(self, m, name, json_str): 

173 url = "{0}/rest/v1/file-systems?select=id&name=eq.{1}".format( 

174 self._mock_url, name 

175 ) 

176 m.get(url, status_code=200, json=json_str) 

177 

178 @requests_mock.mock() 

179 def test_get_nfs_export_id(self, m): 

180 name = "Manila-UT-filesystem" 

181 self.assertEqual(0, len(m.request_history)) 

182 self._add_get_nfs_export_id_response( 

183 m, name, self._getJsonFile("get_nfs_export_id_response.json") 

184 ) 

185 id = self.manager.get_nfs_export_id(name) 

186 self.assertEqual(id, "6433a2b2-6d60-f737-9f3b-2a50fb1ccff3") 

187 

188 def _add_get_nfs_export_id_response(self, m, name, json_str): 

189 url = "{0}/rest/v1/nfs-exports?select=id&name=eq.{1}".format( 

190 self._mock_url, name 

191 ) 

192 m.get(url, status_code=200, json=json_str) 

193 

194 @requests_mock.mock() 

195 def test_get_storage_pool_id(self, m): 

196 protection_domain_name = "Env8-PD-1" 

197 storage_pool_name = "Env8-SP-SW_SSD-1" 

198 self.assertEqual(0, len(m.request_history)) 

199 self._add_get_storage_pool_id_response( 

200 m, self._getJsonFile("get_storage_pool_id_response.json") 

201 ) 

202 id = self.manager.get_storage_pool_id( 

203 protection_domain_name, storage_pool_name 

204 ) 

205 self.assertEqual(id, "28515fee00000000") 

206 

207 def _add_get_storage_pool_id_response(self, m, json_str): 

208 url = "{0}/api/types/StoragePool/instances/action/queryIdByKey".format( 

209 self._mock_url 

210 ) 

211 m.post(url, status_code=200, json=json_str) 

212 

213 @requests_mock.mock() 

214 def test_set_export_access(self, m): 

215 export_id = "6433a2b2-6d60-f737-9f3b-2a50fb1ccff3" 

216 rw_hosts = "192.168.1.110" 

217 ro_hosts = "192.168.1.111" 

218 self.assertEqual(0, len(m.request_history)) 

219 self._add_set_export_access_response(m, export_id) 

220 result = self.manager.set_export_access(export_id, rw_hosts, ro_hosts) 

221 self.assertEqual(result, True) 

222 

223 def _add_set_export_access_response(self, m, export_id): 

224 url = "{0}/rest/v1/nfs-exports/{1}".format(self._mock_url, export_id) 

225 m.patch(url, status_code=204) 

226 

227 @requests_mock.mock() 

228 def test_extend_export(self, m): 

229 filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3" 

230 new_size = 6441225472 

231 self.assertEqual(0, len(m.request_history)) 

232 self._add_extend_export_response(m, filesystem_id) 

233 result = self.manager.extend_export(filesystem_id, new_size) 

234 self.assertEqual(result, True) 

235 

236 def _add_extend_export_response(self, m, filesystem_id): 

237 url = "{0}/rest/v1/file-systems/{1}".format( 

238 self._mock_url, filesystem_id 

239 ) 

240 m.patch(url, status_code=204) 

241 

242 @requests_mock.mock() 

243 def test_get_fsid_from_export_name(self, m): 

244 name = "Manila-UT-filesystem" 

245 self.assertEqual(0, len(m.request_history)) 

246 self._add_get_fsid_from_export_name_response( 

247 m, 

248 name, 

249 self._getJsonFile("get_fsid_from_export_name_response.json"), 

250 ) 

251 id = self.manager.get_fsid_from_export_name(name) 

252 self.assertEqual(id, "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3") 

253 

254 def _add_get_fsid_from_export_name_response(self, m, name, json_str): 

255 url = ( 

256 "{0}/rest/v1/nfs-exports?select=file_system_id&name=eq.{1}".format( 

257 self._mock_url, name 

258 ) 

259 ) 

260 m.get(url, status_code=200, json=json_str) 

261 

262 @requests_mock.mock() 

263 def test_get_fsid_from_snapshot_name(self, m): 

264 snapshot_name = "Manila-UT-filesystem-snap" 

265 self.assertEqual(0, len(m.request_history)) 

266 self._add_get_fsid_from_snapshot_name_response( 

267 m, 

268 snapshot_name, 

269 self._getJsonFile("get_fsid_from_snapshot_name_response.json"), 

270 ) 

271 id = self.manager.get_fsid_from_snapshot_name(snapshot_name) 

272 self.assertEqual(id, "6433b635-6c1f-878e-6467-2a50fb1ccff3") 

273 

274 def _add_get_fsid_from_snapshot_name_response( 

275 self, m, snapshot_name, json_str 

276 ): 

277 url = "{0}/rest/v1/file-systems?select=id&name=eq.{1}".format( 

278 self._mock_url, snapshot_name 

279 ) 

280 m.get(url, status_code=200, json=json_str) 

281 

282 @requests_mock.mock() 

283 def test_check_response_with_login_get(self, m): 

284 nas_server = "env8nasserver" 

285 self.assertEqual(0, len(m.request_history)) 

286 self._add_get_nas_server_id_response_list(m, nas_server) 

287 self._add_login_success_response(m) 

288 id = self.manager.get_nas_server_id(nas_server) 

289 self.assertEqual(id, "64132f37-d33e-9d4a-89ba-d625520a4779") 

290 

291 def _add_get_nas_server_id_response_list(self, m, nas_server): 

292 url = "{0}/rest/v1/nas-servers?select=id&name=eq.{1}".format( 

293 self._mock_url, nas_server 

294 ) 

295 m.get( 

296 url, 

297 [ 

298 {"status_code": http_client.UNAUTHORIZED}, 

299 { 

300 "status_code": 200, 

301 "json": self._getJsonFile( 

302 "get_nas_server_id_response.json" 

303 ), 

304 }, 

305 ], 

306 ) 

307 

308 def _add_login_success_response(self, m): 

309 url = "{0}/rest/auth/login".format(self._mock_url) 

310 m.post( 

311 url, status_code=200, json=self._getJsonFile("login_response.json") 

312 ) 

313 

314 @requests_mock.mock() 

315 def test_check_response_with_login_post(self, m): 

316 filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3" 

317 name = "Manila-UT-filesystem" 

318 self.assertEqual(0, len(m.request_history)) 

319 self._add_create_nfs_export_response_list(m) 

320 self._add_login_success_response(m) 

321 id = self.manager.create_nfs_export(filesystem_id, name) 

322 self.assertEqual(id, "6433a2b2-6d60-f737-9f3b-2a50fb1ccff3") 

323 

324 def _add_create_nfs_export_response_list(self, m): 

325 url = "{0}/rest/v1/nfs-exports".format(self._mock_url) 

326 m.post( 

327 url, 

328 [ 

329 {"status_code": http_client.UNAUTHORIZED}, 

330 { 

331 "status_code": 201, 

332 "json": self._getJsonFile( 

333 "create_nfs_export_response.json" 

334 ), 

335 }, 

336 ], 

337 ) 

338 

339 @requests_mock.mock() 

340 def test_check_response_with_login_delete(self, m): 

341 filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3" 

342 self.assertEqual(0, len(m.request_history)) 

343 self._add_delete_filesystem_response_list(m, filesystem_id) 

344 self._add_login_success_response(m) 

345 result = self.manager.delete_filesystem(filesystem_id) 

346 self.assertEqual(result, True) 

347 

348 def _add_delete_filesystem_response_list(self, m, filesystem_id): 

349 url = "{0}/rest/v1/file-systems/{1}".format( 

350 self._mock_url, filesystem_id 

351 ) 

352 m.delete( 

353 url, 

354 [{"status_code": http_client.UNAUTHORIZED}, {"status_code": 204}], 

355 ) 

356 

357 @requests_mock.mock() 

358 def test_check_response_with_login_patch(self, m): 

359 filesystem_id = "6432b79e-1cc3-0414-3ffd-2a50fb1ccff3" 

360 new_size = 6441225472 

361 self.assertEqual(0, len(m.request_history)) 

362 self._add_extend_export_response_list(m, filesystem_id) 

363 self._add_login_success_response(m) 

364 result = self.manager.extend_export(filesystem_id, new_size) 

365 self.assertEqual(result, True) 

366 

367 def _add_extend_export_response_list(self, m, filesystem_id): 

368 url = "{0}/rest/v1/file-systems/{1}".format( 

369 self._mock_url, filesystem_id 

370 ) 

371 m.patch( 

372 url, 

373 [{"status_code": http_client.UNAUTHORIZED}, {"status_code": 204}], 

374 ) 

375 

376 @requests_mock.mock() 

377 def test_check_response_with_invalid_credential(self, m): 

378 nas_server = "env8nasserver" 

379 self.assertEqual(0, len(m.request_history)) 

380 self._add_get_nas_server_id_unauthorized_response(m, nas_server) 

381 self._add_login_fail_response(m) 

382 self.assertRaises( 

383 exception.NotAuthorized, self.manager.get_nas_server_id, nas_server 

384 ) 

385 

386 def _add_get_nas_server_id_unauthorized_response(self, m, nas_server): 

387 url = "{0}/rest/v1/nas-servers?select=id&name=eq.{1}".format( 

388 self._mock_url, nas_server 

389 ) 

390 m.get(url, status_code=http_client.UNAUTHORIZED) 

391 

392 def _add_login_fail_response(self, m): 

393 url = "{0}/rest/auth/login".format(self._mock_url) 

394 m.post(url, status_code=http_client.UNAUTHORIZED) 

395 

396 @requests_mock.mock() 

397 def test_execute_powerflex_post_request_with_no_param(self, m): 

398 url = self._mock_url + "/fake_url" 

399 self.assertEqual(0, len(m.request_history)) 

400 m.post(url, status_code=201) 

401 res, response = self.manager.execute_powerflex_post_request(url) 

402 self.assertEqual(res.status_code, 201) 

403 

404 @requests_mock.mock() 

405 def test_execute_powerflex_patch_request_with_no_param(self, m): 

406 url = self._mock_url + "/fake_url" 

407 self.assertEqual(0, len(m.request_history)) 

408 m.patch(url, status_code=204) 

409 res = self.manager.execute_powerflex_patch_request(url) 

410 self.assertEqual(res.status_code, 204) 

411 

412 @requests_mock.mock() 

413 def test_get_storage_pool_spare_percentage(self, m): 

414 storage_pool_id = "28515fee00000000" 

415 self.assertEqual(0, len(m.request_history)) 

416 self._add_get_storage_pool_spare_percentage( 

417 m, 

418 storage_pool_id, 

419 self._getJsonFile("get_storage_pool_spare_percentage.json"), 

420 ) 

421 spare = self.manager.get_storage_pool_spare_percentage(storage_pool_id) 

422 self.assertEqual(spare, 34) 

423 

424 def _add_get_storage_pool_spare_percentage(self, m, storage_pool_id, 

425 json_str): 

426 url = ( 

427 "{0}/api/instances/StoragePool::{1}".format( 

428 self._mock_url, storage_pool_id 

429 ) 

430 ) 

431 m.get(url, status_code=200, json=json_str) 

432 

433 @requests_mock.mock() 

434 def test_get_storage_pool_statistic(self, m): 

435 storage_pool_id = "28515fee00000000" 

436 self.assertEqual(0, len(m.request_history)) 

437 self._add_get_storage_pool_statistic( 

438 m, 

439 storage_pool_id, 

440 self._getJsonFile("get_storage_pool_statistic.json"), 

441 ) 

442 statistic = self.manager.get_storage_pool_statistic(storage_pool_id) 

443 self.assertEqual(statistic['maxCapacityInKb'], 4826330112) 

444 self.assertEqual(statistic['capacityInUseInKb'], 53217280) 

445 self.assertEqual(statistic['netUnusedCapacityInKb'], 1566080512) 

446 self.assertEqual(statistic['primaryVacInKb'], 184549376) 

447 

448 def _add_get_storage_pool_statistic(self, m, storage_pool_id, 

449 json_str): 

450 url = ( 

451 ("{0}/api/instances/StoragePool::{1}/relationships/" + 

452 "Statistics").format( 

453 self._mock_url, storage_pool_id 

454 ) 

455 ) 

456 m.get(url, status_code=200, json=json_str)