Coverage for manila/tests/share/drivers/cephfs/test_driver.py: 99%

762 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016 Red Hat, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15import json 

16import math 

17from unittest import mock 

18 

19import ddt 

20from oslo_utils import units 

21 

22from manila.common import constants 

23from manila import context 

24import manila.exception as exception 

25from manila.share import configuration 

26from manila.share.drivers.cephfs import driver 

27from manila.share import share_types 

28from manila import test 

29from manila.tests import fake_share 

30 

31 

32DEFAULT_VOLUME_MODE = '755' 

33ALT_VOLUME_MODE = '644' 

34 

35 

class MockRadosModule:
    """Stand-in for the ``rados`` module referenced by the driver."""

    class Rados(mock.Mock):
        """Fake cluster handle that returns canned monitor and usage data."""

        def __init__(self, *args, **kwargs):
            # Constructor arguments are accepted but ignored; only the names
            # in the spec are auto-created as mock attributes.
            super().__init__(spec=["connect", "shutdown", "state"])

            mon_addrs = ["1.2.3.4", "5.6.7.8"]
            cluster_stats = {
                "kb": 172953600,
                "kb_avail": 157123584,
                "kb_used": 15830016,
                "num_objects": 26,
            }
            self.get_mon_addrs = mock.Mock(return_value=mon_addrs)
            self.get_cluster_stats = mock.Mock(return_value=cluster_stats)

    class Error(mock.Mock):
        """Placeholder for the module-level ``Error`` attribute."""

54 

55 

class MockAllocationCapacityCache(mock.Mock):
    """Mocked up version of the driver's AllocationCapacityCache.

    ``is_expired`` always reports ``False`` and ``get_data`` always returns
    ``20.0``. ``update_data`` is the only auto-created mock attribute.
    """

    def __init__(self, *args, **kwargs):
        # Constructor arguments (such as a timeout) are accepted but ignored.
        super().__init__(spec=["update_data"])
        self.is_expired = mock.Mock(return_value=False)
        self.get_data = mock.Mock(return_value=20.0)

64 

65 

class MockCephArgparseModule:
    """Stand-in for the ``ceph_argparse`` module referenced by the driver."""

    class json_command(mock.Mock):
        """Mock limited to the ``connect``/``shutdown``/``state`` names."""

        def __init__(self, *args, **kwargs):
            # Constructor arguments are accepted but ignored.
            allowed_attributes = ["connect", "shutdown", "state"]
            super().__init__(spec=allowed_attributes)

74 

75 

@ddt.ddt
class AllocationCapacityCacheTestCase(test.TestCase):
    """Test the Allocation capacity cache class.

    This is a cache with a getter and a setter for the allocated capacity
    cached value in the driver, also with a timeout control.
    """

    def setUp(self):
        """Create a cache instance with a 10 second timeout."""
        super().setUp()
        timeout = 10
        self._allocation_capacity_cache = driver.AllocationCapacityCache(
            timeout
        )

    def test_set_get_data(self):
        """A fresh cache is expired and empty; update_data stores a value."""
        cache = self._allocation_capacity_cache

        # Nothing set yet, info should be "expired"
        self.assertTrue(cache.is_expired())

        # Class value starts with None
        self.assertIsNone(cache.get_data())

        # Set a new value and ensure it works properly. The expected value
        # goes first so a failure message labels the two sides correctly.
        expected_allocated_capacity_gb = 100.0
        cache.update_data(expected_allocated_capacity_gb)
        self.assertEqual(expected_allocated_capacity_gb, cache.get_data())

117 

118 

119@ddt.ddt 

120class CephFSDriverTestCase(test.TestCase): 

121 """Test the CephFS driver. 

122 

123 This is a very simple driver that mainly 

124 calls through to the CephFSVolumeClient interface, so the tests validate 

125 that the Manila driver calls map to the appropriate CephFSVolumeClient 

126 calls. 

127 """ 

128 

def setUp(self):
    """Build a CephFSDriver wired to mocked Ceph bindings and helpers."""
    super().setUp()

    # Plain fixtures shared by the tests.
    self._execute = mock.Mock()
    self.fake_conf = configuration.Configuration(None)
    self._context = context.get_admin_context()
    self._share = fake_share.fake_share(share_proto='CEPHFS')
    self._snapshot = fake_share.fake_snapshot_instance()

    config_defaults = (
        ('driver_handles_share_servers', False),
        ('cephfs_auth_id', 'manila'),
    )
    for option_name, option_value in config_defaults:
        self.fake_conf.set_default(option_name, option_value)

    # Replace the driver module's external collaborators with mocks.
    self.mock_object(driver, "rados_command")
    self.mock_object(driver, "rados", MockRadosModule)
    self.mock_object(driver, "json_command", MockCephArgparseModule)
    protocol_helper_names = (
        'NativeProtocolHelper',
        'NFSProtocolHelper',
        'NFSClusterProtocolHelper',
    )
    for helper_name in protocol_helper_names:
        self.mock_object(driver, helper_name)
    self.mock_object(
        driver, "AllocationCapacityCache", MockAllocationCapacityCache)

    driver.ceph_default_target = ('mon-mgr', )

    # Private storage lookups return None unless a test overrides them.
    private_storage = mock.Mock()
    self.fake_private_storage = private_storage
    self.mock_object(private_storage, 'get', mock.Mock(return_value=None))

    self._driver = driver.CephFSDriver(
        execute=self._execute,
        configuration=self.fake_conf,
        private_storage=private_storage,
    )
    self._driver.protocol_helper = mock.Mock()
    self._driver._cached_allocated_capacity_gb = (
        MockAllocationCapacityCache())

    # NOTE(review): this assigns on the driver *class* and nothing in this
    # method restores the previous attribute afterwards -- confirm that no
    # other test depends on the real ``volname``.
    type(self._driver).volname = mock.PropertyMock(return_value='cephfs')

    self.mock_object(
        share_types, 'get_share_type_extra_specs',
        mock.Mock(return_value={}))

167 

@ddt.data(
    ('cephfs', None),
    ('nfs', None),
    ('nfs', 'fs-manila')
)
@ddt.unpack
def test_do_setup(self, protocol_helper, cephfs_nfs_cluster_id):
    """do_setup instantiates the helper matching the configuration."""
    self._driver.configuration.cephfs_protocol_helper_type = protocol_helper
    self.fake_conf.set_default(
        'cephfs_nfs_cluster_id', cephfs_nfs_cluster_id)
    self.mock_object(
        self._driver, '_get_cephfs_filesystem_allocation',
        mock.Mock(return_value=10))

    self._driver.do_setup(self._context)

    # Work out which mocked helper class should have been constructed.
    if protocol_helper == 'cephfs':
        expected_helper = driver.NativeProtocolHelper
    elif self.fake_conf.cephfs_nfs_cluster_id is None:
        expected_helper = driver.NFSProtocolHelper
    else:
        expected_helper = driver.NFSClusterProtocolHelper

    expected_helper.assert_called_once_with(
        self._execute, self._driver.configuration,
        rados_client=self._driver._rados_client,
        volname=self._driver.volname)

    self._driver.protocol_helper.init_helper.assert_called_once_with()

    self.assertEqual(DEFAULT_VOLUME_MODE, self._driver._cephfs_volume_mode)

206 

def test__get_sub_name(self):
    """The share id is returned as the subvolume name.

    Expected value is passed first, matching the sibling
    ``test__get_sub_name_has_other_name``.
    """
    share_id = self._share["id"]

    sub_name = self._driver._get_subvolume_name(share_id)

    self.assertEqual(share_id, sub_name)

210 

def test__get_sub_name_has_other_name(self):
    """A name held in private storage is returned as the subvolume name."""
    stored_name = 'user_specified_subvolume_name'
    storage_get = mock.Mock(return_value=stored_name)
    self.mock_object(self._driver.private_storage, 'get', storage_get)

    resolved_name = self._driver._get_subvolume_name(self._share["id"])

    self.assertEqual(stored_name, resolved_name)

219 

def test__get_sub_snapshot_name(self):
    """The snapshot id is returned as the subvolume snapshot name.

    Expected value is passed first, matching the sibling
    ``test__get_sub_snapshot_name_has_other_name``.
    """
    snapshot_id = self._snapshot["id"]

    sub_name = self._driver._get_subvolume_snapshot_name(snapshot_id)

    self.assertEqual(snapshot_id, sub_name)

225 

def test__get_sub_snapshot_name_has_other_name(self):
    """A name held in private storage is returned as the snapshot name."""
    stored_name = 'user_specified_subvolume_snapshot_name'
    storage_get = mock.Mock(return_value=stored_name)
    self.mock_object(self._driver.private_storage, 'get', storage_get)

    resolved_name = self._driver._get_subvolume_snapshot_name(
        self._snapshot["id"])

    self.assertEqual(stored_name, resolved_name)

236 

@ddt.data(
    ('{"version": "ceph version 16.2.4"}', 'pacific'),
    ('{"version": "ceph version 15.1.2"}', 'octopus'),
    ('{"version": "ceph version 14.3.1"}', 'nautilus'),
)
@ddt.unpack
def test_version_check(self, ceph_mon_version, codename):
    """do_setup sets the default command target from the Ceph version."""
    driver.ceph_default_target = None
    driver.rados_command.return_value = ceph_mon_version
    self.mock_object(
        self._driver, '_get_cephfs_filesystem_allocation',
        mock.Mock(return_value=10))

    self._driver.do_setup(self._context)

    # Only the nautilus case expects the plain 'mgr' target.
    expected_target = ('mgr', ) if codename == 'nautilus' else ('mon-mgr', )
    self.assertEqual(expected_target, driver.ceph_default_target)

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client, "version", target=('mon', ))

    self.assertEqual(1, driver.rados_command.call_count)

