Coverage for manila/tests/share/drivers/cephfs/test_driver.py: 99%
762 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15import json
16import math
17from unittest import mock
19import ddt
20from oslo_utils import units
22from manila.common import constants
23from manila import context
24import manila.exception as exception
25from manila.share import configuration
26from manila.share.drivers.cephfs import driver
27from manila.share import share_types
28from manila import test
29from manila.tests import fake_share
# Octal-string POSIX mode the driver applies to subvolumes/groups by default.
DEFAULT_VOLUME_MODE = '755'
# Alternate mode value for tests that override the default (not used in the
# portion of the file visible here).
ALT_VOLUME_MODE = '644'
class MockRadosModule(object):
    """Stand-in for the rados module used by the driver under test."""

    class Rados(mock.Mock):
        def __init__(self, *args, **kwargs):
            # Restrict the mock surface to the client methods the
            # driver is expected to call.
            mock.Mock.__init__(
                self, spec=["connect", "shutdown", "state"])
            # Canned monitor addresses and cluster usage statistics.
            fake_cluster_stats = {
                "kb": 172953600,
                "kb_avail": 157123584,
                "kb_used": 15830016,
                "num_objects": 26,
            }
            self.get_mon_addrs = mock.Mock(
                return_value=["1.2.3.4", "5.6.7.8"])
            self.get_cluster_stats = mock.Mock(
                return_value=fake_cluster_stats)

    class Error(mock.Mock):
        # Placeholder for rados.Error in exception-handling paths.
        pass
class MockAllocationCapacityCache(mock.Mock):
    """Mocked up version of the driver's AllocationCapacityCache."""
    def __init__(self, *args, **kwargs):
        mock.Mock.__init__(self, spec=[
            "update_data"
        ])
        # By default the cache reports fresh (non-expired) data holding
        # an allocated capacity of 20.0 GiB.
        self.is_expired = mock.Mock(return_value=False)
        self.get_data = mock.Mock(return_value=20.0)
class MockCephArgparseModule(object):
    """Stand-in for the ceph_argparse module."""

    class json_command(mock.Mock):
        def __init__(self, *args, **kwargs):
            # Same restricted spec as the rados client mock.
            allowed = ["connect", "shutdown", "state"]
            mock.Mock.__init__(self, spec=allowed)
@ddt.ddt
class AllocationCapacityCacheTestCase(test.TestCase):
    """Exercise driver.AllocationCapacityCache.

    The cache wraps the driver's allocated-capacity value with a
    getter/setter pair plus a timeout-based expiry check.
    """

    def setUp(self):
        super(AllocationCapacityCacheTestCase, self).setUp()
        self._allocation_capacity_cache = driver.AllocationCapacityCache(10)

    def test_set_get_data(self):
        cache = self._allocation_capacity_cache

        # Before any update the cache must report itself expired...
        self.assertTrue(cache.is_expired())

        # ...and it holds no value yet.
        self.assertIsNone(cache.get_data())

        # Storing a value makes it retrievable unchanged.
        new_value = 100.0
        cache.update_data(new_value)
        self.assertEqual(cache.get_data(), new_value)
