Coverage for manila/tests/share/drivers/glusterfs/test_layout.py: 100%
176 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2015 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import errno
17import os
18from unittest import mock
20import ddt
21from oslo_config import cfg
22from oslo_utils import importutils
24from manila import exception
25from manila.share import configuration as config
26from manila.share import driver
27from manila.share.drivers.glusterfs import layout
28from manila import test
29from manila.tests import fake_share
30from manila.tests import fake_utils
33CONF = cfg.CONF
36fake_local_share_path = '/mnt/nfs/testvol/fakename'
38fake_path_to_private_key = '/fakepath/to/privatekey'
39fake_remote_server_password = 'fakepassword'
42def fake_access(kwargs):
43 fake_access_rule = fake_share.fake_access(**kwargs)
44 fake_access_rule.to_dict = lambda: fake_access_rule.values
45 return fake_access_rule
48class GlusterfsFakeShareDriver(layout.GlusterfsShareDriverBase):
50 supported_layouts = ('layout_fake.FakeLayout',
51 'layout_something.SomeLayout')
52 supported_protocols = ('NFS,')
53 _supported_access_types = ('ip',)
54 _supported_access_levels = ('rw',)
57@ddt.ddt
58class GlusterfsShareDriverBaseTestCase(test.TestCase):
59 """Tests GlusterfsShareDriverBase."""
61 def setUp(self):
62 super(GlusterfsShareDriverBaseTestCase, self).setUp()
63 CONF.set_default('driver_handles_share_servers', False)
64 fake_conf, __ = self._setup()
65 self._driver = GlusterfsFakeShareDriver(False, configuration=fake_conf)
66 self.fake_share = mock.Mock(name='fake_share')
67 self.fake_context = mock.Mock(name='fake_context')
68 self.fake_access = mock.Mock(name='fake_access')
70 def _setup(self):
71 fake_conf = config.Configuration(None)
72 fake_layout = mock.Mock()
73 self.mock_object(importutils, "import_object",
74 mock.Mock(return_value=fake_layout))
75 return fake_conf, fake_layout
77 def test_init(self):
78 self.assertRaises(IndexError, layout.GlusterfsShareDriverBase, False,
79 configuration=config.Configuration(None))
81 @ddt.data({'has_snap': None, 'layout_name': None},
82 {'has_snap': False, 'layout_name': 'layout_fake.FakeLayout'},
83 {'has_snap': True, 'layout_name': 'layout_something.SomeLayout'})
84 @ddt.unpack
85 def test_init_subclass(self, has_snap, layout_name):
86 conf, _layout = self._setup()
87 if layout_name is not None:
88 conf.glusterfs_share_layout = layout_name
89 if has_snap is None:
90 del _layout._snapshots_are_supported
91 else:
92 _layout._snapshots_are_supported = has_snap
94 _driver = GlusterfsFakeShareDriver(False, configuration=conf)
96 snap_result = {None: False}.get(has_snap, has_snap)
97 layout_result = {None: 'layout_fake.FakeLayout'}.get(layout_name,
98 layout_name)
100 importutils.import_object.assert_called_once_with(
101 'manila.share.drivers.glusterfs.%s' % layout_result,
102 _driver, configuration=conf)
103 self.assertEqual(_layout, _driver.layout)
104 self.assertEqual(snap_result, _driver.snapshots_are_supported)
106 def test_init_nosupp_layout(self):
107 conf = config.Configuration(None)
108 conf.glusterfs_share_layout = 'nonsense_layout'
110 self.assertRaises(exception.GlusterfsException,
111 GlusterfsFakeShareDriver, False, configuration=conf)
113 def test_setup_via_manager(self):
114 self.assertIsNone(self._driver._setup_via_manager(mock.Mock()))
116 def test_supported_access_types(self):
117 self.assertEqual(('ip',), self._driver.supported_access_types)
119 def test_supported_access_levels(self):
120 self.assertEqual(('rw',), self._driver.supported_access_levels)
122 def test_access_rule_validator(self):
123 rule = mock.Mock()
124 abort = mock.Mock()
125 valid = mock.Mock()
126 self.mock_object(layout.ganesha_utils, 'validate_access_rule',
127 mock.Mock(return_value=valid))
129 ret = self._driver._access_rule_validator(abort)(rule)
131 self.assertEqual(valid, ret)
132 layout.ganesha_utils.validate_access_rule.assert_called_once_with(
133 ('ip',), ('rw',), rule, abort)
135 @ddt.data({'inset': ([], ['ADD'], []), 'outset': (['ADD'], []),
136 'recovery': False},
137 {'inset': ([], [], ['DELETE']), 'outset': ([], ['DELETE']),
138 'recovery': False},
139 {'inset': (['EXISTING'], ['ADD'], ['DELETE']),
140 'outset': (['ADD'], ['DELETE']), 'recovery': False},
141 {'inset': (['EXISTING'], [], []), 'outset': (['EXISTING'], []),
142 'recovery': True})
143 @ddt.unpack
144 def test_update_access(self, inset, outset, recovery):
145 conf, _layout = self._setup()
146 gluster_mgr = mock.Mock(name='gluster_mgr')
147 self.mock_object(_layout, '_share_manager',
148 mock.Mock(return_value=gluster_mgr))
149 _driver = GlusterfsFakeShareDriver(False, configuration=conf)
150 self.mock_object(_driver, '_update_access_via_manager', mock.Mock())
151 rulemap = {t: fake_access({'access_type': "ip",
152 'access_level': "rw",
153 'access_to': t}) for t in (
154 'EXISTING', 'ADD', 'DELETE')}
155 in_rules, out_rules = (
156 [
157 [
158 rulemap[t] for t in r
159 ] for r in rs
160 ] for rs in (inset, outset))
162 _driver.update_access(
163 self.fake_context, self.fake_share, *in_rules, [])
165 _layout._share_manager.assert_called_once_with(self.fake_share)
166 _driver._update_access_via_manager.assert_called_once_with(
167 gluster_mgr, self.fake_context, self.fake_share,
168 *out_rules, recovery=recovery)
170 def test_update_access_via_manager(self):
171 self.assertRaises(NotImplementedError,
172 self._driver._update_access_via_manager,
173 mock.Mock(), self.fake_context, self.fake_share,
174 [self.fake_access], [self.fake_access])
176 @ddt.data('NFS', 'PROTATO')
177 def test_check_proto_baseclass(self, proto):
178 self.assertRaises(exception.ShareBackendException,
179 layout.GlusterfsShareDriverBase._check_proto,
180 {'share_proto': proto})
182 def test_check_proto(self):
183 GlusterfsFakeShareDriver._check_proto({'share_proto': 'NFS'})
185 def test_check_proto_notsupported(self):
186 self.assertRaises(exception.ShareBackendException,
187 GlusterfsFakeShareDriver._check_proto,
188 {'share_proto': 'PROTATO'})
190 @ddt.data('', '_from_snapshot')
191 def test_create_share(self, variant):
192 conf, _layout = self._setup()
193 _driver = GlusterfsFakeShareDriver(False, configuration=conf)
194 self.mock_object(_driver, '_check_proto', mock.Mock())
196 getattr(_driver, 'create_share%s' % variant)(self.fake_context,
197 self.fake_share)
199 _driver._check_proto.assert_called_once_with(self.fake_share)
200 getattr(_layout,
201 'create_share%s' % variant).assert_called_once_with(
202 self.fake_context, self.fake_share)
204 @ddt.data(True, False)
205 def test_update_share_stats(self, internal_exception):
206 data = mock.Mock()
207 conf, _layout = self._setup()
209 def raise_exception(*args, **kwargs):
210 raise NotImplementedError
211 layoutstats = mock.Mock()
213 mock_kw = ({'side_effect': raise_exception} if internal_exception
214 else {'return_value': layoutstats})
216 self.mock_object(_layout, '_update_share_stats', mock.Mock(**mock_kw))
217 self.mock_object(driver.ShareDriver, '_update_share_stats',
218 mock.Mock())
219 _driver = GlusterfsFakeShareDriver(False, configuration=conf)
220 _driver._update_share_stats(data)
222 if internal_exception:
223 self.assertFalse(data.update.called)
224 else:
225 data.update.assert_called_once_with(layoutstats)
226 driver.ShareDriver._update_share_stats.assert_called_once_with(
227 data)
229 @ddt.data('do_setup', 'create_snapshot', 'delete_share', 'delete_snapshot',
230 'ensure_share', 'manage_existing', 'unmanage', 'extend_share',
231 'shrink_share')
232 def test_delegated_methods(self, method):
233 conf, _layout = self._setup()
234 _driver = GlusterfsFakeShareDriver(False, configuration=conf)
235 fake_args = (mock.Mock(), mock.Mock(), mock.Mock())
237 getattr(_driver, method)(*fake_args)
239 getattr(_layout, method).assert_called_once_with(*fake_args)
242@ddt.ddt
243class GlusterfsShareLayoutBaseTestCase(test.TestCase):
244 """Tests GlusterfsShareLayoutBaseTestCase."""
246 def setUp(self):
247 super(GlusterfsShareLayoutBaseTestCase, self).setUp()
248 fake_utils.stub_out_utils_execute(self)
249 self._execute = fake_utils.fake_execute
250 self.addCleanup(fake_utils.fake_execute_set_repliers, [])
251 self.addCleanup(fake_utils.fake_execute_clear_log)
252 self.fake_driver = mock.Mock()
253 self.mock_object(self.fake_driver, '_execute',
254 self._execute)
256 class FakeLayout(layout.GlusterfsShareLayoutBase):
258 def _share_manager(self, share):
259 """Return GlusterManager object representing share's backend."""
261 def do_setup(self, context):
262 """Any initialization the share driver does while starting."""
264 def create_share(self, context, share, share_server=None):
265 """Is called to create share."""
267 def create_share_from_snapshot(self, context, share, snapshot,
268 share_server=None, parent_share=None):
269 """Is called to create share from snapshot."""
271 def create_snapshot(self, context, snapshot, share_server=None):
272 """Is called to create snapshot."""
274 def delete_share(self, context, share, share_server=None):
275 """Is called to remove share."""
277 def delete_snapshot(self, context, snapshot, share_server=None):
278 """Is called to remove snapshot."""
280 def ensure_share(self, context, share, share_server=None):
281 """Invoked to ensure that share is exported."""
283 def manage_existing(self, share, driver_options):
284 """Brings an existing share under Manila management."""
286 def unmanage(self, share):
287 """Removes the specified share from Manila management."""
289 def extend_share(self, share, new_size, share_server=None):
290 """Extends size of existing share."""
292 def shrink_share(self, share, new_size, share_server=None):
293 """Shrinks size of existing share."""
295 def test_init_invalid(self):
296 self.assertRaises(TypeError, layout.GlusterfsShareLayoutBase,
297 mock.Mock())
299 def test_subclass(self):
300 fake_conf = mock.Mock()
301 _layout = self.FakeLayout(self.fake_driver, configuration=fake_conf)
303 self.assertEqual(fake_conf, _layout.configuration)
304 self.assertRaises(NotImplementedError, _layout._update_share_stats)
306 def test_check_mount_glusterfs(self):
307 fake_conf = mock.Mock()
308 _driver = mock.Mock()
309 _driver._execute = mock.Mock()
310 _layout = self.FakeLayout(_driver, configuration=fake_conf)
312 _layout._check_mount_glusterfs()
314 _driver._execute.assert_called_once_with(
315 'mount.glusterfs',
316 check_exit_code=False)
318 @ddt.data({'_errno': errno.ENOENT,
319 '_exception': exception.GlusterfsException},
320 {'_errno': errno.EACCES, '_exception': OSError})
321 @ddt.unpack
322 def test_check_mount_glusterfs_not_installed(self, _errno, _exception):
323 fake_conf = mock.Mock()
324 _layout = self.FakeLayout(self.fake_driver, configuration=fake_conf)
326 def exec_runner(*ignore_args, **ignore_kwargs):
327 raise OSError(_errno, os.strerror(_errno))
329 expected_exec = ['mount.glusterfs']
330 fake_utils.fake_execute_set_repliers([(expected_exec[0], exec_runner)])
332 self.assertRaises(_exception, _layout._check_mount_glusterfs)