Coverage for manila/tests/share/drivers/dell_emc/plugins/powerstore/test_connection.py: 100%

342 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2023 Dell Inc. or its subsidiaries. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16 

17from unittest import mock 

18 

19from oslo_log import log 

20from oslo_utils import units 

21 

22from manila.common import constants as const 

23from manila import exception 

24from manila.share.drivers.dell_emc.plugins.powerstore import connection 

25from manila import test 

26 

27LOG = log.getLogger(__name__) 

28 

29 

30class PowerStoreTest(test.TestCase): 

31 """Unit test for the PowerStore Manila driver.""" 

32 

33 REST_IP = "192.168.0.110" 

34 NAS_SERVER_NAME = "powerstore-nasserver" 

35 NAS_SERVER_ID = "6423d56e-eaf3-7424-be0b-1a9efb93188b" 

36 NAS_SERVER_IP = "192.168.11.23" 

37 SHARE_NAME = "powerstore-share" 

38 SHARE_SIZE_GB = 3 

39 SHARE_NEW_SIZE_GB = 6 

40 FILESYSTEM_ID = "6454e9a9-a698-e9bc-ca61-1a9efb93188b" 

41 NFS_EXPORT_ID = "6454ec18-7b8d-1532-1b8a-1a9efb93188b" 

42 SMB_SHARE_ID = "64927ae9-3403-6930-a784-f227b9987c54" 

43 RW_HOSTS = "192.168.1.10" 

44 RO_HOSTS = "192.168.1.11" 

45 RW_USERS = "user_1" 

46 RO_USERS = "user_2" 

47 SNAPSHOT_NAME = "powerstore-share-snap" 

48 SNAPSHOT_ID = "6454ea29-09c3-030e-cfc3-1a9efb93188b" 

49 CLONE_ID = "64560f05-e677-ec2a-7fcf-1a9efb93188b" 

50 CLONE_NAME = "powerstore-nfs-share-snap-clone" 

51 

52 class MockConfig(object): 

53 def safe_get(self, value): 

54 if value == "dell_nas_backend_host": 

55 return "192.168.0.110" 

56 elif value == "dell_nas_login": 

57 return "admin" 

58 elif value == "dell_nas_password": 

59 return "pwd" 

60 elif value == "dell_nas_server": 

61 return "powerstore-nasserver" 

62 elif value == "dell_ad_domain": 

63 return "domain_name" 

64 elif value == "dell_ssl_cert_verify": 

65 return True 

66 elif value == "dell_ssl_cert_path": 

67 return "powerstore_cert_path" 

68 

69 @mock.patch( 

70 "manila.share.drivers.dell_emc.plugins.powerstore.client." 

71 "PowerStoreClient", 

72 autospec=True, 

73 ) 

74 def setUp(self, mock_powerstore_client): 

75 super(PowerStoreTest, self).setUp() 

76 

77 self._mock_powerstore_client = mock_powerstore_client.return_value 

78 self.storage_connection = connection.PowerStoreStorageConnection(LOG) 

79 

80 self.mock_context = mock.Mock("Context") 

81 self.mock_emc_driver = mock.Mock("EmcDriver") 

82 

83 self._mock_config = self.MockConfig() 

84 self.mock_emc_driver.attach_mock(self._mock_config, "configuration") 

85 self.storage_connection.connect( 

86 self.mock_emc_driver, self.mock_context 

87 ) 

88 

89 def test_connect(self): 

90 storage_connection = connection.PowerStoreStorageConnection(LOG) 

91 

92 # execute method under test 

93 storage_connection.connect(self.mock_emc_driver, self.mock_context) 

94 

95 # verify connect sets driver params appropriately 

96 mock_config = self.MockConfig() 

97 server_addr = mock_config.safe_get("dell_nas_backend_host") 

98 self.assertEqual(server_addr, storage_connection.rest_ip) 

99 expected_username = mock_config.safe_get("dell_nas_login") 

100 self.assertEqual(expected_username, storage_connection.rest_username) 

101 expected_password = mock_config.safe_get("dell_nas_password") 

102 self.assertEqual(expected_password, storage_connection.rest_password) 

103 expected_nas_server = mock_config.safe_get("dell_nas_server") 

104 self.assertEqual(expected_nas_server, storage_connection.nas_server) 

105 expected_ad_domain = mock_config.safe_get("dell_ad_domain") 

106 self.assertEqual(expected_ad_domain, storage_connection.ad_domain) 

107 expected_verify_certificate = mock_config.safe_get( 

108 "dell_ssl_cert_verify" 

109 ) 

110 self.assertEqual( 

111 expected_verify_certificate, storage_connection.verify_certificate 

112 ) 

113 

114 def test_create_share_nfs(self): 

115 self._mock_powerstore_client.get_nas_server_id.return_value = ( 

116 self.NAS_SERVER_ID 

117 ) 