263 

def test_version_check_not_supported(self):
    """do_setup raises when the reported version is 13.0.1."""
    unsupported_version = '{"version": "ceph version 13.0.1"}'
    driver.ceph_default_target = None
    driver.rados_command.return_value = unsupported_version

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.do_setup,
        self._context,
    )

272 

@ddt.data('cephfs', 'nfs')
def test_check_for_setup_error(self, protocol_helper):
    """check_for_setup_error delegates once to the protocol helper."""
    self._driver.configuration.cephfs_protocol_helper_type = protocol_helper

    self._driver.check_for_setup_error()

    helper_check = self._driver.protocol_helper.check_for_setup_error
    helper_check.assert_called_once_with()

282 

def test_create_share(self):
    """create_share issues a subvolume create followed by a getpath."""
    volname = self._driver.volname
    share_id = self._share["id"]
    expected_create_args = {
        "vol_name": volname,
        "sub_name": share_id,
        "size": self._share["size"] * units.Gi,
        "namespace_isolated": True,
        "mode": DEFAULT_VOLUME_MODE,
    }
    expected_getpath_args = {
        "vol_name": volname,
        "sub_name": share_id,
    }

    self._driver.create_share(self._context, self._share)

    expected_calls = [
        mock.call(
            self._driver.rados_client,
            "fs subvolume create",
            expected_create_args),
        mock.call(
            self._driver.rados_client,
            "fs subvolume getpath",
            expected_getpath_args),
    ]
    driver.rados_command.assert_has_calls(expected_calls)
    self.assertEqual(2, driver.rados_command.call_count)

311 

def test_create_share_error(self):
    """create_share raises for a share whose protocol is NFS."""
    nfs_share = fake_share.fake_share(share_proto='NFS')

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.create_share,
        self._context,
        nfs_share,
    )

319 

def _setup_manage_subvolume_test(self):
    """Return the fixtures shared by the manage-existing tests.

    :returns: a 3-tuple of a fake share with one export location, the
        argument dict expected for the "subvolume info" command, and a
        canned subvolume info result.
    """
    export_path = 'fake/path'
    share_with_el = fake_share.fake_share(
        export_locations=[{'path': export_path}])

    # The export path doubles as the subvolume name in the expected args.
    expected_subvolume_info_argdict = {
        "vol_name": self._driver.volname,
        "sub_name": export_path,
    }

    subvolume_info_mock_result = {
        'atime': '2024-07-23 16:50:03',
        'bytes_pcent': '0.00',
        'bytes_quota': 2147483648,
        'bytes_used': 0,
        'created_at': '2024-07-23 16:50:03',
        'ctime': '2024-07-23 17:24:49',
        'data_pool': 'cephfs.cephfs.data',
        'features': ['snapshot-clone', 'snapshot-autoprotect'],
        'gid': 0,
        'mode': 755,
        'mon_addrs': ['10.0.0.1:6342'],
        'mtime': '2024-07-23 16:50:03',
        'path': '/volumes/_nogroup/subbvol/475a-4972-9f6b-fe025a8d383f',
        'pool_namespace': 'fsvolumes_cephfs',
        'state': 'complete',
        'type': 'subvolume',
        'uid': 0
    }

    return (
        share_with_el,
        expected_subvolume_info_argdict,
        subvolume_info_mock_result,
    )

353 

def test_manage_existing_no_subvolume_name(self):
    """manage_existing raises when the export location path is None."""
    share_without_path = {
        'id': 'fake_project_uuid_1',
        'export_locations': [{'path': None}]
    }
    driver_options = {}

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.manage_existing,
        share_without_path,
        driver_options,
    )

364 

def test_manage_existing_subvolume_not_found(self):
    """manage_existing raises when the subvolume info command fails."""
    driver.rados_command.side_effect = exception.ShareBackendException(
        msg="does not exist"
    )
    export_path = 'fake/path'
    share_with_el = fake_share.fake_share(
        export_locations=[{'path': export_path}])
    expected_info_argdict = {
        "vol_name": self._driver.volname,
        "sub_name": export_path,
    }

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.manage_existing,
        share_with_el,
        {},
    )

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume info",
        expected_info_argdict,
        json_obj=True,
    )

390 

def test_manage_existing_subvolume_infinite_no_provided_size(self):
    """An "infinite" quota with no size in driver options raises."""
    fixtures = self._setup_manage_subvolume_test()
    share_with_el, expected_info_argdict, subvolume_info = fixtures
    subvolume_info['bytes_quota'] = "infinite"
    driver.rados_command.return_value = subvolume_info

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.manage_existing,
        share_with_el,
        {},
    )

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume info",
        expected_info_argdict,
        json_obj=True,
    )

409 

@ddt.data(
    exception.ShareShrinkingPossibleDataLoss,
    exception.ShareBackendException
)
def test_manage_existing_subvolume_infinite_size(self, expected_exception):
    """Errors raised by _resize_share propagate out of manage_existing."""
    fixtures = self._setup_manage_subvolume_test()
    share_with_el, expected_info_argdict, subvolume_info = fixtures
    subvolume_info['bytes_quota'] = "infinite"
    driver.rados_command.return_value = subvolume_info
    requested_size = 1

    failing_resize = mock.Mock(side_effect=expected_exception('fake'))
    mock_resize = self.mock_object(
        self._driver, '_resize_share', failing_resize)

    self.assertRaises(
        expected_exception,
        self._driver.manage_existing,
        share_with_el,
        {'size': requested_size},
    )

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume info",
        expected_info_argdict,
        json_obj=True,
    )
    mock_resize.assert_called_once_with(
        share_with_el, requested_size, no_shrink=True)

442 

@ddt.data(True, False)
def test_manage_existing(self, current_size_is_smaller):
    """manage_existing reports size and exports, resizing when needed."""
    fixtures = self._setup_manage_subvolume_test()
    share_with_el, expected_info_argdict, subvolume_info = fixtures
    if current_size_is_smaller:
        # set this to half gb, to ensure it will turn into 1gb
        subvolume_info['bytes_quota'] = 536870912

    subvolume_name = share_with_el["export_locations"][0]["path"]
    quota_bytes = int(subvolume_info['bytes_quota'])
    expected_share_updates = {
        "size": int(math.ceil(quota_bytes / units.Gi)),
        "export_locations": subvolume_name
    }
    expected_share_metadata = {"subvolume_name": subvolume_name}

    driver.rados_command.return_value = subvolume_info
    self.mock_object(
        self._driver, '_get_export_locations',
        mock.Mock(return_value=subvolume_name))
    mock_resize_share = self.mock_object(self._driver, '_resize_share')

    share_updates = self._driver.manage_existing(share_with_el, {})

    self.assertEqual(expected_share_updates, share_updates)
    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume info",
        expected_info_argdict,
        json_obj=True,
    )
    self._driver.private_storage.update.assert_called_once_with(
        share_with_el['id'], expected_share_metadata)
    self._driver._get_export_locations.assert_called_once_with(
        share_with_el, subvolume_name=subvolume_name)

    if not current_size_is_smaller:
        mock_resize_share.assert_not_called()
    else:
        mock_resize_share.assert_called_once_with(
            share_with_el, 1, no_shrink=True)

485 

def test_manage_existing_snapshot_no_snapshot_name(self):
    """manage_existing_snapshot raises when provider_location is None."""
    snapshot_without_location = {
        'id': 'fake_project_uuid_1',
        'provider_location': None,
    }
    driver_options = {}

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.manage_existing_snapshot,
        snapshot_without_location,
        driver_options,
    )

496 

def test_manage_existing_snapshot_subvolume_not_found(self):
    """The subvolume info failure propagates from the snapshot manage."""
    driver.rados_command.side_effect = exception.ShareBackendException(
        msg="does not exist"
    )
    share_instance_id = 'fake_share_instance_id'
    snapshot_instance = {
        'id': 'fake_project_uuid_1',
        'provider_location': 'fake/provider/location',
        'share_instance_id': share_instance_id
    }
    expected_info_argdict = {
        "vol_name": self._driver.volname,
        "sub_name": share_instance_id
    }

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.manage_existing_snapshot,
        snapshot_instance,
        {},
    )

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume info",
        expected_info_argdict,
        json_obj=True,
    )

523 

def test_manage_existing_snapshot_snapshot_not_found(self):
    """manage_existing_snapshot raises when the snapshot lookup fails.

    The subvolume info call succeeds and the following snapshot info call
    raises ShareBackendException.
    """
    # Only the canned subvolume info is needed from the shared fixtures;
    # the expected argument dicts are built from the snapshot instance.
    _, _, subvolume_info = self._setup_manage_subvolume_test()
    expected_snapshot_name = 'fake/provider/location'
    snapshot_instance = {
        'id': 'fake_project_uuid_1',
        'provider_location': expected_snapshot_name,
        'share_instance_id': 'fake_share_instance_id'
    }
    expected_info_argdict = {
        "vol_name": self._driver.volname,
        "sub_name": snapshot_instance["share_instance_id"]
    }
    expected_snap_info_argdict = {
        "vol_name": self._driver.volname,
        "sub_name": snapshot_instance["share_instance_id"],
        "snap_name": expected_snapshot_name
    }
    # First rados call returns the subvolume info, second one fails.
    driver.rados_command.side_effect = [
        subvolume_info,
        exception.ShareBackendException(msg="does not exist")
    ]

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.manage_existing_snapshot,
        snapshot_instance,
        {}
    )

    driver.rados_command.assert_has_calls([
        mock.call(
            self._driver.rados_client, "fs subvolume info",
            expected_info_argdict, json_obj=True
        ),
        mock.call(
            self._driver.rados_client, "fs subvolume snapshot info",
            expected_snap_info_argdict,
            json_obj=True
        )
    ])