119@ddt.ddt
120class CephFSDriverTestCase(test.TestCase):
121 """Test the CephFS driver.
123 This is a very simple driver that mainly
124 calls through to the CephFSVolumeClient interface, so the tests validate
125 that the Manila driver calls map to the appropriate CephFSVolumeClient
126 calls.
127 """
    def setUp(self):
        """Build a CephFSDriver wired to mocked ceph/rados interfaces."""
        super(CephFSDriverTestCase, self).setUp()
        self._execute = mock.Mock()
        self.fake_conf = configuration.Configuration(None)
        self._context = context.get_admin_context()
        self._share = fake_share.fake_share(share_proto='CEPHFS')
        self._snapshot = fake_share.fake_snapshot_instance()

        self.fake_conf.set_default('driver_handles_share_servers', False)
        self.fake_conf.set_default('cephfs_auth_id', 'manila')

        # Replace every external touch point of the driver module with
        # mocks so the tests can assert on the exact calls made.
        self.mock_object(driver, "rados_command")
        self.mock_object(driver, "rados", MockRadosModule)
        self.mock_object(driver, "json_command", MockCephArgparseModule)
        self.mock_object(driver, 'NativeProtocolHelper')
        self.mock_object(driver, 'NFSProtocolHelper')
        self.mock_object(driver, 'NFSClusterProtocolHelper')
        self.mock_object(driver, "AllocationCapacityCache",
                         MockAllocationCapacityCache)

        driver.ceph_default_target = ('mon-mgr', )
        self.fake_private_storage = mock.Mock()
        self.mock_object(self.fake_private_storage, 'get',
                         mock.Mock(return_value=None))

        self._driver = (
            driver.CephFSDriver(execute=self._execute,
                                configuration=self.fake_conf,
                                private_storage=self.fake_private_storage))
        self._driver.protocol_helper = mock.Mock()
        self._driver._cached_allocated_capacity_gb = (
            MockAllocationCapacityCache()
        )

        type(self._driver).volname = mock.PropertyMock(return_value='cephfs')

        self.mock_object(share_types, 'get_share_type_extra_specs',
                         mock.Mock(return_value={}))
    @ddt.data(
        ('cephfs', None),
        ('nfs', None),
        ('nfs', 'fs-manila')
    )
    @ddt.unpack
    def test_do_setup(self, protocol_helper, cephfs_nfs_cluster_id):
        """do_setup picks the protocol helper matching the configuration."""
        self._driver.configuration.cephfs_protocol_helper_type = (
            protocol_helper)
        self.fake_conf.set_default('cephfs_nfs_cluster_id',
                                   cephfs_nfs_cluster_id)
        self.mock_object(
            self._driver, '_get_cephfs_filesystem_allocation',
            mock.Mock(return_value=10)
        )

        self._driver.do_setup(self._context)

        if protocol_helper == 'cephfs':
            driver.NativeProtocolHelper.assert_called_once_with(
                self._execute, self._driver.configuration,
                rados_client=self._driver._rados_client,
                volname=self._driver.volname)
        else:
            # NFS without a cluster id uses the standalone helper,
            # otherwise the clustered one.
            if self.fake_conf.cephfs_nfs_cluster_id is None:
                driver.NFSProtocolHelper.assert_called_once_with(
                    self._execute, self._driver.configuration,
                    rados_client=self._driver._rados_client,
                    volname=self._driver.volname)
            else:
                driver.NFSClusterProtocolHelper.assert_called_once_with(
                    self._execute, self._driver.configuration,
                    rados_client=self._driver._rados_client,
                    volname=self._driver.volname)

        self._driver.protocol_helper.init_helper.assert_called_once_with()

        self.assertEqual(DEFAULT_VOLUME_MODE, self._driver._cephfs_volume_mode)
    def test__get_sub_name(self):
        """Without a private-storage mapping the subvolume name is the id."""
        sub_name = self._driver._get_subvolume_name(self._share["id"])
        self.assertEqual(sub_name, self._share["id"])
    def test__get_sub_name_has_other_name(self):
        """A subvolume name stored in private storage takes precedence."""
        expected_sub_name = 'user_specified_subvolume_name'
        self.mock_object(
            self._driver.private_storage, 'get',
            mock.Mock(return_value=expected_sub_name)
        )
        sub_name = self._driver._get_subvolume_name(self._share["id"])
        self.assertEqual(expected_sub_name, sub_name)
    def test__get_sub_snapshot_name(self):
        """Without a stored mapping the snapshot name is the snapshot id."""
        sub_name = self._driver._get_subvolume_snapshot_name(
            self._snapshot["id"]
        )
        self.assertEqual(sub_name, self._snapshot["id"])
    def test__get_sub_snapshot_name_has_other_name(self):
        """A snapshot name stored in private storage takes precedence."""
        expected_sub_snap_name = 'user_specified_subvolume_snapshot_name'
        self.mock_object(
            self._driver.private_storage, 'get',
            mock.Mock(return_value=expected_sub_snap_name)
        )
        sub_name = self._driver._get_subvolume_snapshot_name(
            self._snapshot["id"]
        )
        self.assertEqual(expected_sub_snap_name, sub_name)
    @ddt.data(
        ('{"version": "ceph version 16.2.4"}', 'pacific'),
        ('{"version": "ceph version 15.1.2"}', 'octopus'),
        ('{"version": "ceph version 14.3.1"}', 'nautilus'),
    )
    @ddt.unpack
    def test_version_check(self, ceph_mon_version, codename):
        """do_setup selects the rados command target from the mon version."""
        driver.ceph_default_target = None
        driver.rados_command.return_value = ceph_mon_version

        self.mock_object(
            self._driver, '_get_cephfs_filesystem_allocation',
            mock.Mock(return_value=10)
        )

        self._driver.do_setup(self._context)

        # Nautilus (v14) gets the legacy 'mgr' target; newer releases
        # use 'mon-mgr'.
        if codename == 'nautilus':
            self.assertEqual(('mgr', ), driver.ceph_default_target)
        else:
            self.assertEqual(('mon-mgr', ), driver.ceph_default_target)

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, "version", target=('mon', ))

        self.assertEqual(1, driver.rados_command.call_count)
    def test_version_check_not_supported(self):
        """do_setup raises for a Ceph release older than supported (v13)."""
        driver.ceph_default_target = None
        driver.rados_command.return_value = (
            '{"version": "ceph version 13.0.1"}')

        self.assertRaises(exception.ShareBackendException,
                          self._driver.do_setup,
                          self._context)
    @ddt.data('cephfs', 'nfs')
    def test_check_for_setup_error(self, protocol_helper):
        """check_for_setup_error delegates to the protocol helper."""
        self._driver.configuration.cephfs_protocol_helper_type = (
            protocol_helper)

        self._driver.check_for_setup_error()

        (self._driver.protocol_helper.check_for_setup_error.
            assert_called_once_with())
    def test_create_share(self):
        """create_share issues 'fs subvolume create' then 'getpath'."""
        create_share_prefix = "fs subvolume create"
        get_path_prefix = "fs subvolume getpath"

        create_share_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._share["id"],
            "size": self._share["size"] * units.Gi,
            "namespace_isolated": True,
            "mode": DEFAULT_VOLUME_MODE,
        }

        get_path_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._share["id"],
        }

        self._driver.create_share(self._context, self._share)

        driver.rados_command.assert_has_calls([
            mock.call(self._driver.rados_client,
                      create_share_prefix,
                      create_share_dict),
            mock.call(self._driver.rados_client,
                      get_path_prefix,
                      get_path_dict)])

        self.assertEqual(2, driver.rados_command.call_count)
    def test_create_share_error(self):
        """create_share surfaces ShareBackendException for this NFS share."""
        share = fake_share.fake_share(share_proto='NFS')

        self.assertRaises(exception.ShareBackendException,
                          self._driver.create_share,
                          self._context,
                          share)
    def _setup_manage_subvolume_test(self):
        """Build shared fixtures for the manage_existing tests.

        Returns a tuple of (share with export locations, expected
        'fs subvolume info' argument dict, mocked subvolume info result).
        """
        fake_els = [
            {'path': 'fake/path'}
        ]
        share_with_el = fake_share.fake_share(export_locations=fake_els)
        expected_subvolume_info_argdict = {
            "vol_name": self._driver.volname,
            "sub_name": fake_els[0]["path"],
        }
        # Shape of a real 'fs subvolume info' reply.
        subvolume_info_mock_result = {
            'atime': '2024-07-23 16:50:03',
            'bytes_pcent': '0.00',
            'bytes_quota': 2147483648,
            'bytes_used': 0,
            'created_at': '2024-07-23 16:50:03',
            'ctime': '2024-07-23 17:24:49',
            'data_pool': 'cephfs.cephfs.data',
            'features': ['snapshot-clone', 'snapshot-autoprotect'],
            'gid': 0,
            'mode': 755,
            'mon_addrs': ['10.0.0.1:6342'],
            'mtime': '2024-07-23 16:50:03',
            'path': '/volumes/_nogroup/subbvol/475a-4972-9f6b-fe025a8d383f',
            'pool_namespace': 'fsvolumes_cephfs',
            'state': 'complete',
            'type': 'subvolume',
            'uid': 0
        }

        return (
            share_with_el, expected_subvolume_info_argdict,
            subvolume_info_mock_result
        )
    def test_manage_existing_no_subvolume_name(self):
        """manage_existing rejects a share whose export path is None."""
        self.assertRaises(
            exception.ShareBackendException,
            self._driver.manage_existing,
            {
                'id': 'fake_project_uuid_1',
                'export_locations': [{'path': None}]
            },
            {}
        )
    def test_manage_existing_subvolume_not_found(self):
        """manage_existing fails when the subvolume info lookup errors."""
        driver.rados_command.side_effect = exception.ShareBackendException(
            msg="does not exist"
        )
        fake_els = [
            {'path': 'fake/path'}
        ]
        share_with_el = fake_share.fake_share(export_locations=fake_els)
        expected_info_argdict = {
            "vol_name": self._driver.volname,
            "sub_name": fake_els[0]["path"],
        }

        self.assertRaises(
            exception.ShareBackendException,
            self._driver.manage_existing,
            share_with_el,
            {}
        )

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, "fs subvolume info",
            expected_info_argdict,
            json_obj=True
        )
    def test_manage_existing_subvolume_infinite_no_provided_size(self):
        """An 'infinite' quota with no requested size cannot be managed."""
        share_with_el, expected_info_argdict, subvolume_info = (
            self._setup_manage_subvolume_test()
        )
        subvolume_info['bytes_quota'] = "infinite"
        driver.rados_command.return_value = subvolume_info

        self.assertRaises(
            exception.ShareBackendException,
            self._driver.manage_existing,
            share_with_el,
            {}
        )
        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, "fs subvolume info",
            expected_info_argdict,
            json_obj=True
        )
    @ddt.data(
        exception.ShareShrinkingPossibleDataLoss,
        exception.ShareBackendException
    )
    def test_manage_existing_subvolume_infinite_size(self, expected_exception):
        """Resize failures propagate when managing an 'infinite' subvolume."""
        share_with_el, expected_info_argdict, subvolume_info = (
            self._setup_manage_subvolume_test()
        )
        subvolume_info['bytes_quota'] = "infinite"
        driver.rados_command.return_value = subvolume_info
        new_size = 1

        mock_resize = self.mock_object(
            self._driver, '_resize_share',
            mock.Mock(side_effect=expected_exception('fake'))
        )

        self.assertRaises(
            expected_exception,
            self._driver.manage_existing,
            share_with_el,
            {'size': new_size}
        )

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, "fs subvolume info",
            expected_info_argdict,
            json_obj=True
        )
        mock_resize.assert_called_once_with(
            share_with_el, new_size, no_shrink=True
        )
    @ddt.data(True, False)
    def test_manage_existing(self, current_size_is_smaller):
        """manage_existing adopts a subvolume, resizing up when under 1GiB."""
        share_with_el, expected_info_argdict, subvolume_info = (
            self._setup_manage_subvolume_test()
        )
        if current_size_is_smaller:
            # set this to half gb, to ensure it will turn into 1gb
            subvolume_info['bytes_quota'] = 536870912
        subvolume_name = share_with_el["export_locations"][0]["path"]
        expected_share_metadata = {"subvolume_name": subvolume_name}
        expected_share_updates = {
            "size": int(
                math.ceil(int(subvolume_info['bytes_quota']) / units.Gi)),
            "export_locations": subvolume_name
        }

        driver.rados_command.return_value = subvolume_info
        self.mock_object(
            self._driver, '_get_export_locations',
            mock.Mock(return_value=subvolume_name))
        mock_resize_share = self.mock_object(self._driver, '_resize_share')

        share_updates = self._driver.manage_existing(share_with_el, {})

        self.assertEqual(expected_share_updates, share_updates)
        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, "fs subvolume info",
            expected_info_argdict,
            json_obj=True
        )
        self._driver.private_storage.update.assert_called_once_with(
            share_with_el['id'], expected_share_metadata
        )
        self._driver._get_export_locations.assert_called_once_with(
            share_with_el, subvolume_name=subvolume_name
        )
        if current_size_is_smaller:
            mock_resize_share.assert_called_once_with(
                share_with_el, 1, no_shrink=True
            )
        else:
            mock_resize_share.assert_not_called()
    def test_manage_existing_snapshot_no_snapshot_name(self):
        """manage_existing_snapshot rejects a None provider_location."""
        self.assertRaises(
            exception.ShareBackendException,
            self._driver.manage_existing_snapshot,
            {
                'id': 'fake_project_uuid_1',
                'provider_location': None,
            },
            {}
        )
    def test_manage_existing_snapshot_subvolume_not_found(self):
        """Managing a snapshot fails when the parent subvolume is missing."""
        driver.rados_command.side_effect = exception.ShareBackendException(
            msg="does not exist"
        )
        snapshot_instance = {
            'id': 'fake_project_uuid_1',
            'provider_location': 'fake/provider/location',
            'share_instance_id': 'fake_share_instance_id'
        }
        expected_info_argdict = {
            "vol_name": self._driver.volname,
            "sub_name": snapshot_instance["share_instance_id"]
        }

        self.assertRaises(
            exception.ShareBackendException,
            self._driver.manage_existing_snapshot,
            snapshot_instance,
            {}
        )

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, "fs subvolume info",
            expected_info_argdict,
            json_obj=True
        )
    def test_manage_existing_snapshot_snapshot_not_found(self):
        """Managing a snapshot fails when the snapshot itself is missing."""
        # Only subvolume_info is used from the helper; the argdict is
        # rebuilt below for this snapshot instance.
        _, expected_info_argdict, subvolume_info = (
            self._setup_manage_subvolume_test()
        )
        expected_snapshot_name = 'fake/provider/location'
        snapshot_instance = {
            'id': 'fake_project_uuid_1',
            'provider_location': expected_snapshot_name,
            'share_instance_id': 'fake_share_instance_id'
        }
        expected_info_argdict = {
            "vol_name": self._driver.volname,
            "sub_name": snapshot_instance["share_instance_id"]
        }
        expected_snap_info_argdict = {
            "vol_name": self._driver.volname,
            "sub_name": snapshot_instance["share_instance_id"],
            "snap_name": expected_snapshot_name
        }
        # First call (subvolume info) succeeds; second (snapshot info)
        # raises.
        driver.rados_command.side_effect = [
            subvolume_info,
            exception.ShareBackendException(msg="does not exist")
        ]

        self.assertRaises(
            exception.ShareBackendException,
            self._driver.manage_existing_snapshot,
            snapshot_instance,
            {}
        )
        driver.rados_command.assert_has_calls([
            mock.call(
                self._driver.rados_client, "fs subvolume info",
                expected_info_argdict, json_obj=True
            ),
            mock.call(
                self._driver.rados_client, "fs subvolume snapshot info",
                expected_snap_info_argdict,
                json_obj=True
            )
        ])
    def test_manage_existing_snapshot(self):
        """Happy path: snapshot is adopted and its name stored privately."""
        _, expected_info_argdict, subvolume_info = (
            self._setup_manage_subvolume_test()
        )
        expected_snapshot_name = 'fake_snapshot_name'
        snapshot_instance = {
            'id': 'fake_project_uuid_1',
            'provider_location': expected_snapshot_name,
            'share_instance_id': 'fake_share_instance_id',
            'snapshot_id': 'fake_snapshot_id'
        }
        expected_info_argdict = {
            "vol_name": self._driver.volname,
            "sub_name": snapshot_instance["share_instance_id"]
        }
        expected_snap_info_argdict = {
            "vol_name": self._driver.volname,
            "sub_name": snapshot_instance["share_instance_id"],
            "snap_name": expected_snapshot_name
        }
        driver.rados_command.side_effect = [
            subvolume_info,
            {'name': expected_snapshot_name}
        ]
        expected_result = {
            'provider_location': expected_snapshot_name
        }

        result = self._driver.manage_existing_snapshot(
            snapshot_instance,
            {}
        )

        self.assertEqual(expected_result, result)

        driver.rados_command.assert_has_calls([
            mock.call(
                self._driver.rados_client, "fs subvolume info",
                expected_info_argdict, json_obj=True
            ),
            mock.call(
                self._driver.rados_client, "fs subvolume snapshot info",
                expected_snap_info_argdict, json_obj=True
            )
        ])
        self.fake_private_storage.update.assert_called_once_with(
            snapshot_instance['snapshot_id'],
            {"subvolume_snapshot_name": expected_snapshot_name}
        )
    def test_update_access(self):
        """update_access delegates rule lists to the protocol helper."""
        alice = {
            'id': 'instance_mapping_id1',
            'access_id': 'accessid1',
            'access_level': 'rw',
            'access_type': 'cephx',
            'access_to': 'alice'
        }
        add_rules = access_rules = [alice, ]
        delete_rules = []
        update_rules = []

        self._driver.update_access(
            self._context, self._share, access_rules, add_rules, delete_rules,
            update_rules, None)

        self._driver.protocol_helper.update_access.assert_called_once_with(
            self._context, self._share, access_rules, add_rules, delete_rules,
            update_rules, share_server=None, sub_name=self._share['id'])
    def test_ensure_shares(self):
        """ensure_shares maps export failures to error-status updates."""
        self._driver.protocol_helper.reapply_rules_while_ensuring_shares = True
        shares = [
            fake_share.fake_share(share_id='123', share_proto='NFS'),
            fake_share.fake_share(share_id='456', share_proto='NFS'),
            fake_share.fake_share(share_id='789', share_proto='NFS')
        ]
        export_locations = [
            {
                'path': '1.2.3.4,5.6.7.8:/foo/bar',
                'is_admin_only': False,
                'metadata': {},
            },
            {
                'path': '1.2.3.4,5.6.7.8:/foo/quz',
                'is_admin_only': False,
                'metadata': {},
            },

        ]
        share_backend_info = {'metadata': {'__mount_options': 'fs=cephfs'}}
        metadata = share_backend_info.get('metadata')
        # First share errors out; the other two get export locations.
        expected_updates = {
            shares[0]['id']: {
                'status': constants.STATUS_ERROR,
                'reapply_access_rules': True,
                'metadata': metadata,
            },
            shares[1]['id']: {
                'export_locations': export_locations[0],
                'reapply_access_rules': True,
                'metadata': metadata,
            },
            shares[2]['id']: {
                'export_locations': export_locations[1],
                'reapply_access_rules': True,
                'metadata': metadata,
            }
        }
        err_message = (f"Error ENOENT: subvolume {self._share['id']} does "
                       f"not exist")
        expected_exception = exception.ShareBackendException(err_message)

        self.mock_object(
            self._driver, '_get_export_locations',
            mock.Mock(side_effect=[expected_exception] + export_locations))
        self.mock_object(
            self._driver, 'get_optional_share_creation_data',
            mock.Mock(return_value=share_backend_info))

        actual_updates = self._driver.ensure_shares(self._context, shares)

        self.assertEqual(3, self._driver._get_export_locations.call_count)
        self._driver._get_export_locations.assert_has_calls([
            mock.call(shares[0]), mock.call(shares[1]), mock.call(shares[2])])
        self.assertTrue(self._driver.get_optional_share_creation_data.called)
        self.assertEqual(expected_updates, actual_updates)
    def test_delete_share(self):
        """delete_share checks clone status then removes the subvolume."""
        clone_status_prefix = "fs clone status"

        clone_status_dict = {
            "vol_name": self._driver.volname,
            "clone_name": self._share["id"],
        }

        delete_share_prefix = "fs subvolume rm"

        delete_share_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._share["id"],
            "force": True,
        }

        # Clone status raising rados.Error still lets the rm proceed.
        driver.rados_command.side_effect = [driver.rados.Error, mock.Mock()]

        self._driver.delete_share(self._context, self._share)

        driver.rados_command.assert_has_calls([
            mock.call(self._driver.rados_client,
                      clone_status_prefix,
                      clone_status_dict),
            mock.call(self._driver.rados_client,
                      delete_share_prefix,
                      delete_share_dict)])

        self.assertEqual(2, driver.rados_command.call_count)
    def test_extend_share(self):
        """extend_share issues a resize with the new size in bytes."""
        extend_share_prefix = "fs subvolume resize"

        new_size_gb = self._share['size'] * 2
        new_size = new_size_gb * units.Gi

        extend_share_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._share["id"],
            "new_size": new_size,
        }

        self._driver.extend_share(self._share, new_size_gb, None)

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, extend_share_prefix, extend_share_dict)
    def test_shrink_share(self):
        """shrink_share resizes with the no_shrink safety flag set."""
        shrink_share_prefix = "fs subvolume resize"

        new_size_gb = self._share['size'] * 0.5
        new_size = new_size_gb * units.Gi

        shrink_share_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._share["id"],
            "new_size": new_size,
            "no_shrink": True,
        }

        self._driver.shrink_share(self._share, new_size_gb, None)

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, shrink_share_prefix, shrink_share_dict)
    def test_shrink_share_full(self):
        """That shrink fails when share is too full."""
        shrink_share_prefix = "fs subvolume resize"

        new_size_gb = self._share['size'] * 0.5
        new_size = new_size_gb * units.Gi

        msg = ("Can't resize the subvolume. "
               "The new size '{0}' would be lesser "
               "than the current used size '{1}'".format(
                   new_size, self._share['size']))
        driver.rados_command.side_effect = exception.ShareBackendException(msg)

        shrink_share_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._share["id"],
            "new_size": new_size,
            "no_shrink": True,
        }

        # Pretend to be full up
        self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
                          self._driver.shrink_share,
                          self._share, new_size_gb, None)

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client, shrink_share_prefix, shrink_share_dict)
    def test_create_snapshot(self):
        """create_snapshot issues 'fs subvolume snapshot create'."""
        snapshot_create_prefix = "fs subvolume snapshot create"

        snapshot_create_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._snapshot["share_id"],
            "snap_name": self._snapshot["snapshot_id"]
        }

        self._driver.create_snapshot(self._context, self._snapshot, None)

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client,
            snapshot_create_prefix, snapshot_create_dict)
    def test_delete_snapshot(self):
        """delete_snapshot removes both legacy- and current-named snaps."""
        # Legacy snapshots were named '<snapshot_id>_<instance_id>'.
        legacy_snap_name = "_".join(
            [self._snapshot["snapshot_id"], self._snapshot["id"]])

        snapshot_remove_prefix = "fs subvolume snapshot rm"

        snapshot_remove_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._snapshot["share_id"],
            "snap_name": legacy_snap_name,
            "force": True
        }

        snapshot_remove_dict_2 = snapshot_remove_dict.copy()
        snapshot_remove_dict_2.update(
            {"snap_name": self._snapshot["snapshot_id"]})

        self.mock_object(
            self._driver,
            '_get_subvolume_snapshot_name',
            mock.Mock(return_value=self._snapshot["snapshot_id"]))

        self._driver.delete_snapshot(self._context,
                                     self._snapshot,
                                     None)

        driver.rados_command.assert_has_calls([
            mock.call(self._driver.rados_client,
                      snapshot_remove_prefix,
                      snapshot_remove_dict),
            mock.call(self._driver.rados_client,
                      snapshot_remove_prefix,
                      snapshot_remove_dict_2)])

        self.assertEqual(2, driver.rados_command.call_count)
    def test_create_share_group(self):
        """create_share_group issues 'fs subvolumegroup create'."""
        group_create_prefix = "fs subvolumegroup create"

        group_create_dict = {
            "vol_name": self._driver.volname,
            "group_name": "grp1",
            "mode": DEFAULT_VOLUME_MODE,
        }

        self._driver.create_share_group(self._context, {"id": "grp1"}, None)

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client,
            group_create_prefix, group_create_dict)
    def test_delete_share_group(self):
        """delete_share_group issues a forced 'fs subvolumegroup rm'."""
        group_delete_prefix = "fs subvolumegroup rm"

        group_delete_dict = {
            "vol_name": self._driver.volname,
            "group_name": "grp1",
            "force": True,
        }

        self._driver.delete_share_group(self._context, {"id": "grp1"}, None)

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client,
            group_delete_prefix, group_delete_dict)
    def test_create_share_group_snapshot(self):
        """Group snapshot creation surfaces the backend's unsupported error."""
        msg = ("Share group snapshot feature is no longer supported in "
               "mainline CephFS (existing group snapshots can still be "
               "listed and deleted).")
        driver.rados_command.side_effect = exception.ShareBackendException(msg)

        self.assertRaises(exception.ShareBackendException,
                          self._driver.create_share_group_snapshot,
                          self._context, {'share_group_id': 'sgid',
                                          'id': 'snapid'})
    def test_delete_share_group_snapshot(self):
        """delete_share_group_snapshot issues a forced group snapshot rm."""
        group_snapshot_delete_prefix = "fs subvolumegroup snapshot rm"

        group_snapshot_delete_dict = {
            "vol_name": self._driver.volname,
            "group_name": "sgid",
            "snap_name": "snapid",
            "force": True,
        }

        self._driver.delete_share_group_snapshot(self._context, {
            'share_group_id': 'sgid',
            'id': 'snapid',
            "force": True,
        })

        driver.rados_command.assert_called_once_with(
            self._driver.rados_client,
            group_snapshot_delete_prefix, group_snapshot_delete_dict)
    def test_create_share_from_snapshot(self):
        """Share-from-snapshot clones the snap, then polls clone status."""
        parent_share = {
            'id': 'fakeparentshareid',
            'name': 'fakeparentshare',
        }

        create_share_from_snapshot_prefix = "fs subvolume snapshot clone"

        create_share_from_snapshot_dict = {
            "vol_name": self._driver.volname,
            "sub_name": parent_share["id"],
            "snap_name": self._snapshot["snapshot_id"],
            "target_sub_name": self._share["id"]
        }

        get_clone_status_prefix = "fs clone status"
        get_clone_status_dict = {
            "vol_name": self._driver.volname,
            "clone_name": self._share["id"],
        }
        driver.rados_command.return_value = {
            'status': {
                'state': 'in-progress',
            },
        }

        self._driver.create_share_from_snapshot(
            self._context, self._share, self._snapshot, None,
            parent_share=parent_share
        )

        driver.rados_command.assert_has_calls([
            mock.call(self._driver.rados_client,
                      create_share_from_snapshot_prefix,
                      create_share_from_snapshot_dict),
            mock.call(self._driver.rados_client,
                      get_clone_status_prefix,
                      get_clone_status_dict,
                      True)])

        self.assertEqual(2, driver.rados_command.call_count)
    def test_delete_share_from_snapshot(self):
        """Deleting an in-progress clone cancels it before removing it."""
        clone_status_prefix = "fs clone status"

        clone_status_dict = {
            "vol_name": self._driver.volname,
            "clone_name": self._share["id"],
        }

        clone_cancel_prefix = "fs clone cancel"

        clone_cancel_dict = {
            "vol_name": self._driver.volname,
            "clone_name": self._share["id"],
            "force": True,
        }

        delete_share_prefix = "fs subvolume rm"

        delete_share_dict = {
            "vol_name": self._driver.volname,
            "sub_name": self._share["id"],
            "force": True,
        }

        # Status reports 'in-progress' → cancel, then rm.
        driver.rados_command.side_effect = [
            'in-progress', mock.Mock(), mock.Mock()]

        self._driver.delete_share(self._context, self._share)

        driver.rados_command.assert_has_calls([
            mock.call(self._driver.rados_client,
                      clone_status_prefix,
                      clone_status_dict),
            mock.call(self._driver.rados_client,
                      clone_cancel_prefix,
                      clone_cancel_dict),
            mock.call(self._driver.rados_client,
                      delete_share_prefix,
                      delete_share_dict)])

        self.assertEqual(3, driver.rados_command.call_count)
    def test_delete_driver(self):
        """Deleting the driver shuts down its rados client."""
        # Create share to prompt volume_client construction
        self._driver.create_share(self._context,
                                  self._share)

        rc = self._driver._rados_client
        del self._driver

        rc.shutdown.assert_called_once_with()
    def test_delete_driver_no_client(self):
        """Deleting a driver with no rados client must not blow up."""
        self.assertIsNone(self._driver._rados_client)
        del self._driver
    @ddt.data(
        [21474836480, 293878, 97848372],
        [21474836480, "infinite", 97848372],
        ["infinite", "infinite", "infinite"],
    )
    def test__get_cephfs_filesystem_allocation(self, share_sizes):
        """Allocation sums per-subvolume quotas, skipping 'infinite' ones."""
        subvolume_ls_args = {"vol_name": self._driver.volname}
        rados_returns = []
        rados_subvolume_list_result = []
        subvolume_info_mock_calls = []
        subvolume_names = []
        expected_allocated_size_gb = 0

        for idx, size in enumerate(share_sizes):
            subvolume_name = f"subvolume{idx}"
            subvolume_names.append(subvolume_name)
            rados_returns.append({"bytes_quota": share_sizes[idx]})
            rados_subvolume_list_result.append({"name": subvolume_name})
            if size != "infinite":
                expected_allocated_size_gb += size

        # Convert the byte total to GiB, rounded to two decimals.
        if expected_allocated_size_gb > 0:
            expected_allocated_size_gb = (
                round(int(expected_allocated_size_gb) / units.Gi, 2)
            )

        # first call we make to rados is the subvolume ls
        rados_returns.insert(0, rados_subvolume_list_result)
        driver.rados_command.side_effect = rados_returns

        allocated_size_gb = self._driver._get_cephfs_filesystem_allocation()

        self.assertEqual(allocated_size_gb, expected_allocated_size_gb)
        for name in subvolume_names:
            subvolume_info_arg_dict = {
                "vol_name": self._driver.volname,
                "sub_name": name
            }
            subvolume_info_mock_calls.append(
                mock.call(
                    self._driver._rados_client,
                    "fs subvolume info",
                    subvolume_info_arg_dict, json_obj=True
                )
            )
        driver.rados_command.assert_has_calls([
            mock.call(
                self._driver._rados_client,
                "fs subvolume ls", subvolume_ls_args, json_obj=True),
            *subvolume_info_mock_calls
        ])