118 self._mock_powerstore_client.create_filesystem.return_value = ( 

119 self.FILESYSTEM_ID 

120 ) 

121 self._mock_powerstore_client.create_nfs_export.return_value = ( 

122 self.NFS_EXPORT_ID 

123 ) 

124 self._mock_powerstore_client.get_nas_server_interfaces.return_value = ( 

125 [{"ip": self.NAS_SERVER_IP, "preferred": True}] 

126 ) 

127 

128 self.assertFalse(self._mock_powerstore_client.get_nas_server_id.called) 

129 self.assertFalse(self._mock_powerstore_client.create_filesystem.called) 

130 self.assertFalse(self._mock_powerstore_client.create_nfs_export.called) 

131 self.assertFalse( 

132 self._mock_powerstore_client.get_nas_server_interfaces.called 

133 ) 

134 

135 # create the share 

136 share = {"name": self.SHARE_NAME, "share_proto": "NFS", 

137 "size": self.SHARE_SIZE_GB} 

138 locations = self.storage_connection.create_share( 

139 self.mock_context, 

140 share, 

141 None 

142 ) 

143 

144 # verify location and API call made 

145 expected_locations = [ 

146 {"path": "%s:/%s" % ( 

147 self.NAS_SERVER_IP, 

148 self.SHARE_NAME, 

149 ), 

150 "metadata": { 

151 "preferred": True}}] 

152 self.assertEqual(expected_locations, locations) 

153 self._mock_powerstore_client.get_nas_server_id.assert_called_with( 

154 self._mock_config.safe_get("dell_nas_server") 

155 ) 

156 self._mock_powerstore_client.create_filesystem.assert_called_with( 

157 self.NAS_SERVER_ID, 

158 self.SHARE_NAME, 

159 self.SHARE_SIZE_GB * units.Gi, 

160 ) 

161 self._mock_powerstore_client.create_nfs_export.assert_called_with( 

162 self.FILESYSTEM_ID, self.SHARE_NAME 

163 ) 

164 self._mock_powerstore_client.get_nas_server_interfaces. \ 

165 assert_called_with( 

166 self.NAS_SERVER_ID 

167 ) 

168 

169 def test_create_share_cifs(self): 

170 self._mock_powerstore_client.get_nas_server_id.return_value = ( 

171 self.NAS_SERVER_ID 

172 ) 

173 self._mock_powerstore_client.create_filesystem.return_value = ( 

174 self.FILESYSTEM_ID 

175 ) 

176 self._mock_powerstore_client.create_smb_share.return_value = ( 

177 self.SMB_SHARE_ID 

178 ) 

179 self._mock_powerstore_client.get_nas_server_interfaces.return_value = ( 

180 [{"ip": self.NAS_SERVER_IP, "preferred": True}] 

181 ) 

182 

183 self.assertFalse(self._mock_powerstore_client.get_nas_server_id.called) 

184 self.assertFalse(self._mock_powerstore_client.create_filesystem.called) 

185 self.assertFalse(self._mock_powerstore_client.create_smb_share.called) 

186 self.assertFalse( 

187 self._mock_powerstore_client.get_nas_server_interfaces.called 

188 ) 

189 

190 # create the share 

191 share = {"name": self.SHARE_NAME, "share_proto": "CIFS", 

192 "size": self.SHARE_SIZE_GB} 

193 locations = self.storage_connection.create_share( 

194 self.mock_context, 

195 share, 

196 None 

197 ) 

198 

199 # verify location and API call made 

200 expected_locations = [ 

201 {"path": "\\\\%s\\%s" % ( 

202 self.NAS_SERVER_IP, 

203 self.SHARE_NAME), 

204 "metadata": { 

205 "preferred": True}}] 

206 self.assertEqual(expected_locations, locations) 

207 self._mock_powerstore_client.get_nas_server_id.assert_called_with( 

208 self._mock_config.safe_get("dell_nas_server") 

209 ) 

210 self._mock_powerstore_client.create_filesystem.assert_called_with( 

211 self.NAS_SERVER_ID, 

212 self.SHARE_NAME, 

213 self.SHARE_SIZE_GB * units.Gi, 

214 ) 

215 self._mock_powerstore_client.create_smb_share.assert_called_with( 

216 self.FILESYSTEM_ID, self.SHARE_NAME 

217 ) 

218 self._mock_powerstore_client.get_nas_server_interfaces. \ 

219 assert_called_with( 

220 self.NAS_SERVER_ID 

221 ) 

222 

223 def test_create_share_filesystem_id_not_found(self): 

224 share = {"name": self.SHARE_NAME, "share_proto": "NFS", 

225 "size": self.SHARE_SIZE_GB} 

226 self._mock_powerstore_client.create_filesystem.return_value = None 