565 

def test_manage_existing_snapshot(self):
    """manage_existing_snapshot returns and stores the snapshot name.

    Checks the returned provider_location, the two rados info calls, and
    the private storage update keyed by the snapshot id.
    """
    # Only the canned subvolume info is needed from the shared fixtures;
    # the expected argument dicts are built from the snapshot instance.
    _, _, subvolume_info = self._setup_manage_subvolume_test()
    expected_snapshot_name = 'fake_snapshot_name'
    snapshot_instance = {
        'id': 'fake_project_uuid_1',
        'provider_location': expected_snapshot_name,
        'share_instance_id': 'fake_share_instance_id',
        'snapshot_id': 'fake_snapshot_id'
    }
    expected_info_argdict = {
        "vol_name": self._driver.volname,
        "sub_name": snapshot_instance["share_instance_id"]
    }
    expected_snap_info_argdict = {
        "vol_name": self._driver.volname,
        "sub_name": snapshot_instance["share_instance_id"],
        "snap_name": expected_snapshot_name
    }
    # First rados call returns the subvolume info, second the snapshot info.
    driver.rados_command.side_effect = [
        subvolume_info,
        {'name': expected_snapshot_name}
    ]
    expected_result = {
        'provider_location': expected_snapshot_name
    }

    result = self._driver.manage_existing_snapshot(
        snapshot_instance,
        {}
    )

    self.assertEqual(expected_result, result)

    driver.rados_command.assert_has_calls([
        mock.call(
            self._driver.rados_client, "fs subvolume info",
            expected_info_argdict, json_obj=True
        ),
        mock.call(
            self._driver.rados_client, "fs subvolume snapshot info",
            expected_snap_info_argdict, json_obj=True
        )
    ])
    self.fake_private_storage.update.assert_called_once_with(
        snapshot_instance['snapshot_id'],
        {"subvolume_snapshot_name": expected_snapshot_name}
    )

615 

def test_update_access(self):
    """update_access forwards its rules to the protocol helper."""
    cephx_rule = {
        'id': 'instance_mapping_id1',
        'access_id': 'accessid1',
        'access_level': 'rw',
        'access_type': 'cephx',
        'access_to': 'alice'
    }
    # Both names deliberately refer to the same list object.
    access_rules = [cephx_rule, ]
    add_rules = access_rules
    delete_rules, update_rules = [], []

    self._driver.update_access(
        self._context, self._share, access_rules, add_rules, delete_rules,
        update_rules, None)

    helper_update = self._driver.protocol_helper.update_access
    helper_update.assert_called_once_with(
        self._context, self._share, access_rules, add_rules, delete_rules,
        update_rules, share_server=None, sub_name=self._share['id'])

635 

def test_ensure_shares(self):
    """ensure_shares returns per-share updates, flagging a failed lookup."""
    self._driver.protocol_helper.reapply_rules_while_ensuring_shares = True
    shares = [
        fake_share.fake_share(share_id=share_id, share_proto='NFS')
        for share_id in ('123', '456', '789')
    ]
    export_locations = [
        {
            'path': '1.2.3.4,5.6.7.8:/foo/bar',
            'is_admin_only': False,
            'metadata': {},
        },
        {
            'path': '1.2.3.4,5.6.7.8:/foo/quz',
            'is_admin_only': False,
            'metadata': {},
        },
    ]
    share_backend_info = {'metadata': {'__mount_options': 'fs=cephfs'}}
    metadata = share_backend_info.get('metadata')

    # The first export-location lookup raises, so that share is expected
    # to carry an error status instead of export locations.
    expected_updates = {
        shares[0]['id']: {
            'status': constants.STATUS_ERROR,
            'reapply_access_rules': True,
            'metadata': metadata,
        },
        shares[1]['id']: {
            'export_locations': export_locations[0],
            'reapply_access_rules': True,
            'metadata': metadata,
        },
        shares[2]['id']: {
            'export_locations': export_locations[1],
            'reapply_access_rules': True,
            'metadata': metadata,
        }
    }
    err_message = (f"Error ENOENT: subvolume {self._share['id']} does "
                   f"not exist")
    expected_exception = exception.ShareBackendException(err_message)

    lookup_results = [expected_exception] + export_locations
    self.mock_object(
        self._driver, '_get_export_locations',
        mock.Mock(side_effect=lookup_results))
    self.mock_object(
        self._driver, 'get_optional_share_creation_data',
        mock.Mock(return_value=share_backend_info))

    actual_updates = self._driver.ensure_shares(self._context, shares)

    self.assertEqual(3, self._driver._get_export_locations.call_count)
    self._driver._get_export_locations.assert_has_calls(
        [mock.call(share) for share in shares])
    self.assertTrue(self._driver.get_optional_share_creation_data.called)
    self.assertEqual(expected_updates, actual_updates)

693 

def test_delete_share(self):
    """delete_share queries the clone status, then removes the subvolume."""
    volname = self._driver.volname
    share_id = self._share["id"]
    expected_status_args = {
        "vol_name": volname,
        "clone_name": share_id,
    }
    expected_delete_args = {
        "vol_name": volname,
        "sub_name": share_id,
        "force": True,
    }
    driver.rados_command.side_effect = [driver.rados.Error, mock.Mock()]

    self._driver.delete_share(self._context, self._share)

    expected_calls = [
        mock.call(
            self._driver.rados_client,
            "fs clone status",
            expected_status_args),
        mock.call(
            self._driver.rados_client,
            "fs subvolume rm",
            expected_delete_args),
    ]
    driver.rados_command.assert_has_calls(expected_calls)
    self.assertEqual(2, driver.rados_command.call_count)

723 

def test_extend_share(self):
    """extend_share issues one resize with the new size in bytes."""
    doubled_size_gb = self._share['size'] * 2
    expected_resize_args = {
        "vol_name": self._driver.volname,
        "sub_name": self._share["id"],
        "new_size": doubled_size_gb * units.Gi,
    }

    self._driver.extend_share(self._share, doubled_size_gb, None)

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume resize",
        expected_resize_args,
    )

740 

def test_shrink_share(self):
    """shrink_share issues one resize carrying the no_shrink flag."""
    halved_size_gb = self._share['size'] * 0.5
    expected_resize_args = {
        "vol_name": self._driver.volname,
        "sub_name": self._share["id"],
        "new_size": halved_size_gb * units.Gi,
        "no_shrink": True,
    }

    self._driver.shrink_share(self._share, halved_size_gb, None)

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume resize",
        expected_resize_args,
    )

758 

def test_shrink_share_full(self):
    """That shrink fails when share is too full."""
    halved_size_gb = self._share['size'] * 0.5
    halved_size_bytes = halved_size_gb * units.Gi

    # Pretend to be full up: the backend rejects the resize.
    msg = ("Can't resize the subvolume. "
           "The new size '{0}' would be lesser "
           "than the current used size '{1}'".format(
               halved_size_bytes, self._share['size']))
    driver.rados_command.side_effect = exception.ShareBackendException(msg)

    expected_resize_args = {
        "vol_name": self._driver.volname,
        "sub_name": self._share["id"],
        "new_size": halved_size_bytes,
        "no_shrink": True,
    }

    self.assertRaises(
        exception.ShareShrinkingPossibleDataLoss,
        self._driver.shrink_share,
        self._share, halved_size_gb, None,
    )

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume resize",
        expected_resize_args,
    )

786 

def test_create_snapshot(self):
    """create_snapshot issues one subvolume snapshot create command."""
    expected_create_args = {
        "vol_name": self._driver.volname,
        "sub_name": self._snapshot["share_id"],
        "snap_name": self._snapshot["snapshot_id"]
    }

    self._driver.create_snapshot(self._context, self._snapshot, None)

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolume snapshot create",
        expected_create_args,
    )

801 

def test_delete_snapshot(self):
    """delete_snapshot removes the legacy-named and the current snapshot."""
    snapshot_id = self._snapshot["snapshot_id"]
    # The legacy name joins the snapshot id and the snapshot instance id.
    legacy_snap_name = "_".join([snapshot_id, self._snapshot["id"]])

    expected_legacy_args = {
        "vol_name": self._driver.volname,
        "sub_name": self._snapshot["share_id"],
        "snap_name": legacy_snap_name,
        "force": True
    }
    expected_current_args = dict(expected_legacy_args, snap_name=snapshot_id)

    self.mock_object(
        self._driver,
        '_get_subvolume_snapshot_name',
        mock.Mock(return_value=snapshot_id))

    self._driver.delete_snapshot(self._context, self._snapshot, None)

    expected_calls = [
        mock.call(
            self._driver.rados_client,
            "fs subvolume snapshot rm",
            expected_args)
        for expected_args in (expected_legacy_args, expected_current_args)
    ]
    driver.rados_command.assert_has_calls(expected_calls)
    self.assertEqual(2, driver.rados_command.call_count)

837 

def test_create_share_group(self):
    """create_share_group issues one subvolumegroup create command."""
    share_group = {"id": "grp1"}
    expected_create_args = {
        "vol_name": self._driver.volname,
        "group_name": "grp1",
        "mode": DEFAULT_VOLUME_MODE,
    }

    self._driver.create_share_group(self._context, share_group, None)

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolumegroup create",
        expected_create_args,
    )

852 

def test_delete_share_group(self):
    """delete_share_group issues one forced subvolumegroup rm command."""
    share_group = {"id": "grp1"}
    expected_delete_args = {
        "vol_name": self._driver.volname,
        "group_name": "grp1",
        "force": True,
    }

    self._driver.delete_share_group(self._context, share_group, None)

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolumegroup rm",
        expected_delete_args,
    )