1049 @ddt.data(True, False)
1050 def test_update_share_stats(self, cache_expired):
1051 allocated_capacity_gb = 20.0
1052 self._driver.get_configured_ip_versions = mock.Mock(return_value=[4])
1053 self._driver.configuration.local_conf.set_override(
1054 'reserved_share_percentage', 5)
1055 self._driver.configuration.local_conf.set_override(
1056 'reserved_share_from_snapshot_percentage', 2)
1057 self._driver.configuration.local_conf.set_override(
1058 'reserved_share_extend_percentage', 2)
1059 self._driver._cached_allocated_capacity_gb.is_expired = mock.Mock(
1060 return_value=cache_expired
1061 )
1062 self.mock_object(
1063 self._driver, '_get_cephfs_filesystem_allocation',
1064 mock.Mock(return_value=20.0)
1065 )
1066 self.mock_object(
1067 self._driver, '_get_cephfs_filesystem_allocation',
1068 mock.Mock(return_value=allocated_capacity_gb)
1069 )
1071 self._driver._update_share_stats()
1072 result = self._driver._stats
1074 self.assertEqual(5, result['pools'][0]['reserved_percentage'])
1075 self.assertEqual(2, result['pools'][0]['reserved_snapshot_percentage'])
1076 self.assertEqual(
1077 2, result['pools'][0]['reserved_share_extend_percentage'])
1078 self.assertEqual(164.94, result['pools'][0]['total_capacity_gb'])
1079 self.assertEqual(149.84, result['pools'][0]['free_capacity_gb'])
1080 self.assertEqual(20.0, result['pools'][0]['allocated_capacity_gb'])
1081 self.assertTrue(result['ipv4_support'])
1082 self.assertFalse(result['ipv6_support'])
1083 self.assertEqual("CEPHFS", result['storage_protocol'])
1084 if cache_expired:
1085 self._driver._get_cephfs_filesystem_allocation.assert_called_once()
1086 (self._driver._cached_allocated_capacity_gb
1087 .update_data.assert_called_once_with(allocated_capacity_gb))
1088 else:
1089 (self._driver._cached_allocated_capacity_gb
1090 .get_data.assert_called_once())
1092 @ddt.data('cephfs', 'nfs')
1093 def test_get_configured_ip_versions(self, protocol_helper):
1094 self._driver.configuration.cephfs_protocol_helper_type = (
1095 protocol_helper)
1097 self._driver.get_configured_ip_versions()
1099 (self._driver.protocol_helper.get_configured_ip_versions.
1100 assert_called_once_with())
1102 @ddt.data(
1103 ([{'id': 'instance_mapping_id1', 'access_id': 'accessid1',
1104 'access_level': 'rw', 'access_type': 'cephx', 'access_to': 'alice'
1105 }], 'fake_project_uuid_1'),
1106 ([{'id': 'instance_mapping_id1', 'access_id': 'accessid1',
1107 'access_level': 'rw', 'access_type': 'cephx', 'access_to': 'alice'
1108 }], 'fake_project_uuid_2'),
1109 ([], 'fake_project_uuid_1'),
1110 ([], 'fake_project_uuid_2'),
1111 )
1112 @ddt.unpack
1113 def test_transfer_accept(self, access_rules, new_project):
1114 fake_share_1 = {"project_id": "fake_project_uuid_1"}
1115 same_project = new_project == 'fake_project_uuid_1'
1116 if access_rules and not same_project:
1117 self.assertRaises(exception.DriverCannotTransferShareWithRules,
1118 self._driver.transfer_accept,
1119 self._context, fake_share_1,
1120 'new_user', new_project, access_rules)
1122 def test_get_share_status_returns_none_for_unexpected_status(self):
1123 """Test get_share_status returns None for non-creating status."""
1124 share = fake_share.fake_share(status=constants.STATUS_AVAILABLE)
1126 result = self._driver.get_share_status(share)
1128 self.assertIsNone(result)
1130 def test__need_to_cancel_clone_returns_false_for_regular_subvolume(self):
1131 """Test _need_to_cancel_clone handles non-clone subvolumes."""
1132 driver.rados_command.side_effect = (
1133 exception.ShareBackendException(msg="not allowed on subvolume"))
1135 result = self._driver._need_to_cancel_clone(
1136 self._share, self._share['id'])
1138 self.assertFalse(result)
@ddt.ddt
class NativeProtocolHelperTestCase(test.TestCase):
    """Tests for driver.NativeProtocolHelper (native CEPHFS protocol).

    All ceph mgr interaction happens through driver.rados_command, which is
    stubbed out in setUp; tests assert on the exact command prefixes and
    argument dicts the helper sends.
    """

    def setUp(self):
        super(NativeProtocolHelperTestCase, self).setUp()
        self.fake_conf = configuration.Configuration(None)
        self._context = context.get_admin_context()
        self._share = fake_share.fake_share_instance(share_proto='CEPHFS')

        self.fake_conf.set_default('driver_handles_share_servers', False)

        # Every ceph command goes through rados_command; stub it globally.
        self.mock_object(driver, "rados_command")

        driver.ceph_default_target = ('mon-mgr', )

        self._native_protocol_helper = driver.NativeProtocolHelper(
            None,
            self.fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        self._rados_client = self._native_protocol_helper.rados_client

        self._native_protocol_helper.get_mon_addrs = mock.Mock(
            return_value=['1.2.3.4', '5.6.7.8'])

    def test_check_for_setup_error(self):
        """The native helper's setup check is a no-op returning None."""
        expected = None

        result = self._native_protocol_helper.check_for_setup_error()

        self.assertEqual(expected, result)

    def test_get_export_locations(self):
        """Export path joins the mon addresses with the subvolume path."""
        fake_cephfs_subvolume_path = '/foo/bar'
        expected_export_locations = {
            'path': '1.2.3.4,5.6.7.8:/foo/bar',
            'is_admin_only': False,
            'metadata': {},
        }

        export_locations = self._native_protocol_helper.get_export_locations(
            self._share, fake_cephfs_subvolume_path)

        self.assertEqual(expected_export_locations, export_locations)
        self._native_protocol_helper.get_mon_addrs.assert_called_once_with()

    @ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO)
    def test_allow_access_rw_ro(self, mode):
        """Granting cephx access authorizes the subvolume; returns the key."""
        access_allow_prefix = "fs subvolume authorize"
        # manila's "ro" maps to ceph access level "r"; "rw" stays "rw".
        access_allow_mode = "r" if mode == "ro" else "rw"

        access_allow_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
            "tenant_id": self._share["project_id"],
            "access_level": access_allow_mode,
        }

        rule = {
            'access_level': mode,
            'access_to': 'alice',
            'access_type': 'cephx',
        }

        driver.rados_command.return_value = 'native-zorilla'

        auth_key = self._native_protocol_helper._allow_access(
            self._context, self._share, rule, sub_name=self._share['id'])

        self.assertEqual("native-zorilla", auth_key)

        driver.rados_command.assert_called_once_with(
            self._rados_client,
            access_allow_prefix, access_allow_dict)

    def test_allow_access_wrong_type(self):
        """Only the 'cephx' access type is accepted by the native helper."""
        self.assertRaises(
            exception.InvalidShareAccessType,
            self._native_protocol_helper._allow_access,
            self._context,
            self._share,
            {
                'access_level': constants.ACCESS_LEVEL_RW,
                'access_type': 'RHUBARB',
                'access_to': 'alice'
            },
            self._share['id']
        )

    def test_allow_access_same_cephx_id_as_manila_service(self):
        """The service's own cephx ID must never be granted to a share."""
        self.assertRaises(
            exception.InvalidShareAccess,
            self._native_protocol_helper._allow_access,
            self._context,
            self._share,
            {
                'access_level': constants.ACCESS_LEVEL_RW,
                'access_type': 'cephx',
                'access_to': 'manila',
            },
            self._share['id']
        )

    def test_allow_access_to_preexisting_ceph_user(self):
        """Auth IDs not created by the ceph mgr plugin cannot be modified."""
        msg = ("auth ID: admin exists and not created by "
               "ceph manager plugin. Not allowed to modify")
        driver.rados_command.side_effect = exception.ShareBackendException(msg)

        self.assertRaises(exception.InvalidShareAccess,
                          self._native_protocol_helper._allow_access,
                          self._context, self._share,
                          {
                              'access_level': constants.ACCESS_LEVEL_RW,
                              'access_type': 'cephx',
                              'access_to': 'admin'
                          },
                          self._share['id'])

    def test_deny_access(self):
        """Denying access deauthorizes the auth ID and evicts its clients."""
        access_deny_prefix = "fs subvolume deauthorize"

        access_deny_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
        }

        evict_prefix = "fs subvolume evict"

        # The evict command takes the same arguments as deauthorize.
        evict_dict = access_deny_dict

        self._native_protocol_helper._deny_access(
            self._context,
            self._share,
            {
                'access_level': 'rw',
                'access_type': 'cephx',
                'access_to': 'alice'
            },
            sub_name=self._share['id']
        )

        driver.rados_command.assert_has_calls([
            mock.call(self._native_protocol_helper.rados_client,
                      access_deny_prefix,
                      access_deny_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      evict_prefix,
                      evict_dict)])

        self.assertEqual(2, driver.rados_command.call_count)

    def test_deny_access_missing_access_rule(self):
        """A nonexistent auth ID is tolerated and no eviction is attempted."""
        access_deny_prefix = "fs subvolume deauthorize"

        # Mimic the backend's "[errno -2] auth ID ... doesn't exist" error,
        # which _deny_access is expected to swallow.
        exception_msg = (
            f"json_command failed - prefix=fs subvolume deauthorize, "
            f"argdict='vol_name': {self._native_protocol_helper.volname}, "
            f"'sub_name': '{self._share['id']}', 'auth_id': 'alice', "
            f"'format': 'json' - exception message: [errno -2] "
            f"auth ID: alice doesn't exist.")

        driver.rados_command.side_effect = exception.ShareBackendException(
            msg=exception_msg)

        access_deny_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
        }

        self._native_protocol_helper._deny_access(
            self._context,
            self._share,
            {
                'access_level': 'rw',
                'access_type': 'cephx',
                'access_to': 'alice'
            },
            sub_name=self._share['id']
        )

        # Only the deauthorize call is made; eviction is skipped.
        driver.rados_command.assert_called_once_with(
            self._native_protocol_helper.rados_client,
            access_deny_prefix, access_deny_dict)

        self.assertEqual(1, driver.rados_command.call_count)

    def test_update_access_add_rm(self):
        """update_access grants adds, revokes deletes, and reports errors.

        Failed grants surface as per-rule 'error' states and generate user
        messages via the message API.
        """
        alice = {
            'id': 'instance_mapping_id1',
            'access_id': 'accessid1',
            'access_level': 'rw',
            'access_type': 'cephx',
            'access_to': 'alice'
        }
        bob = {
            'id': 'instance_mapping_id2',
            'access_id': 'accessid2',
            'access_level': 'ro',
            'access_type': 'cephx',
            'access_to': 'bob'
        }
        manila = {
            'id': 'instance_mapping_id3',
            'access_id': 'accessid3',
            'access_level': 'ro',
            'access_type': 'cephx',
            'access_to': 'manila'
        }
        admin = {
            'id': 'instance_mapping_id4',
            'access_id': 'accessid4',
            'access_level': 'rw',
            'access_type': 'cephx',
            'access_to': 'admin'
        }
        dabo = {
            'id': 'instance_mapping_id5',
            'access_id': 'accessid5',
            'access_level': 'rwx',
            'access_type': 'cephx',
            'access_to': 'dabo'
        }

        # First grant succeeds; the next three fail for distinct reasons.
        allow_access_side_effects = [
            'abc123',
            exception.InvalidShareAccess(reason='not'),
            exception.InvalidShareAccess(reason='allowed'),
            exception.InvalidShareAccessLevel(level='rwx')
        ]
        self.mock_object(self._native_protocol_helper.message_api, 'create')
        self.mock_object(self._native_protocol_helper, '_deny_access')
        self.mock_object(self._native_protocol_helper,
                         '_allow_access',
                         mock.Mock(side_effect=allow_access_side_effects))

        access_updates = self._native_protocol_helper.update_access(
            self._context,
            self._share,
            access_rules=[alice, manila, admin, dabo],
            add_rules=[alice, manila, admin, dabo],
            delete_rules=[bob],
            update_rules=[],
            sub_name=self._share['id']
        )

        expected_access_updates = {
            'accessid1': {'access_key': 'abc123'},
            'accessid3': {'state': 'error'},
            'accessid4': {'state': 'error'},
            'accessid5': {'state': 'error'}
        }
        self.assertEqual(expected_access_updates, access_updates)
        self._native_protocol_helper._allow_access.assert_has_calls(
            [mock.call(self._context, self._share, alice,
                       sub_name=self._share['id']),
             mock.call(self._context, self._share, manila,
                       sub_name=self._share['id']),
             mock.call(self._context, self._share, admin,
                       sub_name=self._share['id'])])
        self._native_protocol_helper._deny_access.assert_called_once_with(
            self._context, self._share, bob, sub_name=self._share['id'])
        # One user-facing message per failed grant.
        self.assertEqual(
            3, self._native_protocol_helper.message_api.create.call_count)

    def test_update_access_all(self):
        """A full rule sync revokes auth IDs that are no longer in the list.

        With access_rules=[alice] and the backend reporting john/paul as
        authorized, alice is (re)granted while john and paul are both
        deauthorized and evicted.
        """
        get_authorized_ids_prefix = "fs subvolume authorized_list"

        get_authorized_ids_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"]
        }

        access_allow_prefix = "fs subvolume authorize"

        access_allow_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "alice",
            "tenant_id": self._share["project_id"],
            "access_level": "rw",
        }

        access_deny_prefix = "fs subvolume deauthorize"

        access_deny_john_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "john",
        }

        access_deny_paul_dict = {
            "vol_name": self._native_protocol_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "paul",
        }

        evict_prefix = "fs subvolume evict"

        alice = {
            'id': 'instance_mapping_id1',
            'access_id': 'accessid1',
            'access_level': 'rw',
            'access_type': 'cephx',
            'access_to': 'alice',
        }

        # Call order: authorized_list, authorize(alice), then the
        # deauthorize/evict pairs for john and paul.
        driver.rados_command.side_effect = [
            [{"john": "rw"}, {"paul": "r"}],
            'abc123',
            mock.Mock(), mock.Mock(),
            mock.Mock(), mock.Mock()]

        access_updates = self._native_protocol_helper.update_access(
            self._context, self._share, access_rules=[alice], add_rules=[],
            delete_rules=[], update_rules=[], sub_name=self._share['id'])

        self.assertEqual(
            {'accessid1': {'access_key': 'abc123'}}, access_updates)

        driver.rados_command.assert_has_calls([
            mock.call(self._native_protocol_helper.rados_client,
                      get_authorized_ids_prefix,
                      get_authorized_ids_dict,
                      json_obj=True),
            mock.call(self._native_protocol_helper.rados_client,
                      access_allow_prefix,
                      access_allow_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      access_deny_prefix,
                      access_deny_john_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      evict_prefix,
                      access_deny_john_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      access_deny_prefix,
                      access_deny_paul_dict),
            mock.call(self._native_protocol_helper.rados_client,
                      evict_prefix,
                      access_deny_paul_dict)], any_order=True)

        self.assertEqual(6, driver.rados_command.call_count)

    def test_get_configured_ip_versions(self):
        """The native helper reports IPv4 for this fixture's configuration."""
        expected = [4]

        result = self._native_protocol_helper.get_configured_ip_versions()

        self.assertEqual(expected, result)