227 

228 self.assertRaises( 

229 exception.ShareBackendException, 

230 self.storage_connection.create_share, 

231 self.mock_context, 

232 share, 

233 share_server=None 

234 ) 

235 

236 def test_create_share_nfs_backend_failure(self): 

237 share = {"name": self.SHARE_NAME, "share_proto": "NFS", 

238 "size": self.SHARE_SIZE_GB} 

239 self._mock_powerstore_client.create_nfs_export.return_value = None 

240 

241 self.assertRaises( 

242 exception.ShareBackendException, 

243 self.storage_connection.create_share, 

244 self.mock_context, 

245 share, 

246 share_server=None 

247 ) 

248 

249 def test_create_share_cifs_backend_failure(self): 

250 share = {"name": self.SHARE_NAME, "share_proto": "CIFS", 

251 "size": self.SHARE_SIZE_GB} 

252 self._mock_powerstore_client.create_smb_share.return_value = None 

253 

254 self.assertRaises( 

255 exception.ShareBackendException, 

256 self.storage_connection.create_share, 

257 self.mock_context, 

258 share, 

259 share_server=None 

260 ) 

261 

262 def test_delete_share_nfs(self): 

263 share = {"name": self.SHARE_NAME, "share_proto": "NFS"} 

264 

265 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

266 self.FILESYSTEM_ID 

267 ) 

268 

269 self.assertFalse(self._mock_powerstore_client.get_filesystem_id.called) 

270 self.assertFalse(self._mock_powerstore_client.delete_filesystem.called) 

271 

272 # delete the share 

273 self.storage_connection.delete_share(self.mock_context, share, None) 

274 

275 # verify share delete 

276 self._mock_powerstore_client.get_filesystem_id.assert_called_with( 

277 self.SHARE_NAME 

278 ) 

279 self._mock_powerstore_client.delete_filesystem.assert_called_with( 

280 self.FILESYSTEM_ID 

281 ) 

282 

283 def test_delete_nfs_share_backend_failure(self): 

284 share = {"name": self.SHARE_NAME, "share_proto": "NFS"} 

285 

286 self._mock_powerstore_client.delete_filesystem.return_value = False 

287 self.assertRaises( 

288 exception.ShareBackendException, 

289 self.storage_connection.delete_share, 

290 self.mock_context, 

291 share, 

292 None, 

293 ) 

294 

295 def test_delete_nfs_share_share_does_not_exist(self): 

296 self._mock_powerstore_client.get_filesystem_id.return_value = None 

297 share = {"name": self.SHARE_NAME, "share_proto": "NFS"} 

298 

299 # verify the calling delete on a non-existent share returns and does 

300 # not throw exception 

301 self.storage_connection.delete_share(self.mock_context, share, None) 

302 self.assertTrue(self._mock_powerstore_client.get_filesystem_id.called) 

303 self.assertFalse(self._mock_powerstore_client.delete_filesystem.called) 

304 

305 def test_extend_share(self): 

306 share = { 

307 "name": self.SHARE_NAME, 

308 "share_proto": "NFS", 

309 "size": self.SHARE_NEW_SIZE_GB, 

310 } 

311 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

312 self.FILESYSTEM_ID 

313 ) 

314 self._mock_powerstore_client.resize_filesystem.return_value = ( 

315 True, None 

316 ) 

317 self.assertFalse(self._mock_powerstore_client.get_filesystem_id.called) 

318 

319 self.storage_connection.extend_share(share, self.SHARE_NEW_SIZE_GB, 

320 self.NAS_SERVER_NAME) 

321 

322 self._mock_powerstore_client.get_filesystem_id.assert_called_with( 

323 self.SHARE_NAME 

324 ) 

325 expected_quota_size = self.SHARE_NEW_SIZE_GB * units.Gi 

326 self._mock_powerstore_client.resize_filesystem.assert_called_once_with( 

327 self.FILESYSTEM_ID, expected_quota_size 

328 ) 

329 

330 def test_shrink_share(self): 

331 share = { 

332 "name": self.SHARE_NAME, 

333 "share_proto": "NFS", 

334 "size": self.SHARE_SIZE_GB, 

335 } 

336 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

337 self.FILESYSTEM_ID 

338 ) 

339 self._mock_powerstore_client.resize_filesystem.return_value = ( 

340 True, None 

341 ) 

342 self.assertFalse(self._mock_powerstore_client.get_filesystem_id.called) 

343 

344 self.storage_connection.shrink_share(share, self.SHARE_NEW_SIZE_GB, 

345 self.NAS_SERVER_NAME) 

346 

347 self._mock_powerstore_client.get_filesystem_id.assert_called_with( 

348 self.SHARE_NAME 

349 ) 

350 expected_quota_size = self.SHARE_NEW_SIZE_GB * units.Gi 