867 

def test_create_share_group_snapshot(self):
    """create_share_group_snapshot raises when the rados command fails."""
    msg = ("Share group snapshot feature is no longer supported in "
           "mainline CephFS (existing group snapshots can still be "
           "listed and deleted).")
    driver.rados_command.side_effect = exception.ShareBackendException(msg)
    group_snapshot = {'share_group_id': 'sgid', 'id': 'snapid'}

    self.assertRaises(
        exception.ShareBackendException,
        self._driver.create_share_group_snapshot,
        self._context,
        group_snapshot,
    )

878 

def test_delete_share_group_snapshot(self):
    """delete_share_group_snapshot issues one forced snapshot rm command."""
    group_snapshot = {
        'share_group_id': 'sgid',
        'id': 'snapid',
        "force": True,
    }
    expected_delete_args = {
        "vol_name": self._driver.volname,
        "group_name": "sgid",
        "snap_name": "snapid",
        "force": True,
    }

    self._driver.delete_share_group_snapshot(self._context, group_snapshot)

    driver.rados_command.assert_called_once_with(
        self._driver.rados_client,
        "fs subvolumegroup snapshot rm",
        expected_delete_args,
    )

898 

def test_create_share_from_snapshot(self):
    """Cloning a snapshot issues a clone command, then a status query."""
    parent_share = {
        'id': 'fakeparentshareid',
        'name': 'fakeparentshare',
    }
    volname = self._driver.volname
    expected_clone_args = {
        "vol_name": volname,
        "sub_name": parent_share["id"],
        "snap_name": self._snapshot["snapshot_id"],
        "target_sub_name": self._share["id"]
    }
    expected_status_args = {
        "vol_name": volname,
        "clone_name": self._share["id"],
    }
    # Every rados call in this test reports an in-progress clone.
    driver.rados_command.return_value = {
        'status': {
            'state': 'in-progress',
        },
    }

    self._driver.create_share_from_snapshot(
        self._context, self._share, self._snapshot, None,
        parent_share=parent_share
    )

    expected_calls = [
        mock.call(
            self._driver.rados_client,
            "fs subvolume snapshot clone",
            expected_clone_args),
        mock.call(
            self._driver.rados_client,
            "fs clone status",
            expected_status_args,
            True),
    ]
    driver.rados_command.assert_has_calls(expected_calls)
    self.assertEqual(2, driver.rados_command.call_count)

940 

def test_delete_share_from_snapshot(self):
    """An in-progress clone is cancelled before the subvolume is removed."""
    volname = self._driver.volname
    share_id = self._share["id"]
    expected_status_args = {
        "vol_name": volname,
        "clone_name": share_id,
    }
    expected_cancel_args = {
        "vol_name": volname,
        "clone_name": share_id,
        "force": True,
    }
    expected_delete_args = {
        "vol_name": volname,
        "sub_name": share_id,
        "force": True,
    }
    # The status query reports 'in-progress'; the later calls succeed.
    driver.rados_command.side_effect = [
        'in-progress', mock.Mock(), mock.Mock()]

    self._driver.delete_share(self._context, self._share)

    expected_commands = (
        ("fs clone status", expected_status_args),
        ("fs clone cancel", expected_cancel_args),
        ("fs subvolume rm", expected_delete_args),
    )
    driver.rados_command.assert_has_calls([
        mock.call(self._driver.rados_client, prefix, argdict)
        for prefix, argdict in expected_commands
    ])
    self.assertEqual(3, driver.rados_command.call_count)

982 

def test_delete_driver(self):
    """Deleting the driver shuts down its rados client exactly once."""
    # Create share to prompt volume_client construction
    self._driver.create_share(self._context, self._share)

    # Keep a handle on the client only; no other driver reference is held
    # here before the attribute is deleted.
    rados_client = self._driver._rados_client
    del self._driver

    rados_client.shutdown.assert_called_once_with()

992 

def test_delete_driver_no_client(self):
    """Delete a driver that has not created a rados client yet."""
    client = self._driver._rados_client
    self.assertIsNone(client)

    # Dropping the driver without a client must not raise.
    del self._driver

996 

@ddt.data(
    [21474836480, 293878, 97848372],
    [21474836480, "infinite", 97848372],
    ["infinite", "infinite", "infinite"],
)
def test__get_cephfs_filesystem_allocation(self, share_sizes):
    """Sum the finite subvolume quotas and report them in GiB.

    :param share_sizes: one ``bytes_quota`` value per mocked subvolume;
        the string "infinite" marks a subvolume without a quota.
    """
    # Read the volume name before arming the side effects.
    volname = self._driver.volname
    names = [f"subvolume{pos}" for pos, _ in enumerate(share_sizes)]

    # Only finite quotas count towards the expected allocation.
    quota_bytes = sum(
        quota for quota in share_sizes if quota != "infinite")
    if quota_bytes > 0:
        expected_gb = round(int(quota_bytes) / units.Gi, 2)
    else:
        expected_gb = 0

    # first call we make to rados is the subvolume ls, followed by one
    # "subvolume info" reply per listed subvolume
    listing_reply = [{"name": name} for name in names]
    info_replies = [{"bytes_quota": quota} for quota in share_sizes]
    driver.rados_command.side_effect = [listing_reply] + info_replies

    actual_gb = self._driver._get_cephfs_filesystem_allocation()

    self.assertEqual(actual_gb, expected_gb)

    client = self._driver._rados_client
    expected_calls = [
        mock.call(client, "fs subvolume ls",
                  {"vol_name": volname}, json_obj=True),
    ]
    expected_calls.extend(
        mock.call(client, "fs subvolume info",
                  {"vol_name": volname, "sub_name": name},
                  json_obj=True)
        for name in names
    )
    driver.rados_command.assert_has_calls(expected_calls)

1048 

@ddt.data(True, False)
def test_update_share_stats(self, cache_expired):
    """Verify the pool stats reported by ``_update_share_stats``.

    :param cache_expired: value returned by the mocked cache's
        ``is_expired``. When True the allocation must be recomputed and
        stored in the cache; when False it must be read from the cache.
    """
    allocated_capacity_gb = 20.0
    local_conf = self._driver.configuration.local_conf

    self._driver.get_configured_ip_versions = mock.Mock(return_value=[4])
    local_conf.set_override('reserved_share_percentage', 5)
    local_conf.set_override('reserved_share_from_snapshot_percentage', 2)
    local_conf.set_override('reserved_share_extend_percentage', 2)
    self._driver._cached_allocated_capacity_gb.is_expired = mock.Mock(
        return_value=cache_expired
    )
    # Patched exactly once; a second identical patch would only shadow
    # the first one.
    self.mock_object(
        self._driver, '_get_cephfs_filesystem_allocation',
        mock.Mock(return_value=allocated_capacity_gb)
    )

    self._driver._update_share_stats()
    result = self._driver._stats
    pool = result['pools'][0]

    self.assertEqual(5, pool['reserved_percentage'])
    self.assertEqual(2, pool['reserved_snapshot_percentage'])
    self.assertEqual(2, pool['reserved_share_extend_percentage'])
    self.assertEqual(164.94, pool['total_capacity_gb'])
    self.assertEqual(149.84, pool['free_capacity_gb'])
    self.assertEqual(allocated_capacity_gb,
                     pool['allocated_capacity_gb'])
    self.assertTrue(result['ipv4_support'])
    self.assertFalse(result['ipv6_support'])
    self.assertEqual("CEPHFS", result['storage_protocol'])

    cache = self._driver._cached_allocated_capacity_gb
    if cache_expired:
        self._driver._get_cephfs_filesystem_allocation.assert_called_once()
        cache.update_data.assert_called_once_with(allocated_capacity_gb)
    else:
        cache.get_data.assert_called_once()

1091 

@ddt.data('cephfs', 'nfs')
def test_get_configured_ip_versions(self, protocol_helper):
    """The driver delegates the IP version lookup to its protocol helper.

    :param protocol_helper: value stored in the
        ``cephfs_protocol_helper_type`` configuration attribute.
    """
    conf = self._driver.configuration
    conf.cephfs_protocol_helper_type = protocol_helper

    self._driver.get_configured_ip_versions()

    delegated = self._driver.protocol_helper.get_configured_ip_versions
    delegated.assert_called_once_with()

1101 

@ddt.data(
    ([{'id': 'instance_mapping_id1', 'access_id': 'accessid1',
       'access_level': 'rw', 'access_type': 'cephx', 'access_to': 'alice'
       }], 'fake_project_uuid_1'),
    ([{'id': 'instance_mapping_id1', 'access_id': 'accessid1',
       'access_level': 'rw', 'access_type': 'cephx', 'access_to': 'alice'
       }], 'fake_project_uuid_2'),
    ([], 'fake_project_uuid_1'),
    ([], 'fake_project_uuid_2'),
)
@ddt.unpack
def test_transfer_accept(self, access_rules, new_project):
    """Exercise ``transfer_accept`` with and without access rules.

    :param access_rules: access rules attached to the transferred share.
    :param new_project: project the share is transferred to.

    A transfer to a different project while access rules exist must be
    rejected; every other combination must be accepted without raising.
    """
    fake_share_1 = {"project_id": "fake_project_uuid_1"}
    same_project = new_project == fake_share_1["project_id"]
    if access_rules and not same_project:
        self.assertRaises(exception.DriverCannotTransferShareWithRules,
                          self._driver.transfer_accept,
                          self._context, fake_share_1,
                          'new_user', new_project, access_rules)
    else:
        # Without this branch three of the four data sets would never
        # call the driver at all; an exception here fails the test.
        self._driver.transfer_accept(
            self._context, fake_share_1,
            'new_user', new_project, access_rules)