@ddt.ddt
class NFSProtocolHelperTestCase(test.TestCase):
    """Tests for driver.NFSProtocolHelper (NFS via a ganesha server)."""

    def setUp(self):
        super(NFSProtocolHelperTestCase, self).setUp()
        self._execute = mock.Mock()
        self._share = fake_share.fake_share(share_proto='NFS')
        self._rados_client = MockRadosModule.Rados()
        self._volname = "cephfs"
        self.fake_conf = configuration.Configuration(None)

        self.fake_conf.set_default('cephfs_ganesha_server_ip',
                                   'fakeip')
        # Stub out executors, hostname lookup and all ceph commands.
        self.mock_object(driver.ganesha_utils, 'SSHExecutor')
        self.mock_object(driver.ganesha_utils, 'RootExecutor')
        self.mock_object(driver.socket, 'gethostname')
        self.mock_object(driver, "rados_command")

        driver.ceph_default_target = ('mon-mgr', )

        self._nfs_helper = driver.NFSProtocolHelper(
            self._execute,
            self.fake_conf,
            rados_client=self._rados_client,
            volname=self._volname)

    @ddt.data(
        (['fakehost', 'some.host.name', 'some.host.name.', '1.1.1.0'], False),
        (['fakehost', 'some.host.name', 'some.host.name.', '1.1..1.0'], True),
        (['fakehost', 'some.host.name', 'some.host.name', '1.1.1.256'], True),
        (['fakehost..', 'some.host.name', 'some.host.name', '1.1.1.0'], True),
        (['fakehost', 'some.host.name..', 'some.host.name', '1.1.1.0'], True),
        (['fakehost', 'some.host.name', 'some.host.name.', '1.1..1.0'], True),
        (['fakehost', 'some.host.name', '1.1.1.0/24'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001'], False),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001:1001'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001:'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001.'], True),
        (['fakehost', 'some.host.name', '1.1.1.0', '1001::1001/129.'], True),
    )
    @ddt.unpack
    def test_check_for_setup_error(self, cephfs_ganesha_export_ips, raises):
        """Malformed hostnames, IPs or CIDRs in export IPs fail setup."""
        fake_conf = configuration.Configuration(None)
        fake_conf.set_default('cephfs_ganesha_export_ips',
                              cephfs_ganesha_export_ips)

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        if raises:
            self.assertRaises(exception.InvalidParameterValue,
                              helper.check_for_setup_error)
        else:
            self.assertIsNone(helper.check_for_setup_error())

    @ddt.data(False, True)
    def test_init_executor_type(self, ganesha_server_is_remote):
        """A remote ganesha server gets an SSH executor, local gets root."""
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_is_remote', ganesha_server_is_remote),
            ('cephfs_ganesha_server_ip', 'fakeip'),
            ('cephfs_ganesha_server_username', 'fake_username'),
            ('cephfs_ganesha_server_password', 'fakepwd'),
            ('cephfs_ganesha_path_to_private_key', 'fakepathtokey')]
        for args in conf_args_list:
            fake_conf.set_default(*args)

        driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        if ganesha_server_is_remote:
            driver.ganesha_utils.SSHExecutor.assert_has_calls(
                [mock.call('fakeip', 22, None, 'fake_username',
                           password='fakepwd',
                           privatekey='fakepathtokey')])
        else:
            driver.ganesha_utils.RootExecutor.assert_has_calls(
                [mock.call(self._execute)])

    @ddt.data('fakeip', None)
    def test_init_identify_local_host(self, ganesha_server_ip):
        """Without a configured server IP the helper falls back to hostname
        lookup and logs the fact."""
        self.mock_object(driver.LOG, 'info')
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_ip', ganesha_server_ip),
            ('cephfs_ganesha_server_username', 'fake_username'),
            ('cephfs_ganesha_server_password', 'fakepwd'),
            ('cephfs_ganesha_path_to_private_key', 'fakepathtokey')]
        for args in conf_args_list:
            fake_conf.set_default(*args)

        driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        driver.ganesha_utils.RootExecutor.assert_has_calls(
            [mock.call(self._execute)])
        if ganesha_server_ip:
            self.assertFalse(driver.socket.gethostname.called)
            self.assertFalse(driver.LOG.info.called)
        else:
            driver.socket.gethostname.assert_called_once_with()
            driver.LOG.info.assert_called_once()

    def test_get_export_locations_no_export_ips_configured(self):
        """With only a server IP, a single non-preferred location results."""
        cephfs_subvolume_path = "/foo/bar"
        fake_conf = configuration.Configuration(None)
        fake_conf.set_default('cephfs_ganesha_server_ip', '1.2.3.4')

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        ret = helper.get_export_locations(self._share,
                                          cephfs_subvolume_path)
        self.assertEqual(
            [{
                'path': '1.2.3.4:/foo/bar',
                'is_admin_only': False,
                'metadata': {
                    'preferred': False,
                },
            }], ret)

    def test_get_export_locations_with_export_ips_configured(self):
        """Each configured export IP becomes a location; IPv6 is bracketed."""
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_ip', '1.2.3.4'),
            ('cephfs_ganesha_export_ips',
             ['127.0.0.1', 'fd3f:c057:1192:1::1', '::1'])]
        for args in conf_args_list:
            fake_conf.set_default(*args)

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        cephfs_subvolume_path = "/foo/bar"

        ret = helper.get_export_locations(self._share, cephfs_subvolume_path)

        self._assertEqualListsOfObjects(
            [
                {
                    'path': '127.0.0.1:/foo/bar',
                    'is_admin_only': False,
                    'metadata': {
                        'preferred': False,
                    },
                },
                {
                    'path': '[fd3f:c057:1192:1::1]:/foo/bar',
                    'is_admin_only': False,
                    'metadata': {
                        'preferred': False,
                    },
                },
                {
                    'path': '[::1]:/foo/bar',
                    'is_admin_only': False,
                    'metadata': {
                        'preferred': False,
                    },
                },
            ], ret)

    @ddt.data(('some.host.name', None, [4, 6]), ('host.', None, [4, 6]),
              ('1001::1001', None, [6]), ('1.1.1.0', None, [4]),
              (None, ['1001::1001', '1.1.1.0'], [6, 4]),
              (None, ['1001::1001'], [6]), (None, ['1.1.1.0'], [4]),
              (None, ['1001::1001/129', '1.1.1.0'], [4, 6]))
    @ddt.unpack
    def test_get_configured_ip_versions(
            self, cephfs_ganesha_server_ip, cephfs_ganesha_export_ips,
            configured_ip_version):
        """IP versions are derived from server IP or the export IP list;
        hostnames (and unparsable entries) imply both 4 and 6."""
        fake_conf = configuration.Configuration(None)
        conf_args_list = [
            ('cephfs_ganesha_server_ip', cephfs_ganesha_server_ip),
            ('cephfs_ganesha_export_ips', cephfs_ganesha_export_ips)]

        for args in conf_args_list:
            fake_conf.set_default(*args)

        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        self.assertEqual(set(configured_ip_version),
                         set(helper.get_configured_ip_versions()))
        self.assertEqual(set(configured_ip_version),
                         helper.configured_ip_versions)

    def test_get_configured_ip_versions_already_set(self):
        """A previously computed version set is returned as-is."""
        fake_conf = configuration.Configuration(None)
        helper = driver.NFSProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=MockRadosModule.Rados(),
            volname="cephfs"
        )

        ip_versions = ['foo', 'bar']

        helper.configured_ip_versions = ip_versions

        result = helper.get_configured_ip_versions()

        self.assertEqual(ip_versions, result)

    def test_default_config_hook(self):
        """The hook layers the driver's bundled cephfs conf dir on top of
        the base ganesha defaults."""
        fake_conf_dict = {'key': 'value1'}
        self.mock_object(driver.ganesha.GaneshaNASHelper,
                         '_default_config_hook',
                         mock.Mock(return_value={}))
        self.mock_object(driver.ganesha_utils, 'path_from',
                         mock.Mock(return_value='/fakedir/cephfs/conf'))
        self.mock_object(self._nfs_helper, '_load_conf_dir',
                         mock.Mock(return_value=fake_conf_dict))

        ret = self._nfs_helper._default_config_hook()

        (driver.ganesha.GaneshaNASHelper._default_config_hook.
            assert_called_once_with())
        driver.ganesha_utils.path_from.assert_called_once_with(
            driver.__file__, 'conf')
        self._nfs_helper._load_conf_dir.assert_called_once_with(
            '/fakedir/cephfs/conf')
        self.assertEqual(fake_conf_dict, ret)

    def test_fsal_hook(self):
        """The FSAL hook authorizes a ganesha-specific cephx ID and returns
        the Ceph FSAL block for the export."""
        access_allow_prefix = "fs subvolume authorize"

        access_allow_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "ganesha-fakeid",
            "tenant_id": self._share["project_id"],
            "access_level": "rw",
        }

        expected_ret = {
            "Name": "Ceph",
            "User_Id": "ganesha-fakeid",
            "Secret_Access_Key": "ganesha-zorilla",
            "Filesystem": self._nfs_helper.volname
        }

        driver.rados_command.return_value = 'ganesha-zorilla'

        ret = self._nfs_helper._fsal_hook(
            None, self._share, None, self._share['id']
        )

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            access_allow_prefix, access_allow_dict)

        self.assertEqual(expected_ret, ret)

    def test_cleanup_fsal_hook(self):
        """Cleanup deauthorizes the ganesha cephx ID and returns None."""
        access_deny_prefix = "fs subvolume deauthorize"

        access_deny_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
            "auth_id": "ganesha-fakeid",
        }

        ret = self._nfs_helper._cleanup_fsal_hook(
            None, self._share, None, self._share['id']
        )

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            access_deny_prefix, access_deny_dict)

        self.assertIsNone(ret)

    def test_get_export_path(self):
        """The export path comes from 'fs subvolume getpath'."""
        get_path_prefix = "fs subvolume getpath"

        get_path_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
        }

        driver.rados_command.return_value = '/foo/bar'

        ret = self._nfs_helper._get_export_path(self._share)

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            get_path_prefix, get_path_dict)

        self.assertEqual('/foo/bar', ret)

    def test_get_export_pseudo_path(self):
        """The pseudo path also comes from 'fs subvolume getpath'."""
        get_path_prefix = "fs subvolume getpath"

        get_path_dict = {
            "vol_name": self._nfs_helper.volname,
            "sub_name": self._share["id"],
        }

        driver.rados_command.return_value = '/foo/bar'

        ret = self._nfs_helper._get_export_pseudo_path(self._share)

        driver.rados_command.assert_called_once_with(
            self._nfs_helper.rados_client,
            get_path_prefix, get_path_dict)

        self.assertEqual('/foo/bar', ret)