351 self._mock_powerstore_client.resize_filesystem.assert_called_once_with( 

352 self.FILESYSTEM_ID, expected_quota_size 

353 ) 

354 

355 def test_shrink_share_failure(self): 

356 share = { 

357 "name": self.SHARE_NAME, 

358 "share_proto": "NFS", 

359 "size": self.SHARE_SIZE_GB, 

360 "id": self.CLONE_ID 

361 } 

362 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

363 self.FILESYSTEM_ID 

364 ) 

365 self._mock_powerstore_client.resize_filesystem.return_value = ( 

366 False, "msg" 

367 ) 

368 

369 self.assertRaises( 

370 exception.ShareShrinkingPossibleDataLoss, 

371 self.storage_connection.shrink_share, 

372 share, 

373 self.SHARE_NEW_SIZE_GB, 

374 self.NAS_SERVER_NAME 

375 ) 

376 

377 def test_shrink_share_backend_failure(self): 

378 share = { 

379 "name": self.SHARE_NAME, 

380 "share_proto": "NFS", 

381 "size": self.SHARE_SIZE_GB, 

382 } 

383 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

384 self.FILESYSTEM_ID 

385 ) 

386 self._mock_powerstore_client.resize_filesystem.return_value = ( 

387 False, None 

388 ) 

389 

390 self.assertRaises( 

391 exception.ShareBackendException, 

392 self.storage_connection.shrink_share, 

393 share, 

394 self.SHARE_NEW_SIZE_GB, 

395 self.NAS_SERVER_NAME 

396 ) 

397 

398 def test_update_access_add_nfs(self): 

399 share = {"name": self.SHARE_NAME, "share_proto": "NFS"} 

400 

401 self._mock_powerstore_client.get_nfs_export_id.return_value = ( 

402 self.NFS_EXPORT_ID 

403 ) 

404 self._mock_powerstore_client.set_export_access.return_value = True 

405 

406 self.assertFalse(self._mock_powerstore_client.get_nfs_export_id.called) 

407 self.assertFalse(self._mock_powerstore_client.set_export_access.called) 

408 

409 nfs_access_rw = { 

410 "access_type": "ip", 

411 "access_to": self.RW_HOSTS, 

412 "access_level": const.ACCESS_LEVEL_RW, 

413 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

414 } 

415 nfs_access_ro = { 

416 "access_type": "ip", 

417 "access_to": self.RO_HOSTS, 

418 "access_level": const.ACCESS_LEVEL_RO, 

419 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

420 } 

421 access_rules = [nfs_access_rw, nfs_access_ro] 

422 

423 self.storage_connection.update_access( 

424 self.mock_context, 

425 share, 

426 access_rules, 

427 add_rules=None, 

428 delete_rules=None, 

429 share_server=None, 

430 ) 

431 

432 self._mock_powerstore_client.get_nfs_export_id.assert_called_once_with( 

433 self.SHARE_NAME 

434 ) 

435 self._mock_powerstore_client.set_export_access.assert_called_once_with( 

436 self.NFS_EXPORT_ID, {self.RW_HOSTS}, {self.RO_HOSTS} 

437 ) 

438 

439 def test_update_access_add_cifs(self): 

440 share = {"name": self.SHARE_NAME, "share_proto": "CIFS"} 

441 

442 self._mock_powerstore_client.get_smb_share_id.return_value = ( 

443 self.SMB_SHARE_ID 

444 ) 

445 self._mock_powerstore_client.set_acl.return_value = True 

446 

447 self.assertFalse(self._mock_powerstore_client.get_smb_share_id.called) 

448 self.assertFalse(self._mock_powerstore_client.set_acl.called) 

449 

450 cifs_access_rw = { 

451 "access_type": "user", 

452 "access_to": self.RW_USERS, 

453 "access_level": const.ACCESS_LEVEL_RW, 

454 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

455 } 

456 cifs_access_ro = { 

457 "access_type": "user", 

458 "access_to": self.RO_USERS, 

459 "access_level": const.ACCESS_LEVEL_RO, 

460 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

461 } 

462 access_rules = [cifs_access_rw, cifs_access_ro] 

463 

464 self.storage_connection.update_access( 

465 self.mock_context, 

466 share, 

467 access_rules, 

468 add_rules=None, 

469 delete_rules=None, 

470 share_server=None, 

471 ) 

472 

473 self._mock_powerstore_client.get_smb_share_id.assert_called_once_with( 

474 self.SHARE_NAME 

475 ) 

476 self._mock_powerstore_client.set_acl.assert_called_once_with( 

477 self.SMB_SHARE_ID, {'domain_name\\user_1'}, {'domain_name\\user_2'} 

478 ) 

479 

480 def test_update_access_invalid_prefix(self): 

481 share = {"name": self.SHARE_NAME, "share_proto": "CIFS"} 