1121 

def test_get_share_status_returns_none_for_unexpected_status(self):
    """get_share_status yields None for a share in 'available' status."""
    available_share = fake_share.fake_share(
        status=constants.STATUS_AVAILABLE)

    self.assertIsNone(self._driver.get_share_status(available_share))

1129 

def test__need_to_cancel_clone_returns_false_for_regular_subvolume(self):
    """_need_to_cancel_clone is False when the backend rejects the query.

    The mocked backend raises a ShareBackendException whose message
    contains "not allowed on subvolume".
    """
    backend_error = exception.ShareBackendException(
        msg="not allowed on subvolume")
    driver.rados_command.side_effect = backend_error

    share_id = self._share['id']
    needs_cancel = self._driver._need_to_cancel_clone(
        self._share, share_id)

    self.assertFalse(needs_cancel)

1139 

1140 

@ddt.ddt
class NativeProtocolHelperTestCase(test.TestCase):
    """Unit tests for ``driver.NativeProtocolHelper``.

    ``driver.rados_command`` is replaced by a mock in ``setUp`` so each
    test can check the exact backend commands issued by the helper.
    """

    def setUp(self):
        """Create a helper wired to a mocked rados client."""
        super(NativeProtocolHelperTestCase, self).setUp()
        self.fake_conf = configuration.Configuration(None)
        self._context = context.get_admin_context()
        self._share = fake_share.fake_share_instance(share_proto='CEPHFS')

        self.fake_conf.set_default('driver_handles_share_servers', False)

        self.mock_object(driver, "rados_command")

        driver.ceph_default_target = ('mon-mgr', )

        self._native_protocol_helper = driver.NativeProtocolHelper(
            None,
            self.fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        self._rados_client = self._native_protocol_helper.rados_client

        self._native_protocol_helper.get_mon_addrs = mock.Mock(
            return_value=['1.2.3.4', '5.6.7.8'])

    def test_check_for_setup_error(self):
        """check_for_setup_error returns None for the native helper."""
        expected = None

        result = self._native_protocol_helper.check_for_setup_error()

        self.assertEqual(expected, result)

    def test_get_export_locations(self):
        """The export path joins the monitor addresses with the subvolume."""
        fake_cephfs_subvolume_path = '/foo/bar'
        expected_export_locations = {
            'path': '1.2.3.4,5.6.7.8:/foo/bar',
            'is_admin_only': False,
            'metadata': {},
        }

        export_locations = self._native_protocol_helper.get_export_locations(
            self._share, fake_cephfs_subvolume_path)

        self.assertEqual(expected_export_locations, export_locations)
        self._native_protocol_helper.get_mon_addrs.assert_called_once_with()

    @ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO)
    def test_allow_access_rw_ro(self, mode):
        """Authorizing a cephx rule returns the key given by the backend.

        :param mode: manila access level of the rule; "ro" is expected to
            be sent to the backend as "r", anything else as "rw".
        """
        access_allow_prefix = "fs subvolume authorize"
        access_allow_mode = "r" if mode == "ro" else "rw"

        access_allow_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
            "tenant_id": self._share["project_id"],
            "access_level": access_allow_mode,
        }

        rule = {
            'access_level': mode,
            'access_to': 'alice',
            'access_type': 'cephx',
        }

        driver.rados_command.return_value = 'native-zorilla'

        auth_key = self._native_protocol_helper._allow_access(
            self._context, self._share, rule, sub_name=self._share['id'])

        self.assertEqual("native-zorilla", auth_key)

        driver.rados_command.assert_called_once_with(
            self._rados_client,
            access_allow_prefix, access_allow_dict)

    def test_allow_access_wrong_type(self):
        """A rule with an unknown access type is rejected."""
        self.assertRaises(
            exception.InvalidShareAccessType,
            self._native_protocol_helper._allow_access,
            self._context,
            self._share,
            {
                'access_level': constants.ACCESS_LEVEL_RW,
                'access_type': 'RHUBARB',
                'access_to': 'alice'
            },
            self._share['id']
        )

    def test_allow_access_same_cephx_id_as_manila_service(self):
        """A cephx rule for the ID 'manila' is rejected."""
        self.assertRaises(
            exception.InvalidShareAccess,
            self._native_protocol_helper._allow_access,
            self._context,
            self._share,
            {
                'access_level': constants.ACCESS_LEVEL_RW,
                'access_type': 'cephx',
                'access_to': 'manila',
            },
            self._share['id']
        )

    def test_allow_access_to_preexisting_ceph_user(self):
        """A backend "Not allowed to modify" error becomes InvalidShareAccess.

        The mocked backend raises a ShareBackendException for the 'admin'
        auth ID.
        """
        msg = ("auth ID: admin exists and not created by "
               "ceph manager plugin. Not allowed to modify")
        driver.rados_command.side_effect = exception.ShareBackendException(msg)

        self.assertRaises(exception.InvalidShareAccess,
                          self._native_protocol_helper._allow_access,
                          self._context, self._share,
                          {
                              'access_level': constants.ACCESS_LEVEL_RW,
                              'access_type': 'cephx',
                              'access_to': 'admin'
                          },
                          self._share['id']
                          )

    def test_deny_access(self):
        """Denying access deauthorizes the ID and then evicts it."""
        access_deny_prefix = "fs subvolume deauthorize"

        access_deny_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
        }

        evict_prefix = "fs subvolume evict"

        # The evict command is expected with the same arguments.
        evict_dict = access_deny_dict

        self._native_protocol_helper._deny_access(
            self._context,
            self._share,
            {
                'access_level': 'rw',
                'access_type': 'cephx',
                'access_to': 'alice'
            },
            sub_name=self._share['id']
        )

        driver.rados_command.assert_has_calls([
            mock.call(self._native_protocol_helper.rados_client,
                      access_deny_prefix,
                      access_deny_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      evict_prefix,
                      evict_dict)])

        self.assertEqual(2, driver.rados_command.call_count)

    def test_deny_access_missing_access_rule(self):
        """A "doesn't exist" backend error on deauthorize is tolerated.

        Only the deauthorize command is expected; no evict follows.
        """
        access_deny_prefix = "fs subvolume deauthorize"

        exception_msg = (
            f"json_command failed - prefix=fs subvolume deauthorize, "
            f"argdict='vol_name': {self._native_protocol_helper.volname}, "
            f"'sub_name': '{self._share['id']}', 'auth_id': 'alice', "
            f"'format': 'json' - exception message: [errno -2] "
            f"auth ID: alice doesn't exist.")

        driver.rados_command.side_effect = exception.ShareBackendException(
            msg=exception_msg)

        access_deny_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
        }

        self._native_protocol_helper._deny_access(
            self._context,
            self._share,
            {
                'access_level': 'rw',
                'access_type': 'cephx',
                'access_to': 'alice'
            },
            sub_name=self._share['id']
        )

        driver.rados_command.assert_called_once_with(
            self._native_protocol_helper.rados_client,
            access_deny_prefix, access_deny_dict)

        self.assertEqual(1, driver.rados_command.call_count)

    def test_update_access_add_rm(self):
        """update_access applies add/delete rules and reports failures.

        ``_allow_access`` is mocked to succeed for the first rule and to
        raise for the other three; each failure must be reported as an
        'error' state and produce one user message.
        """
        alice = {
            'id': 'instance_mapping_id1',
            'access_id': 'accessid1',
            'access_level': 'rw',
            'access_type': 'cephx',
            'access_to': 'alice'
        }
        bob = {
            'id': 'instance_mapping_id2',
            'access_id': 'accessid2',
            'access_level': 'ro',
            'access_type': 'cephx',
            'access_to': 'bob'
        }
        manila = {
            'id': 'instance_mapping_id3',
            'access_id': 'accessid3',
            'access_level': 'ro',
            'access_type': 'cephx',
            'access_to': 'manila'
        }
        admin = {
            'id': 'instance_mapping_id4',
            'access_id': 'accessid4',
            'access_level': 'rw',
            'access_type': 'cephx',
            'access_to': 'admin'
        }
        dabo = {
            'id': 'instance_mapping_id5',
            'access_id': 'accessid5',
            'access_level': 'rwx',
            'access_type': 'cephx',
            'access_to': 'dabo'
        }

        # One outcome per add rule, in the order the rules are passed.
        allow_access_side_effects = [
            'abc123',
            exception.InvalidShareAccess(reason='not'),
            exception.InvalidShareAccess(reason='allowed'),
            exception.InvalidShareAccessLevel(level='rwx')
        ]
        self.mock_object(self._native_protocol_helper.message_api, 'create')
        self.mock_object(self._native_protocol_helper, '_deny_access')
        self.mock_object(self._native_protocol_helper,
                         '_allow_access',
                         mock.Mock(side_effect=allow_access_side_effects))

        access_updates = self._native_protocol_helper.update_access(
            self._context,
            self._share,
            access_rules=[alice, manila, admin, dabo],
            add_rules=[alice, manila, admin, dabo],
            delete_rules=[bob],
            update_rules=[],
            sub_name=self._share['id']
        )

        expected_access_updates = {
            'accessid1': {'access_key': 'abc123'},
            'accessid3': {'state': 'error'},
            'accessid4': {'state': 'error'},
            'accessid5': {'state': 'error'}
        }
        self.assertEqual(expected_access_updates, access_updates)
        # All four add rules must reach _allow_access, including the one
        # with the invalid 'rwx' level.
        self._native_protocol_helper._allow_access.assert_has_calls(
            [mock.call(self._context, self._share, alice,
                       sub_name=self._share['id']),
             mock.call(self._context, self._share, manila,
                       sub_name=self._share['id']),
             mock.call(self._context, self._share, admin,
                       sub_name=self._share['id']),
             mock.call(self._context, self._share, dabo,
                       sub_name=self._share['id'])])
        self._native_protocol_helper._deny_access.assert_called_once_with(
            self._context, self._share, bob, sub_name=self._share['id'])
        self.assertEqual(
            3, self._native_protocol_helper.message_api.create.call_count)

    def test_update_access_all(self):
        """With no add/delete rules the helper syncs against the backend.

        The backend reports 'john' and 'paul' as authorized while only
        'alice' is wanted: alice is authorized and the two stale IDs are
        deauthorized and evicted.
        """
        get_authorized_ids_prefix = "fs subvolume authorized_list"

        get_authorized_ids_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"]
        }

        access_allow_prefix = "fs subvolume authorize"

        access_allow_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
            "tenant_id": self._share["project_id"],
            "access_level": "rw",
        }

        access_deny_prefix = "fs subvolume deauthorize"

        access_deny_john_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "john",
        }

        access_deny_paul_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "paul",
        }

        evict_prefix = "fs subvolume evict"

        alice = {
            'id': 'instance_mapping_id1',
            'access_id': 'accessid1',
            'access_level': 'rw',
            'access_type': 'cephx',
            'access_to': 'alice',
        }

        # Replies: authorized list, alice's key, then four replies for
        # the deauthorize/evict pairs.
        driver.rados_command.side_effect = [
            [{"john": "rw"}, {"paul": "r"}],
            'abc123',
            mock.Mock(), mock.Mock(),
            mock.Mock(), mock.Mock()]

        access_updates = self._native_protocol_helper.update_access(
            self._context, self._share, access_rules=[alice], add_rules=[],
            delete_rules=[], update_rules=[], sub_name=self._share['id'])

        self.assertEqual(
            {'accessid1': {'access_key': 'abc123'}}, access_updates)

        driver.rados_command.assert_has_calls([
            mock.call(self._native_protocol_helper.rados_client,
                      get_authorized_ids_prefix,
                      get_authorized_ids_dict,
                      json_obj=True),
            mock.call(self._native_protocol_helper.rados_client,
                      access_allow_prefix,
                      access_allow_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      access_deny_prefix,
                      access_deny_john_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      evict_prefix,
                      access_deny_john_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      access_deny_prefix,
                      access_deny_paul_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      evict_prefix,
                      access_deny_paul_dict)], any_order=True)

        self.assertEqual(6, driver.rados_command.call_count)

    def test_get_configured_ip_versions(self):
        """The native helper reports IPv4 only."""
        expected = [4]

        result = self._native_protocol_helper.get_configured_ip_versions()

        self.assertEqual(expected, result)