@ddt.ddt
class NFSClusterProtocolHelperTestCase(test.TestCase):
    """Tests for driver.NFSClusterProtocolHelper (ceph-managed NFS cluster).

    Export paths are stubbed to a fixed value and the nfs_clusterid
    property is pinned to 'fs-manila' so tests can assert on the exact
    'nfs export ...' / 'nfs cluster ...' commands issued.
    """

    def setUp(self):
        super(NFSClusterProtocolHelperTestCase, self).setUp()
        self._execute = mock.Mock()
        self._context = context.get_admin_context()
        self._share = fake_share.fake_share(share_proto='NFS')
        self._rados_client = MockRadosModule.Rados()
        self._volname = "cephfs"
        self.fake_conf = configuration.Configuration(None)

        self.mock_object(driver.NFSClusterProtocolHelper,
                         '_get_export_path',
                         mock.Mock(return_value="ganesha:/foo/bar"))
        self.mock_object(driver.NFSClusterProtocolHelper,
                         '_get_export_pseudo_path',
                         mock.Mock(return_value="ganesha:/foo/bar"))
        self.mock_object(driver, "rados_command")

        driver.ceph_default_target = ('mon-mgr', )

        self._nfscluster_protocol_helper = driver.NFSClusterProtocolHelper(
            self._execute,
            self.fake_conf,
            rados_client=self._rados_client,
            volname=self._volname)

        # Pin the cluster id at the class level for all tests.
        type(self._nfscluster_protocol_helper).nfs_clusterid = (
            mock.PropertyMock(return_value='fs-manila'))

    def test_get_export_ips_no_backends(self):
        """An NFS cluster with no backend daemons is a backend error."""
        fake_conf = configuration.Configuration(None)
        cluster_info = {
            "fs-manila": {
                "virtual_ip": None,
                "backend": []
            }
        }

        driver.rados_command.return_value = json.dumps(cluster_info)

        helper = driver.NFSClusterProtocolHelper(
            self._execute,
            fake_conf,
            rados_client=self._rados_client,
            volname=self._volname
        )

        self.assertRaises(exception.ShareBackendException,
                          helper._get_export_ips)

    @ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO)
    def test_allow_access_rw_ro_when_export_does_not_exist(self, mode):
        """With no existing export, a full export spec is applied."""
        export_info_prefix = "nfs export info"
        access_allow_prefix = "nfs export apply"
        nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid
        volname = self._nfscluster_protocol_helper.volname

        # Empty dict from 'nfs export info' means the export doesn't exist.
        driver.rados_command.return_value = {}

        clients = {
            'access_type': mode,
            'addresses': ['10.0.0.1'],
            'squash': 'none'
        }

        export_info_dict = {
            "cluster_id": nfs_clusterid,
            "pseudo_path": "ganesha:/foo/bar",
        }

        access_allow_dict = {
            "cluster_id": nfs_clusterid,
        }

        export = {
            "path": "ganesha:/foo/bar",
            "cluster_id": nfs_clusterid,
            "pseudo": "ganesha:/foo/bar",
            "squash": "none",
            "security_label": True,
            "fsal": {
                "name": "CEPH",
                "fs_name": volname,

            },
            "clients": clients
        }

        # The export spec is passed to the mgr as JSON on stdin (inbuf).
        inbuf = json.dumps(export).encode('utf-8')

        self._nfscluster_protocol_helper._allow_access(
            self._share, clients, sub_name=self._share['id']
        )

        driver.rados_command.assert_has_calls([
            mock.call(self._rados_client,
                      export_info_prefix,
                      export_info_dict, json_obj=True),
            mock.call(self._rados_client,
                      access_allow_prefix,
                      access_allow_dict, inbuf=inbuf)])

        self.assertEqual(2, driver.rados_command.call_count)

    @ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO)
    def test_allow_access_rw_ro_when_export_exist(self, mode):
        """An existing export is re-applied with only its clients updated."""
        export_info_prefix = "nfs export info"
        access_allow_prefix = "nfs export apply"
        nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid
        volname = self._nfscluster_protocol_helper.volname

        new_clients = {
            'access_type': mode,
            'addresses': ['10.0.0.2'],
            'squash': 'none'
        }

        export_info_dict = {
            "cluster_id": nfs_clusterid,
            "pseudo_path": "ganesha:/foo/bar",
        }

        access_allow_dict = {
            "cluster_id": nfs_clusterid,
        }

        export = {
            "path": "ganesha:/foo/bar",
            "cluster_id": nfs_clusterid,
            "pseudo": "ganesha:/foo/bar",
            "squash": "none",
            "security_label": True,
            "fsal": {
                "name": "CEPH",
                "User_Id": "nfs.user",
                "fs_name": volname

            },
            "clients": {
                'access_type': "ro",
                'addresses': ['10.0.0.1'],
                'squash': 'none'
            }
        }

        driver.rados_command.return_value = export
        # The helper keeps the existing export but swaps in new clients.
        export['clients'] = new_clients
        inbuf = json.dumps(export).encode('utf-8')

        self._nfscluster_protocol_helper._allow_access(
            self._share, new_clients, sub_name=self._share['id']
        )

        driver.rados_command.assert_has_calls([
            mock.call(self._rados_client,
                      export_info_prefix,
                      export_info_dict, json_obj=True),
            mock.call(self._rados_client,
                      access_allow_prefix,
                      access_allow_dict, inbuf=inbuf)])

        self.assertEqual(2, driver.rados_command.call_count)

    def test_deny_access(self):
        """Denying access removes the export by its pseudo path."""
        access_deny_prefix = "nfs export rm"

        nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid

        access_deny_dict = {
            "cluster_id": nfs_clusterid,
            "pseudo_path": "ganesha:/foo/bar"
        }

        self._nfscluster_protocol_helper._deny_access(
            self._share, self._share['id']
        )

        driver.rados_command.assert_called_once_with(
            self._rados_client,
            access_deny_prefix, access_deny_dict)

    def test_get_export_locations(self):
        """Cluster backend IPs become preferred export locations."""
        cluster_info_prefix = "nfs cluster info"
        nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid

        cluster_info_dict = {
            "cluster_id": nfs_clusterid,
        }

        cluster_info = {"fs-manila": {
            "virtual_ip": None,
            "backend": [
                {"hostname": "fake-ceph-node-1",
                 "ip": "10.0.0.10",
                 "port": "1010"},
                {"hostname": "fake-ceph-node-2",
                 "ip": "10.0.0.11",
                 "port": "1011"}
            ]
        }}

        driver.rados_command.return_value = json.dumps(cluster_info)

        fake_cephfs_subvolume_path = "/foo/bar"
        expected_export_locations = [{
            'path': '10.0.0.10:/foo/bar',
            'is_admin_only': False,
            'metadata': {
                'preferred': True,
            },
        }, {
            'path': '10.0.0.11:/foo/bar',
            'is_admin_only': False,
            'metadata': {
                'preferred': True,
            },
        }]

        export_locations = (
            self._nfscluster_protocol_helper.get_export_locations(
                self._share, fake_cephfs_subvolume_path))

        driver.rados_command.assert_called_once_with(
            self._rados_client,
            cluster_info_prefix, cluster_info_dict)

        self._assertEqualListsOfObjects(expected_export_locations,
                                        export_locations)

    @ddt.data('cephfs_ganesha_server_ip', 'cephfs_ganesha_export_ips')
    def test_get_export_locations_ganesha_still_configured(self, confopt):
        """Leftover standalone-ganesha config adds non-preferred locations
        alongside the cluster's preferred ones."""
        if confopt == 'cephfs_ganesha_server_ip':
            val = '10.0.0.1'
        else:
            val = ['10.0.0.2', '10.0.0.3']

        cluster_info_prefix = "nfs cluster info"
        nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid
        self.fake_conf.set_default(confopt, val)

        cluster_info_dict = {
            "cluster_id": nfs_clusterid,
        }

        cluster_info = {"fs-manila": {
            "virtual_ip": None,
            "backend": [
                {"hostname": "fake-ceph-node-1",
                 "ip": "10.0.0.10",
                 "port": "1010"},
                {"hostname": "fake-ceph-node-2",
                 "ip": "10.0.0.11",
                 "port": "1011"}
            ]
        }}

        driver.rados_command.return_value = json.dumps(cluster_info)

        fake_cephfs_subvolume_path = "/foo/bar"
        expected_export_locations = [
            {
                'path': '10.0.0.10:/foo/bar',
                'is_admin_only': False,
                'metadata': {
                    'preferred': True,
                },
            },
            {
                'path': '10.0.0.11:/foo/bar',
                'is_admin_only': False,
                'metadata': {
                    'preferred': True,
                },
            },
        ]

        # The ganesha-configured addresses are expected as extra,
        # non-preferred locations.
        if isinstance(val, list):
            for ip in val:
                expected_export_locations.append(
                    {
                        'path': f'{ip}:/foo/bar',
                        'is_admin_only': False,
                        'metadata': {
                            'preferred': False,
                        },
                    },
                )
        else:
            expected_export_locations.append(
                {
                    'path': f'{val}:/foo/bar',
                    'is_admin_only': False,
                    'metadata': {
                        'preferred': False,
                    },
                }
            )

        # Sort both sides so the comparison is order-independent.
        expected_export_locations = sorted(
            expected_export_locations,
            key=lambda d: d['path']
        )
        export_locations = (
            self._nfscluster_protocol_helper.get_export_locations(
                self._share, fake_cephfs_subvolume_path)
        )

        actual_export_locations = sorted(
            export_locations,
            key=lambda d: d['path']
        )

        driver.rados_command.assert_called_once_with(
            self._rados_client,
            cluster_info_prefix, cluster_info_dict)

        self.assertEqual(expected_export_locations,
                         actual_export_locations)