482 

483 self._mock_powerstore_client.get_smb_share_id.return_value = ( 

484 self.SMB_SHARE_ID 

485 ) 

486 self._mock_powerstore_client.get_nas_server_smb_netbios. \ 

487 return_value = None 

488 

489 self.assertFalse(self._mock_powerstore_client.get_smb_share_id.called) 

490 self.assertFalse(self._mock_powerstore_client.set_acl.called) 

491 

492 cifs_access_rw = { 

493 "access_type": "user", 

494 "access_to": self.RW_USERS, 

495 "access_level": const.ACCESS_LEVEL_RW, 

496 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

497 } 

498 cifs_access_ro = { 

499 "access_type": "user", 

500 "access_to": self.RO_USERS, 

501 "access_level": const.ACCESS_LEVEL_RO, 

502 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

503 } 

504 access_rules = [cifs_access_rw, cifs_access_ro] 

505 

506 self.storage_connection.ad_domain = None 

507 

508 access_updates = self.storage_connection.update_access( 

509 self.mock_context, 

510 share, 

511 access_rules, 

512 add_rules=None, 

513 delete_rules=None, 

514 share_server=None, 

515 ) 

516 

517 self._mock_powerstore_client.set_acl.assert_called_once_with( 

518 self.SMB_SHARE_ID, set(), set() 

519 ) 

520 

521 self.assertIsNotNone(access_updates) 

522 

523 def test_update_access_add_nfs_invalid_acess_type(self): 

524 share = { 

525 "name": self.SHARE_NAME, 

526 "share_proto": "NFS", 

527 "display_name": "foo_display_name", 

528 } 

529 

530 nfs_access_rw = { 

531 "access_type": "invalid_type", 

532 "access_to": self.RW_HOSTS, 

533 "access_level": const.ACCESS_LEVEL_RW, 

534 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

535 } 

536 nfs_access_ro = { 

537 "access_type": "invalid_type", 

538 "access_to": self.RO_HOSTS, 

539 "access_level": const.ACCESS_LEVEL_RO, 

540 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd09", 

541 } 

542 access_rules = [nfs_access_rw, nfs_access_ro] 

543 

544 self._mock_powerstore_client.get_nfs_export_id.return_value = ( 

545 self.NFS_EXPORT_ID 

546 ) 

547 

548 access_updates = self.storage_connection.update_access( 

549 self.mock_context, 

550 share, 

551 access_rules, 

552 add_rules=None, 

553 delete_rules=None, 

554 share_server=None, 

555 ) 

556 

557 self._mock_powerstore_client.set_export_access.assert_called_once_with( 

558 self.NFS_EXPORT_ID, set(), set() 

559 ) 

560 

561 self.assertIsNotNone(access_updates) 

562 

563 def test_update_access_add_cifs_invalid_acess_type(self): 

564 share = { 

565 "name": self.SHARE_NAME, 

566 "share_proto": "CIFS", 

567 "display_name": "foo_display_name", 

568 } 

569 

570 cifs_access_rw = { 

571 "access_type": "invalid_type", 

572 "access_to": self.RW_USERS, 

573 "access_level": const.ACCESS_LEVEL_RW, 

574 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

575 } 

576 cifs_access_ro = { 

577 "access_type": "invalid_type", 

578 "access_to": self.RO_USERS, 

579 "access_level": const.ACCESS_LEVEL_RO, 

580 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd09", 

581 } 

582 access_rules = [cifs_access_rw, cifs_access_ro] 

583 

584 self._mock_powerstore_client.get_smb_share_id.return_value = ( 

585 self.SMB_SHARE_ID 

586 ) 

587 

588 access_updates = self.storage_connection.update_access( 

589 self.mock_context, 

590 share, 

591 access_rules, 

592 add_rules=None, 

593 delete_rules=None, 

594 share_server=None, 

595 ) 

596 

597 self._mock_powerstore_client.set_acl.assert_called_once_with( 

598 self.SMB_SHARE_ID, set(), set() 

599 ) 

600 

601 self.assertIsNotNone(access_updates) 

602 

603 def test_update_access_add_nfs_backend_failure(self): 

604 share = { 

605 "name": self.SHARE_NAME, 

606 "share_proto": "NFS", 

607 "display_name": "foo_display_name", 

608 } 

609 

610 self._mock_powerstore_client.get_nfs_export_id.return_value = ( 

611 self.NFS_EXPORT_ID 

612 ) 

613 self._mock_powerstore_client.set_export_access.return_value = False 

614 

615 self.assertFalse(self._mock_powerstore_client.get_nfs_export_id.called) 

616 self.assertFalse(self._mock_powerstore_client.set_export_access.called) 

617 

618 nfs_access_rw = { 

619 "access_type": "ip", 

620 "access_to": self.RW_HOSTS, 

621 "access_level": const.ACCESS_LEVEL_RW, 

622 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

623 } 