1495 

1496 

@ddt.ddt
class NFSProtocolHelperTestCase(test.TestCase):
    """Unit tests for ``driver.NFSProtocolHelper``.

    The ganesha executors, ``socket.gethostname`` and
    ``driver.rados_command`` are mocked in ``setUp``.
    """

    def setUp(self):
        """Create an NFS helper backed by mocked executors and rados."""
        super(NFSProtocolHelperTestCase, self).setUp()
        self._execute = mock.Mock()
        self._share = fake_share.fake_share(share_proto='NFS')
        self._rados_client = MockRadosModule.Rados()
        self._volname = "cephfs"
        self.fake_conf = configuration.Configuration(None)

        self.fake_conf.set_default('cephfs_ganesha_server_ip',
                                   'fakeip')
        self.mock_object(driver.ganesha_utils, 'SSHExecutor')
        self.mock_object(driver.ganesha_utils, 'RootExecutor')
        self.mock_object(driver.socket, 'gethostname')
        self.mock_object(driver, "rados_command")

        driver.ceph_default_target = ('mon-mgr', )

        self._nfs_helper = driver.NFSProtocolHelper(
            self._execute,
            self.fake_conf,
            rados_client=self._rados_client,
            volname=self._volname)

    # Each row: (export ips, whether check_for_setup_error must raise).
    @ddt.data(
        (['fakehost', 'some.host.name', 'some.host.name.', '1.1.1.0'], False),
        (['fakehost', 'some.host.name', 'some.host.name.', '1.1..1.0'], True),
        (['fakehost', 'some.host.name', 'some.host.name', '1.1.1.256'], True),
        (['fakehost..', 'some.host.name', 'some.host.name', '1.1.1.0'], True),
        (['fakehost', 'some.host.name..', 'some.host.name', '1.1.1.0'], True),
        (['fakehost', 'some.host.name', '1.1.1.0/24'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001'], False),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001:1001'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001:'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001.'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001/129.'], True),
    )
    @ddt.unpack
    def test_check_for_setup_error(self, cephfs_ganesha_export_ips, raises):
        """Validate the configured export IPs / hostnames.

        :param cephfs_ganesha_export_ips: value of the
            ``cephfs_ganesha_export_ips`` option under test.
        :param raises: True when InvalidParameterValue is expected.
        """
        fake_conf = configuration.Configuration(None)
        fake_conf.set_default('cephfs_ganesha_export_ips',
                              cephfs_ganesha_export_ips)

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        if raises:
            self.assertRaises(exception.InvalidParameterValue,
                              helper.check_for_setup_error)
        else:
            self.assertIsNone(helper.check_for_setup_error())

    @ddt.data(False, True)
    def test_init_executor_type(self, ganesha_server_is_remote):
        """A remote ganesha server uses SSHExecutor, a local one Root.

        :param ganesha_server_is_remote: value of the
            ``cephfs_ganesha_server_is_remote`` option.
        """
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_is_remote', ganesha_server_is_remote),
            ('cephfs_ganesha_server_ip', 'fakeip'),
            ('cephfs_ganesha_server_username', 'fake_username'),
            ('cephfs_ganesha_server_password', 'fakepwd'),
            ('cephfs_ganesha_path_to_private_key', 'fakepathtokey')]
        for args in conf_args_list:
            fake_conf.set_default(*args)

        driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        if ganesha_server_is_remote:
            driver.ganesha_utils.SSHExecutor.assert_has_calls(
                [mock.call('fakeip', 22, None, 'fake_username',
                           password='fakepwd',
                           privatekey='fakepathtokey')])
        else:
            driver.ganesha_utils.RootExecutor.assert_has_calls(
                [mock.call(self._execute)])

    @ddt.data('fakeip', None)
    def test_init_identify_local_host(self, ganesha_server_ip):
        """Without a configured server IP the local hostname is looked up.

        :param ganesha_server_ip: value of ``cephfs_ganesha_server_ip``;
            None triggers the hostname lookup and one info log.
        """
        self.mock_object(driver.LOG, 'info')
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_ip', ganesha_server_ip),
            ('cephfs_ganesha_server_username', 'fake_username'),
            ('cephfs_ganesha_server_password', 'fakepwd'),
            ('cephfs_ganesha_path_to_private_key', 'fakepathtokey')]
        for args in conf_args_list:
            fake_conf.set_default(*args)

        driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        driver.ganesha_utils.RootExecutor.assert_has_calls(
            [mock.call(self._execute)])
        if ganesha_server_ip:
            self.assertFalse(driver.socket.gethostname.called)
            self.assertFalse(driver.LOG.info.called)
        else:
            driver.socket.gethostname.assert_called_once_with()
            driver.LOG.info.assert_called_once()

    def test_get_export_locations_no_export_ips_configured(self):
        """With no export IPs the ganesha server IP is the only location."""
        cephfs_subvolume_path = "/foo/bar"
        fake_conf = configuration.Configuration(None)
        fake_conf.set_default('cephfs_ganesha_server_ip', '1.2.3.4')

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        ret = helper.get_export_locations(self._share,
                                          cephfs_subvolume_path)
        self.assertEqual(
            [{
                'path': '1.2.3.4:/foo/bar',
                'is_admin_only': False,
                'metadata': {
                    'preferred': False,
                },
            }], ret)

    def test_get_export_locations_with_export_ips_configured(self):
        """Each configured export IP yields one location.

        IPv6 addresses are expected in square brackets.
        """
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_ip', '1.2.3.4'),
            ('cephfs_ganesha_export_ips',
             ['127.0.0.1', 'fd3f:c057:1192:1::1', '::1'])]
        for args in conf_args_list:
            fake_conf.set_default(*args)

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        cephfs_subvolume_path = "/foo/bar"

        ret = helper.get_export_locations(self._share, cephfs_subvolume_path)

        self._assertEqualListsOfObjects(
            [
                {
                    'path': '127.0.0.1:/foo/bar',
                    'is_admin_only': False,
                    'metadata': {
                        'preferred': False,
                    },
                },
                {
                    'path': '[fd3f:c057:1192:1::1]:/foo/bar',
                    'is_admin_only': False,
                    'metadata': {
                        'preferred': False,
                    },
                },
                {
                    'path': '[::1]:/foo/bar',
                    'is_admin_only': False,
                    'metadata': {
                        'preferred': False,
                    },
                },
            ], ret)

    @ddt.data(('some.host.name', None, [4, 6]), ('host.', None, [4, 6]),
              ('1001::1001', None, [6]), ('1.1.1.0', None, [4]),
              (None, ['1001::1001', '1.1.1.0'], [6, 4]),
              (None, ['1001::1001'], [6]), (None, ['1.1.1.0'], [4]),
              (None, ['1001::1001/129', '1.1.1.0'], [4, 6]))
    @ddt.unpack
    def test_get_configured_ip_versions(
            self, cephfs_ganesha_server_ip, cephfs_ganesha_export_ips,
            configured_ip_version):
        """Derive the supported IP versions from the configuration.

        :param cephfs_ganesha_server_ip: ``cephfs_ganesha_server_ip``
            option value, or None.
        :param cephfs_ganesha_export_ips: ``cephfs_ganesha_export_ips``
            option value, or None.
        :param configured_ip_version: IP versions expected, compared as
            a set so the order does not matter.
        """
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_ip', cephfs_ganesha_server_ip),
            ('cephfs_ganesha_export_ips', cephfs_ganesha_export_ips)]

        for args in conf_args_list:
            fake_conf.set_default(*args)

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        self.assertEqual(set(configured_ip_version),
                         set(helper.get_configured_ip_versions()))
        # The result is also stored on the helper.
        self.assertEqual(set(configured_ip_version),
                         helper.configured_ip_versions)

    def test_get_configured_ip_versions_already_set(self):
        """A pre-set ``configured_ip_versions`` is returned unchanged."""
        fake_conf = configuration.Configuration(None)
        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        ip_versions = ['foo', 'bar']

        helper.configured_ip_versions = ip_versions

        result = helper.get_configured_ip_versions()

        self.assertEqual(ip_versions, result)

    def test_default_config_hook(self):
        """The hook loads the 'conf' directory next to the driver module."""
        fake_conf_dict = {'key': 'value1'}
        self.mock_object(driver.ganesha.GaneshaNASHelper,
                         '_default_config_hook',
                         mock.Mock(return_value={}))
        self.mock_object(driver.ganesha_utils, 'path_from',
                         mock.Mock(return_value='/fakedir/cephfs/conf'))
        self.mock_object(self._nfs_helper, '_load_conf_dir',
                         mock.Mock(return_value=fake_conf_dict))

        ret = self._nfs_helper._default_config_hook()

        (driver.ganesha.GaneshaNASHelper._default_config_hook.
            assert_called_once_with())
        driver.ganesha_utils.path_from.assert_called_once_with(
            driver.__file__, 'conf')
        self._nfs_helper._load_conf_dir.assert_called_once_with(
            '/fakedir/cephfs/conf')
        self.assertEqual(fake_conf_dict, ret)

    def test_fsal_hook(self):
        """The FSAL hook authorizes a ganesha ID and returns the FSAL block.

        The key returned by the mocked backend must appear as
        ``Secret_Access_Key`` in the result.
        """
        access_allow_prefix = "fs subvolume authorize"

        access_allow_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "ganesha-fakeid",
            "tenant_id": self._share["project_id"],
            "access_level": "rw",
        }

        expected_ret = {
            "Name": "Ceph",
            "User_Id": "ganesha-fakeid",
            "Secret_Access_Key": "ganesha-zorilla",
            "Filesystem": self._nfs_helper.volname
        }

        driver.rados_command.return_value = 'ganesha-zorilla'

        ret = self._nfs_helper._fsal_hook(
            None, self._share, None, self._share['id']
        )

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            access_allow_prefix, access_allow_dict)

        self.assertEqual(expected_ret, ret)

    def test_cleanup_fsal_hook(self):
        """The cleanup hook deauthorizes the ganesha ID and returns None."""
        access_deny_prefix = "fs subvolume deauthorize"

        access_deny_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "ganesha-fakeid",
        }

        ret = self._nfs_helper._cleanup_fsal_hook(
            None, self._share, None, self._share['id']
        )

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            access_deny_prefix, access_deny_dict)

        self.assertIsNone(ret)

    def test_get_export_path(self):
        """The export path is the subvolume path reported by the backend."""
        get_path_prefix = "fs subvolume getpath"

        get_path_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
        }

        driver.rados_command.return_value = '/foo/bar'

        ret = self._nfs_helper._get_export_path(self._share)

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            get_path_prefix, get_path_dict)

        self.assertEqual('/foo/bar', ret)

    def test_get_export_pseudo_path(self):
        """The pseudo path is the subvolume path reported by the backend."""
        get_path_prefix = "fs subvolume getpath"

        get_path_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
        }

        driver.rados_command.return_value = '/foo/bar'

        ret = self._nfs_helper._get_export_pseudo_path(self._share)

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            get_path_prefix, get_path_dict)

        self.assertEqual('/foo/bar', ret)