@ddt.ddt
class CephFSDriverAltConfigTestCase(test.TestCase):
    """Test the CephFS driver with non-default config values."""

    def setUp(self):
        super(CephFSDriverAltConfigTestCase, self).setUp()
        self._execute = mock.Mock()
        self.fake_conf = configuration.Configuration(None)
        self._rados_client = MockRadosModule.Rados()
        self._context = context.get_admin_context()
        self._share = fake_share.fake_share(share_proto='CEPHFS')

        for opt, value in (('driver_handles_share_servers', False),
                           ('cephfs_auth_id', 'manila')):
            self.fake_conf.set_default(opt, value)

        # Stub out the ceph bindings and both protocol helpers so the
        # driver can be constructed without a real cluster.
        self.mock_object(driver, "rados", MockRadosModule)
        self.mock_object(driver, "json_command",
                         MockCephArgparseModule.json_command)
        for attr in ("rados_command", "NativeProtocolHelper",
                     "NFSProtocolHelper"):
            self.mock_object(driver, attr)

        driver.ceph_default_target = ('mon-mgr', )

    @ddt.data('cephfs', 'nfs')
    def test_do_setup_alt_volume_mode(self, protocol_helper):
        """do_setup must honor a non-default cephfs_volume_mode."""
        self.fake_conf.set_default('cephfs_volume_mode', ALT_VOLUME_MODE)
        self._driver = driver.CephFSDriver(execute=self._execute,
                                           configuration=self.fake_conf,
                                           rados_client=self._rados_client)
        self.mock_object(
            self._driver, '_get_cephfs_filesystem_allocation',
            mock.Mock(return_value=10)
        )

        type(self._driver).volname = mock.PropertyMock(return_value='cephfs')

        self._driver.configuration.cephfs_protocol_helper_type = (
            protocol_helper)

        self._driver.do_setup(self._context)

        # NOTE(review): the native helper receives the rados_client
        # property while the NFS helper receives the private attribute;
        # this mirrors how the driver wires each helper up.
        if protocol_helper == 'cephfs':
            helper_cls = driver.NativeProtocolHelper
            rados_arg = self._driver.rados_client
        else:
            helper_cls = driver.NFSProtocolHelper
            rados_arg = self._driver._rados_client
        helper_cls.assert_called_once_with(
            self._execute, self._driver.configuration,
            rados_client=rados_arg,
            volname=self._driver.volname)

        self._driver.protocol_helper.init_helper.assert_called_once_with()

        self.assertEqual(ALT_VOLUME_MODE, self._driver._cephfs_volume_mode)

    @ddt.data('0o759', '0x755', '12a3')
    def test_volume_mode_exception(self, volume_mode):
        # cephfs_volume_mode must be a string representing an int as
        # octal; anything else is rejected at driver construction time.
        self.fake_conf.set_default('cephfs_volume_mode', volume_mode)

        self.assertRaises(exception.BadConfigurationException,
                          driver.CephFSDriver, execute=self._execute,
                          configuration=self.fake_conf)