624 nfs_access_ro = { 

625 "access_type": "ip", 

626 "access_to": self.RO_HOSTS, 

627 "access_level": const.ACCESS_LEVEL_RO, 

628 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

629 } 

630 access_rules = [nfs_access_rw, nfs_access_ro] 

631 

632 self.assertRaises( 

633 exception.ShareBackendException, 

634 self.storage_connection.update_access, 

635 self.mock_context, 

636 share, 

637 access_rules, 

638 add_rules=None, 

639 delete_rules=None, 

640 share_server=None, 

641 ) 

642 

643 def test_update_access_add_cifs_backend_failure(self): 

644 share = { 

645 "name": self.SHARE_NAME, 

646 "share_proto": "CIFS", 

647 "display_name": "foo_display_name", 

648 } 

649 

650 self._mock_powerstore_client.set_acl.return_value = False 

651 

652 cifs_access_rw = { 

653 "access_type": "user", 

654 "access_to": self.RW_USERS, 

655 "access_level": const.ACCESS_LEVEL_RW, 

656 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

657 } 

658 cifs_access_ro = { 

659 "access_type": "user", 

660 "access_to": self.RO_USERS, 

661 "access_level": const.ACCESS_LEVEL_RO, 

662 "access_id": "09960614-8574-4e03-89cf-7cf267b0bd08", 

663 } 

664 access_rules = [cifs_access_rw, cifs_access_ro] 

665 

666 self.assertRaises( 

667 exception.ShareBackendException, 

668 self.storage_connection.update_access, 

669 self.mock_context, 

670 share, 

671 access_rules, 

672 add_rules=None, 

673 delete_rules=None, 

674 share_server=None, 

675 ) 

676 

677 def test_allow_access(self): 

678 self.assertRaises( 

679 NotImplementedError, 

680 self.storage_connection.allow_access, 

681 self.mock_context, 

682 share=None, 

683 access=None, 

684 share_server=None, 

685 ) 

686 

687 def test_deny_access(self): 

688 self.assertRaises( 

689 NotImplementedError, 

690 self.storage_connection.deny_access, 

691 self.mock_context, 

692 share=None, 

693 access=None, 

694 share_server=None, 

695 ) 

696 

697 def test_update_share_stats(self): 

698 data = dict( 

699 share_backend_name='powerstore', 

700 vendor_name='Dell EMC', 

701 storage_protocol='NFS_CIFS', 

702 snapshot_support=True, 

703 create_share_from_snapshot_support=True) 

704 self._mock_powerstore_client.get_cluster_id.return_value = "0" 

705 self._mock_powerstore_client. \ 

706 retreive_cluster_capacity_metrics.return_value = \ 

707 47345047046144, 366003363027 

708 self.storage_connection.update_share_stats(data) 

709 self.assertEqual(data['storage_protocol'], 'NFS_CIFS') 

710 self.assertEqual(data['driver_version'], connection.VERSION) 

711 self.assertEqual(data['total_capacity_gb'], 44093) 

712 self.assertEqual(data['free_capacity_gb'], 43752) 

713 

714 def test_update_share_stats_failure(self): 

715 data = dict( 

716 share_backend_name='powerstore', 

717 vendor_name='Dell EMC', 

718 storage_protocol='NFS_CIFS', 

719 snapshot_support=True, 

720 create_share_from_snapshot_support=True) 

721 self._mock_powerstore_client. \ 

722 retreive_cluster_capacity_metrics.return_value = \ 

723 None, None 

724 self.storage_connection.update_share_stats(data) 

725 self.assertIsNone(data.get('total_capacity_gb')) 

726 self.assertIsNone(data.get('free_capacity_gb')) 

727 

728 def test_create_snapshot(self): 

729 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

730 self.FILESYSTEM_ID 

731 ) 

732 self._mock_powerstore_client. \ 

733 create_snapshot.return_value = self.SNAPSHOT_ID 

734 

735 snapshot = { 

736 "name": self.SNAPSHOT_NAME, 

737 "share_name": self.SHARE_NAME 

738 } 

739 self.storage_connection.create_snapshot( 

740 self.mock_context, snapshot, None 

741 ) 

742 

743 self._mock_powerstore_client.get_filesystem_id. \ 

744 assert_called_with( 

745 self.SHARE_NAME 

746 ) 

747 self._mock_powerstore_client.create_snapshot.assert_called_with( 

748 self.FILESYSTEM_ID, self.SNAPSHOT_NAME 

749 ) 

750 

751 def test_create_snapshot_invalid_filesystem_id(self): 

752 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

753 None 

754 ) 

755 

756 snapshot = { 

757 "name": self.SNAPSHOT_NAME, 

758 "share_name": self.SHARE_NAME 

759 } 