1830 

1831 

1832@ddt.ddt 

1833class NFSClusterProtocolHelperTestCase(test.TestCase): 

1834 

def setUp(self):
    """Create an NFS cluster helper with mocked paths, rados and cluster ID.

    Every patch applied to ``driver.NFSClusterProtocolHelper`` is undone
    at test cleanup so nothing leaks into other test cases.
    """
    super(NFSClusterProtocolHelperTestCase, self).setUp()
    self._execute = mock.Mock()
    self._context = context.get_admin_context()
    self._share = fake_share.fake_share(share_proto='NFS')
    self._rados_client = MockRadosModule.Rados()
    self._volname = "cephfs"
    self.fake_conf = configuration.Configuration(None)

    self.mock_object(driver.NFSClusterProtocolHelper,
                     '_get_export_path',
                     mock.Mock(return_value="ganesha:/foo/bar"))
    self.mock_object(driver.NFSClusterProtocolHelper,
                     '_get_export_pseudo_path',
                     mock.Mock(return_value="ganesha:/foo/bar"))
    self.mock_object(driver, "rados_command")

    driver.ceph_default_target = ('mon-mgr', )

    self._nfscluster_protocol_helper = driver.NFSClusterProtocolHelper(
        self._execute,
        self.fake_conf,
        rados_client=self._rados_client,
        volname=self._volname)

    # Assigning a PropertyMock straight onto the class would replace
    # nfs_clusterid for the rest of the test run; use a patcher so the
    # original attribute is restored when the test finishes.
    clusterid_patcher = mock.patch.object(
        type(self._nfscluster_protocol_helper), 'nfs_clusterid',
        new_callable=mock.PropertyMock, return_value='fs-manila')
    clusterid_patcher.start()
    self.addCleanup(clusterid_patcher.stop)

1862 

def test_get_export_ips_no_backends(self):
    """_get_export_ips raises when the cluster has no IP to export on.

    The mocked cluster info carries neither a virtual IP nor a backend.
    """
    empty_cluster = {
        "fs-manila": {"virtual_ip": None, "backend": []},
    }
    driver.rados_command.return_value = json.dumps(empty_cluster)

    helper = driver.NFSClusterProtocolHelper(
        self._execute,
        configuration.Configuration(None),
        rados_client=self._rados_client,
        volname=self._volname,
    )

    self.assertRaises(exception.ShareBackendException,
                      helper._get_export_ips)

1883 

@ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO)
def test_allow_access_rw_ro_when_export_does_not_exist(self, mode):
    """Allowing access creates a new export when none is found.

    :param mode: access level placed in the client specification.

    The mocked "nfs export info" lookup returns an empty dict, so a
    complete export definition is expected in "nfs export apply".
    """
    helper = self._nfscluster_protocol_helper
    cluster_id = helper.nfs_clusterid
    fs_name = helper.volname
    pseudo_path = "ganesha:/foo/bar"

    # An empty reply stands for "no export exists yet".
    driver.rados_command.return_value = {}

    client_spec = {
        'access_type': mode,
        'addresses': ['10.0.0.1'],
        'squash': 'none'
    }

    # Key order matters: the serialized form is compared byte for byte.
    new_export = {
        "path": pseudo_path,
        "cluster_id": cluster_id,
        "pseudo": pseudo_path,
        "squash": "none",
        "security_label": True,
        "fsal": {
            "name": "CEPH",
            "fs_name": fs_name,
        },
        "clients": client_spec
    }
    payload = json.dumps(new_export).encode('utf-8')

    helper._allow_access(
        self._share, client_spec, sub_name=self._share['id']
    )

    lookup_call = mock.call(
        self._rados_client,
        "nfs export info",
        {"cluster_id": cluster_id, "pseudo_path": pseudo_path},
        json_obj=True)
    apply_call = mock.call(
        self._rados_client,
        "nfs export apply",
        {"cluster_id": cluster_id},
        inbuf=payload)
    driver.rados_command.assert_has_calls([lookup_call, apply_call])

    self.assertEqual(2, driver.rados_command.call_count)

1937 

@ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO)
def test_allow_access_rw_ro_when_export_exist(self, mode):
    """Allowing access on an existing export replaces its client list.

    ``rados_command`` is mocked to return an export that still carries an
    old ``ro`` client for 10.0.0.1. The test expects an "nfs export info"
    lookup followed by an "nfs export apply" whose payload is that same
    export with ``clients`` set to the newly requested access.
    """
    export_info_prefix = "nfs export info"
    access_allow_prefix = "nfs export apply"
    nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid
    volname = self._nfscluster_protocol_helper.volname

    new_clients = {
        'access_type': mode,
        'addresses': ['10.0.0.2'],
        'squash': 'none'
    }

    export_info_dict = {
        "cluster_id": nfs_clusterid,
        "pseudo_path": "ganesha:/foo/bar",
    }

    access_allow_dict = {
        "cluster_id": nfs_clusterid,
    }

    existing_export = {
        "path": "ganesha:/foo/bar",
        "cluster_id": nfs_clusterid,
        "pseudo": "ganesha:/foo/bar",
        "squash": "none",
        "security_label": True,
        "fsal": {
            "name": "CEPH",
            "User_Id": "nfs.user",
            "fs_name": volname
        },
        "clients": {
            'access_type': "ro",
            'addresses': ['10.0.0.1'],
            'squash': 'none'
        }
    }

    # Keep the expectation in a separate dict. If it aliased the mocked
    # return value, the helper would be handed an export whose clients had
    # already been replaced, and the test could not tell whether the
    # helper updated them itself. dict() keeps the original key order,
    # which matters because the payload is compared as serialized bytes.
    expected_export = dict(existing_export, clients=new_clients)

    driver.rados_command.return_value = existing_export
    inbuf = json.dumps(expected_export).encode('utf-8')

    self._nfscluster_protocol_helper._allow_access(
        self._share, new_clients, sub_name=self._share['id']
    )

    driver.rados_command.assert_has_calls([
        mock.call(self._rados_client,
                  export_info_prefix,
                  export_info_dict, json_obj=True),
        mock.call(self._rados_client,
                  access_allow_prefix,
                  access_allow_dict, inbuf=inbuf)])

    self.assertEqual(2, driver.rados_command.call_count)