2221@ddt.ddt
2222class MiscTests(test.TestCase):
2224 @ddt.data({'import_exc': None},
2225 {'import_exc': ImportError})
2226 @ddt.unpack
2227 def test_rados_module_missing(self, import_exc):
2228 driver.rados = None
2229 with mock.patch.object(
2230 driver.importutils,
2231 'import_module',
2232 side_effect=import_exc) as mock_import_module:
2233 if import_exc:
2234 self.assertRaises(
2235 exception.ShareBackendException, driver.setup_rados)
2236 else:
2237 driver.setup_rados()
2238 self.assertEqual(mock_import_module.return_value,
2239 driver.rados)
2241 mock_import_module.assert_called_once_with('rados')
2243 @ddt.data({'import_exc': None},
2244 {'import_exc': ImportError})
2245 @ddt.unpack
2246 def test_setup_json_class_missing(self, import_exc):
2247 driver.json_command = None
2248 with mock.patch.object(
2249 driver.importutils,
2250 'import_class',
2251 side_effect=import_exc) as mock_import_class:
2252 if import_exc:
2253 self.assertRaises(
2254 exception.ShareBackendException, driver.setup_json_command)
2255 else:
2256 driver.setup_json_command()
2257 self.assertEqual(mock_import_class.return_value,
2258 driver.json_command)
2259 mock_import_class.assert_called_once_with(
2260 'ceph_argparse.json_command')