760 self.assertRaises( 

761 exception.ShareBackendException, 

762 self.storage_connection.create_snapshot, 

763 self.mock_context, 

764 snapshot, 

765 None 

766 ) 

767 

768 def test_create_snapshot_backend_failure(self): 

769 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

770 self.FILESYSTEM_ID 

771 ) 

772 self._mock_powerstore_client. \ 

773 create_snapshot.return_value = None 

774 

775 snapshot = { 

776 "name": self.SNAPSHOT_NAME, 

777 "share_name": self.SHARE_NAME, 

778 "share": { 

779 "share_proto": "NFS" 

780 } 

781 } 

782 self.assertRaises( 

783 exception.ShareBackendException, 

784 self.storage_connection.create_snapshot, 

785 self.mock_context, 

786 snapshot, 

787 None 

788 ) 

789 

790 def test_delete_snapshot(self): 

791 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

792 self.SNAPSHOT_ID 

793 ) 

794 self._mock_powerstore_client.delete_filesystem.return_value = True 

795 

796 snapshot = { 

797 "name": self.SNAPSHOT_NAME, 

798 "share_name": self.SHARE_NAME 

799 } 

800 self.storage_connection.delete_snapshot( 

801 self.mock_context, snapshot, None 

802 ) 

803 

804 self._mock_powerstore_client.get_filesystem_id. \ 

805 assert_called_with( 

806 self.SNAPSHOT_NAME 

807 ) 

808 self._mock_powerstore_client.delete_filesystem.assert_called_with( 

809 self.SNAPSHOT_ID 

810 ) 

811 

812 def test_delete_snapshot_backend_failure(self): 

813 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

814 self.SNAPSHOT_ID 

815 ) 

816 self._mock_powerstore_client.delete_filesystem.return_value = False 

817 

818 snapshot = { 

819 "name": self.SNAPSHOT_NAME, 

820 "share_name": self.SHARE_NAME 

821 } 

822 self.assertRaises( 

823 exception.ShareBackendException, 

824 self.storage_connection.delete_snapshot, 

825 self.mock_context, 

826 snapshot, 

827 None 

828 ) 

829 

830 def test_revert_to_snapshot(self): 

831 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

832 self.SNAPSHOT_ID 

833 ) 

834 self._mock_powerstore_client.restore_snapshot.return_value = True 

835 

836 snapshot = { 

837 "name": self.SNAPSHOT_NAME, 

838 "share_name": self.SHARE_NAME 

839 } 

840 self.storage_connection.revert_to_snapshot( 

841 self.mock_context, snapshot, None, None, None 

842 ) 

843 

844 self._mock_powerstore_client.get_filesystem_id. \ 

845 assert_called_with( 

846 self.SNAPSHOT_NAME 

847 ) 

848 self._mock_powerstore_client.restore_snapshot.assert_called_with( 

849 self.SNAPSHOT_ID 

850 ) 

851 

852 def test_revert_to_snapshot_backend_failure(self): 

853 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

854 self.SNAPSHOT_ID 

855 ) 

856 self._mock_powerstore_client.restore_snapshot.return_value = False 

857 

858 snapshot = { 

859 "name": self.SNAPSHOT_NAME, 

860 "share_name": self.SHARE_NAME 

861 } 

862 self.assertRaises( 

863 exception.ShareBackendException, 

864 self.storage_connection.revert_to_snapshot, 

865 self.mock_context, 

866 snapshot, 

867 None, None, None 

868 ) 

869 

870 def test_create_share_from_snapshot_nfs(self): 

871 self._mock_powerstore_client.get_nas_server_id.return_value = ( 

872 self.NAS_SERVER_ID 

873 ) 

874 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

875 self.SNAPSHOT_ID 

876 ) 

877 self._mock_powerstore_client.clone_snapshot.return_value = ( 

878 self.CLONE_ID 

879 ) 

880 self._mock_powerstore_client.create_nfs_export.return_value = ( 

881 self.NFS_EXPORT_ID 

882 ) 

883 self._mock_powerstore_client.get_nas_server_interfaces.return_value = ( 

884 [{"ip": self.NAS_SERVER_IP, "preferred": True}] 

885 ) 

886 self._mock_powerstore_client.resize_filesystem.return_value = ( 

887 True, None 

888 ) 

889 share = {"name": self.SHARE_NAME, "share_proto": "NFS", 

890 "size": self.SHARE_NEW_SIZE_GB} 

891 snapshot = {"name": self.SNAPSHOT_NAME, "size": self.SHARE_SIZE_GB} 

892 locations = self.storage_connection.create_share_from_snapshot( 

893 self.mock_context, 

894 share, 

895 snapshot 

896 ) 