1996 

def test_deny_access(self):
    """Denying access issues exactly one "nfs export rm" command."""
    helper = self._nfscluster_protocol_helper
    expected_argdict = {
        "cluster_id": helper.nfs_clusterid,
        "pseudo_path": "ganesha:/foo/bar"
    }

    helper._deny_access(self._share, self._share['id'])

    driver.rados_command.assert_called_once_with(
        self._rados_client, "nfs export rm", expected_argdict)

2014 

def test_get_export_locations(self):
    """Each backend reported by the NFS cluster yields an export location.

    The mocked "nfs cluster info" reply lists two backends; one preferred,
    non-admin location per backend IP is expected in return.
    """
    helper = self._nfscluster_protocol_helper
    cluster_id = helper.nfs_clusterid

    backends = [
        {"hostname": "fake-ceph-node-1",
         "ip": "10.0.0.10",
         "port": "1010"},
        {"hostname": "fake-ceph-node-2",
         "ip": "10.0.0.11",
         "port": "1011"}
    ]
    mocked_reply = {"fs-manila": {"virtual_ip": None, "backend": backends}}
    driver.rados_command.return_value = json.dumps(mocked_reply)

    subvolume_path = "/foo/bar"
    expected_locations = [
        {
            'path': f'{backend_ip}:/foo/bar',
            'is_admin_only': False,
            'metadata': {
                'preferred': True,
            },
        }
        for backend_ip in ('10.0.0.10', '10.0.0.11')
    ]

    actual_locations = helper.get_export_locations(
        self._share, subvolume_path)

    driver.rados_command.assert_called_once_with(
        self._rados_client,
        "nfs cluster info",
        {"cluster_id": cluster_id})

    self._assertEqualListsOfObjects(expected_locations, actual_locations)

2062 

@ddt.data('cephfs_ganesha_server_ip', 'cephfs_ganesha_export_ips')
def test_get_export_locations_ganesha_still_configured(self, confopt):
    """Configured ganesha IPs are reported as non-preferred locations.

    The cluster backends are expected as preferred export locations, and
    every IP set through ``confopt`` is expected as an additional
    location with ``preferred`` set to False.
    """
    # One option takes a single address, the other a list of addresses.
    if confopt == 'cephfs_ganesha_server_ip':
        configured = '10.0.0.1'
    else:
        configured = ['10.0.0.2', '10.0.0.3']

    helper = self._nfscluster_protocol_helper
    cluster_id = helper.nfs_clusterid
    self.fake_conf.set_default(confopt, configured)

    mocked_reply = {"fs-manila": {
        "virtual_ip": None,
        "backend": [
            {"hostname": "fake-ceph-node-1",
             "ip": "10.0.0.10",
             "port": "1010"},
            {"hostname": "fake-ceph-node-2",
             "ip": "10.0.0.11",
             "port": "1011"}
        ]
    }}
    driver.rados_command.return_value = json.dumps(mocked_reply)

    def _location(ip, preferred):
        # Shape of a single expected export location entry.
        return {
            'path': f'{ip}:/foo/bar',
            'is_admin_only': False,
            'metadata': {
                'preferred': preferred,
            },
        }

    # Normalise the scalar option so both cases share one code path.
    if isinstance(configured, list):
        configured_ips = configured
    else:
        configured_ips = [configured]

    expected_locations = [
        _location(ip, True) for ip in ('10.0.0.10', '10.0.0.11')
    ]
    expected_locations.extend(
        _location(ip, False) for ip in configured_ips)
    expected_locations.sort(key=lambda loc: loc['path'])

    returned_locations = helper.get_export_locations(
        self._share, "/foo/bar")
    # Compare in path order so the helper's ordering is not asserted.
    actual_locations = sorted(
        returned_locations, key=lambda loc: loc['path'])

    driver.rados_command.assert_called_once_with(
        self._rados_client,
        "nfs cluster info",
        {"cluster_id": cluster_id})

    self.assertEqual(expected_locations, actual_locations)

2152 

2153 

@ddt.ddt
class CephFSDriverAltConfigTestCase(test.TestCase):
    """Test the CephFS driver with non-default config values."""

    def setUp(self):
        """Build a fake configuration and stub out the Ceph bindings."""
        super().setUp()
        self._execute = mock.Mock()
        self.fake_conf = configuration.Configuration(None)
        self._rados_client = MockRadosModule.Rados()
        self._context = context.get_admin_context()
        self._share = fake_share.fake_share(share_proto='CEPHFS')

        self.fake_conf.set_default('driver_handles_share_servers', False)
        self.fake_conf.set_default('cephfs_auth_id', 'manila')

        self.mock_object(driver, "rados", MockRadosModule)
        self.mock_object(driver, "json_command",
                         MockCephArgparseModule.json_command)
        self.mock_object(driver, "rados_command")
        self.mock_object(driver, 'NativeProtocolHelper')
        self.mock_object(driver, 'NFSProtocolHelper')

        # Restore the module-level target afterwards so this override does
        # not leak into tests that run later.
        self.addCleanup(
            setattr, driver, 'ceph_default_target',
            getattr(driver, 'ceph_default_target', None))
        driver.ceph_default_target = ('mon-mgr', )

    @ddt.data('cephfs', 'nfs')
    def test_do_setup_alt_volume_mode(self, protocol_helper):
        """do_setup builds the configured helper and keeps the alt mode."""
        self.fake_conf.set_default('cephfs_volume_mode', ALT_VOLUME_MODE)
        self._driver = driver.CephFSDriver(execute=self._execute,
                                           configuration=self.fake_conf,
                                           rados_client=self._rados_client)
        self.mock_object(
            self._driver, '_get_cephfs_filesystem_allocation',
            mock.Mock(return_value=10)
        )

        # ``volname`` lives on the class, so patch it with a patcher that
        # is undone on cleanup instead of assigning to the class for good.
        volname_patcher = mock.patch.object(
            type(self._driver), 'volname',
            new_callable=mock.PropertyMock, return_value='cephfs')
        volname_patcher.start()
        self.addCleanup(volname_patcher.stop)

        self._driver.configuration.cephfs_protocol_helper_type = (
            protocol_helper)

        self._driver.do_setup(self._context)

        if protocol_helper == 'cephfs':
            driver.NativeProtocolHelper.assert_called_once_with(
                self._execute, self._driver.configuration,
                rados_client=self._driver.rados_client,
                volname=self._driver.volname)
        else:
            driver.NFSProtocolHelper.assert_called_once_with(
                self._execute, self._driver.configuration,
                rados_client=self._driver._rados_client,
                volname=self._driver.volname)

        self._driver.protocol_helper.init_helper.assert_called_once_with()

        self.assertEqual(ALT_VOLUME_MODE, self._driver._cephfs_volume_mode)

    @ddt.data('0o759', '0x755', '12a3')
    def test_volume_mode_exception(self, volume_mode):
        """An invalid cephfs_volume_mode fails driver construction."""
        # cephfs_volume_mode must be a string representing an int as octal
        self.fake_conf.set_default('cephfs_volume_mode', volume_mode)

        self.assertRaises(exception.BadConfigurationException,
                          driver.CephFSDriver, execute=self._execute,
                          configuration=self.fake_conf)

2219 

2220 

@ddt.ddt
class MiscTests(test.TestCase):
    """Tests for the driver module's lazy import helpers."""

    @ddt.data({'import_exc': None},
              {'import_exc': ImportError})
    @ddt.unpack
    def test_rados_module_missing(self, import_exc):
        """setup_rados stores the imported module or raises on failure."""
        # Put the module global back afterwards; otherwise the None or
        # Mock left behind here would leak into later tests.
        self.addCleanup(setattr, driver, 'rados', driver.rados)
        driver.rados = None
        with mock.patch.object(
                driver.importutils,
                'import_module',
                side_effect=import_exc) as mock_import_module:
            if import_exc:
                self.assertRaises(
                    exception.ShareBackendException, driver.setup_rados)
            else:
                driver.setup_rados()
                self.assertEqual(mock_import_module.return_value,
                                 driver.rados)

        mock_import_module.assert_called_once_with('rados')

    @ddt.data({'import_exc': None},
              {'import_exc': ImportError})
    @ddt.unpack
    def test_setup_json_class_missing(self, import_exc):
        """setup_json_command stores the imported class or raises."""
        # Same isolation concern as for ``driver.rados`` above.
        self.addCleanup(
            setattr, driver, 'json_command', driver.json_command)
        driver.json_command = None
        with mock.patch.object(
                driver.importutils,
                'import_class',
                side_effect=import_exc) as mock_import_class:
            if import_exc:
                self.assertRaises(
                    exception.ShareBackendException,
                    driver.setup_json_command)
            else:
                driver.setup_json_command()
                self.assertEqual(mock_import_class.return_value,
                                 driver.json_command)

        mock_import_class.assert_called_once_with(
            'ceph_argparse.json_command')