897 expected_locations = [ 

898 {"path": "%s:/%s" % ( 

899 self.NAS_SERVER_IP, 

900 self.SHARE_NAME, 

901 ), 

902 "metadata": { 

903 "preferred": True}}] 

904 self.assertEqual(expected_locations, locations) 

905 self._mock_powerstore_client.get_nas_server_id.assert_called_with( 

906 self._mock_config.safe_get("dell_nas_server") 

907 ) 

908 self._mock_powerstore_client.clone_snapshot.assert_called_with( 

909 self.SNAPSHOT_ID, 

910 self.SHARE_NAME 

911 ) 

912 self._mock_powerstore_client.create_nfs_export.assert_called_with( 

913 self.CLONE_ID, 

914 self.SHARE_NAME 

915 ) 

916 self._mock_powerstore_client.get_nas_server_interfaces. \ 

917 assert_called_with( 

918 self.NAS_SERVER_ID 

919 ) 

920 

921 def test_create_share_from_snapshot_cifs(self): 

922 self._mock_powerstore_client.get_nas_server_id.return_value = ( 

923 self.NAS_SERVER_ID 

924 ) 

925 self._mock_powerstore_client.get_filesystem_id.return_value = ( 

926 self.SNAPSHOT_ID 

927 ) 

928 self._mock_powerstore_client.clone_snapshot.return_value = ( 

929 self.CLONE_ID 

930 ) 

931 self._mock_powerstore_client.create_smb_share.return_value = ( 

932 self.NFS_EXPORT_ID 

933 ) 

934 self._mock_powerstore_client.get_nas_server_interfaces.return_value = ( 

935 [{"ip": self.NAS_SERVER_IP, "preferred": True}] 

936 ) 

937 self._mock_powerstore_client.resize_filesystem.return_value = ( 

938 True, None 

939 ) 

940 

941 share = {"name": self.SHARE_NAME, "share_proto": "CIFS", 

942 "size": self.SHARE_NEW_SIZE_GB} 

943 snapshot = {"name": self.SNAPSHOT_NAME, "size": self.SHARE_SIZE_GB} 

944 locations = self.storage_connection.create_share_from_snapshot( 

945 self.mock_context, 

946 share, 

947 snapshot 

948 ) 

949 expected_locations = [ 

950 {"path": "\\\\%s\\%s" % ( 

951 self.NAS_SERVER_IP, 

952 self.SHARE_NAME), 

953 "metadata": { 

954 "preferred": True}}] 

955 self.assertEqual(expected_locations, locations) 

956 self._mock_powerstore_client.get_nas_server_id.assert_called_with( 

957 self._mock_config.safe_get("dell_nas_server") 

958 ) 

959 self._mock_powerstore_client.clone_snapshot.assert_called_with( 

960 self.SNAPSHOT_ID, 

961 self.SHARE_NAME 

962 ) 

963 self._mock_powerstore_client.create_smb_share.assert_called_with( 

964 self.CLONE_ID, 

965 self.SHARE_NAME 

966 ) 

967 self._mock_powerstore_client.get_nas_server_interfaces. \ 

968 assert_called_with( 

969 self.NAS_SERVER_ID 

970 ) 

971 

972 def test_create_share_from_snapshot_clone_failure(self): 

973 share = {"name": self.SHARE_NAME, "share_proto": "NFS"} 

974 snapshot = {"name": self.SNAPSHOT_NAME} 

975 self._mock_powerstore_client.clone_snapshot.return_value = None 

976 

977 self.assertRaises( 

978 exception.ShareBackendException, 

979 self.storage_connection.create_share_from_snapshot, 

980 self.mock_context, 

981 share, 

982 snapshot 

983 ) 

984 

985 def test_create_share_from_snapshot_export_failure(self): 

986 share = {"name": self.SHARE_NAME, "share_proto": "NFS"} 

987 snapshot = {"name": self.SNAPSHOT_NAME} 

988 self._mock_powerstore_client.create_nfs_export.return_value = None 

989 

990 self.assertRaises( 

991 exception.ShareBackendException, 

992 self.storage_connection.create_share_from_snapshot, 

993 self.mock_context, 

994 share, 

995 snapshot 

996 ) 

997 

998 def test_create_share_from_snapshot_share_failure(self): 

999 share = {"name": self.SHARE_NAME, "share_proto": "CIFS"} 

1000 snapshot = {"name": self.SNAPSHOT_NAME} 

1001 self._mock_powerstore_client.create_smb_share.return_value = None 

1002 

1003 self.assertRaises( 

1004 exception.ShareBackendException, 

1005 self.storage_connection.create_share_from_snapshot, 

1006 self.mock_context, 

1007 share, 

1008 snapshot 

1009 ) 

1010 

1011 def test_get_default_filter_function(self): 

1012 filter = self.storage_connection.get_default_filter_function() 

1013 self.assertEqual(filter, "share.size >= 3")