Coverage for manila/tests/share/drivers/test_glusterfs.py: 100%
261 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2014 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import copy
17import socket
18from unittest import mock
20import ddt
21from oslo_config import cfg
23from manila import context
24from manila import exception
25from manila.share import configuration as config
26from manila.share.drivers import ganesha
27from manila.share.drivers import glusterfs
28from manila.share.drivers.glusterfs import layout
29from manila import test
30from manila.tests import fake_share
31from manila.tests import fake_utils
34CONF = cfg.CONF
37fake_gluster_manager_attrs = {
38 'export': '127.0.0.1:/testvol',
39 'host': '127.0.0.1',
40 'qualified': 'testuser@127.0.0.1:/testvol',
41 'user': 'testuser',
42 'volume': 'testvol',
43 'path_to_private_key': '/fakepath/to/privatekey',
44 'remote_server_password': 'fakepassword',
45}
47fake_share_name = 'fakename'
48NFS_EXPORT_DIR = 'nfs.export-dir'
49NFS_EXPORT_VOL = 'nfs.export-volumes'
50NFS_RPC_AUTH_ALLOW = 'nfs.rpc-auth-allow'
51NFS_RPC_AUTH_REJECT = 'nfs.rpc-auth-reject'
54@ddt.ddt
55class GlusterfsShareDriverTestCase(test.TestCase):
56 """Tests GlusterfsShareDriver."""
58 def setUp(self):
59 super(GlusterfsShareDriverTestCase, self).setUp()
60 fake_utils.stub_out_utils_execute(self)
61 self._execute = fake_utils.fake_execute
62 self._context = context.get_admin_context()
63 self.addCleanup(fake_utils.fake_execute_set_repliers, [])
64 self.addCleanup(fake_utils.fake_execute_clear_log)
66 CONF.set_default('reserved_share_percentage', 50)
67 CONF.set_default('reserved_share_from_snapshot_percentage', 30)
68 CONF.set_default('reserved_share_extend_percentage', 30)
69 CONF.set_default('driver_handles_share_servers', False)
71 self.fake_conf = config.Configuration(None)
72 self._driver = glusterfs.GlusterfsShareDriver(
73 execute=self._execute,
74 configuration=self.fake_conf)
75 self.share = fake_share.fake_share(share_proto='NFS')
77 def test_do_setup(self):
78 self.mock_object(self._driver, '_get_helper')
79 self.mock_object(layout.GlusterfsShareDriverBase, 'do_setup')
80 _context = mock.Mock()
82 self._driver.do_setup(_context)
84 self._driver._get_helper.assert_called_once_with()
85 layout.GlusterfsShareDriverBase.do_setup.assert_called_once_with(
86 _context)
88 @ddt.data(True, False)
89 def test_setup_via_manager(self, has_parent):
90 gmgr = mock.Mock()
91 share_mgr_parent = mock.Mock() if has_parent else None
92 nfs_helper = mock.Mock()
93 nfs_helper.get_export = mock.Mock(return_value='host:/vol')
94 self._driver.nfs_helper = mock.Mock(return_value=nfs_helper)
96 ret = self._driver._setup_via_manager(
97 {'manager': gmgr, 'share': self.share},
98 share_manager_parent=share_mgr_parent)
100 gmgr.set_vol_option.assert_called_once_with(
101 'nfs.export-volumes', False)
102 self._driver.nfs_helper.assert_called_once_with(
103 self._execute, self.fake_conf, gluster_manager=gmgr)
104 nfs_helper.get_export.assert_called_once_with(self.share)
105 self.assertEqual('host:/vol', ret)
107 @ddt.data({'helpercls': None, 'path': '/fakepath'},
108 {'helpercls': None, 'path': None},
109 {'helpercls': glusterfs.GlusterNFSHelper, 'path': '/fakepath'},
110 {'helpercls': glusterfs.GlusterNFSHelper, 'path': None})
111 @ddt.unpack
112 def test_setup_via_manager_path(self, helpercls, path):
113 gmgr = mock.Mock()
114 gmgr.path = path
115 if not helpercls:
116 helper = mock.Mock()
117 helper.get_export = mock.Mock(return_value='host:/vol')
118 helpercls = mock.Mock(return_value=helper)
119 self._driver.nfs_helper = helpercls
120 if helpercls == glusterfs.GlusterNFSHelper and path is None:
121 gmgr.get_vol_option = mock.Mock(return_value=True)
123 self._driver._setup_via_manager(
124 {'manager': gmgr, 'share': self.share})
126 if helpercls == glusterfs.GlusterNFSHelper and path is None:
127 gmgr.get_vol_option.assert_called_once_with(
128 NFS_EXPORT_VOL, boolean=True)
129 args = (NFS_RPC_AUTH_REJECT, '*')
130 else:
131 args = (NFS_EXPORT_VOL, False)
132 gmgr.set_vol_option.assert_called_once_with(*args)
134 def test_setup_via_manager_export_volumes_off(self):
135 gmgr = mock.Mock()
136 gmgr.path = None
137 gmgr.get_vol_option = mock.Mock(return_value=False)
138 self._driver.nfs_helper = glusterfs.GlusterNFSHelper
140 self.assertRaises(exception.GlusterfsException,
141 self._driver._setup_via_manager,
142 {'manager': gmgr, 'share': self.share})
144 gmgr.get_vol_option.assert_called_once_with(NFS_EXPORT_VOL,
145 boolean=True)
147 def test_check_for_setup_error(self):
148 self._driver.check_for_setup_error()
150 def test_update_share_stats(self):
151 self.mock_object(layout.GlusterfsShareDriverBase,
152 '_update_share_stats')
154 self._driver._update_share_stats()
156 (layout.GlusterfsShareDriverBase._update_share_stats.
157 assert_called_once_with({'storage_protocol': 'NFS',
158 'vendor_name': 'Red Hat',
159 'share_backend_name': 'GlusterFS',
160 'reserved_percentage': 50,
161 'reserved_snapshot_percentage': 30,
162 'reserved_share_extend_percentage': 30}))
164 def test_get_network_allocations_number(self):
165 self.assertEqual(0, self._driver.get_network_allocations_number())
167 def test_get_helper(self):
168 ret = self._driver._get_helper()
169 self.assertIsInstance(ret, self._driver.nfs_helper)
171 @ddt.data({'path': '/fakepath', 'helper': glusterfs.GlusterNFSHelper},
172 {'path': None, 'helper': glusterfs.GlusterNFSVolHelper})
173 @ddt.unpack
174 def test_get_helper_vol(self, path, helper):
175 self._driver.nfs_helper = glusterfs.GlusterNFSHelper
177 gmgr = mock.Mock(path=path)
178 ret = self._driver._get_helper(gmgr)
180 self.assertIsInstance(ret, helper)
182 @ddt.data('type', 'level')
183 def test_supported_access_features(self, feature):
184 nfs_helper = mock.Mock()
185 supported_access_feature = mock.Mock()
186 setattr(nfs_helper, 'supported_access_%ss' % feature,
187 supported_access_feature)
188 self.mock_object(self._driver, 'nfs_helper', nfs_helper)
190 ret = getattr(self._driver, 'supported_access_%ss' % feature)
192 self.assertEqual(supported_access_feature, ret)
194 def test_update_access_via_manager(self):
195 self.mock_object(self._driver, '_get_helper')
196 gmgr = mock.Mock()
197 add_rules = mock.Mock()
198 delete_rules = mock.Mock()
200 self._driver._update_access_via_manager(
201 gmgr, self._context, self.share,
202 add_rules, delete_rules, recovery=True)
204 self._driver._get_helper.assert_called_once_with(gmgr)
205 self._driver._get_helper().update_access.assert_called_once_with(
206 '/', self.share, add_rules, delete_rules, recovery=True)
209@ddt.ddt
210class GlusterNFSHelperTestCase(test.TestCase):
211 """Tests GlusterNFSHelper."""
213 def setUp(self):
214 super(GlusterNFSHelperTestCase, self).setUp()
215 fake_utils.stub_out_utils_execute(self)
216 gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
217 self._execute = mock.Mock(return_value=('', ''))
218 self.fake_conf = config.Configuration(None)
219 self._helper = glusterfs.GlusterNFSHelper(
220 self._execute, self.fake_conf, gluster_manager=gluster_manager)
222 def test_get_export(self):
223 ret = self._helper.get_export(mock.Mock())
225 self.assertEqual(fake_gluster_manager_attrs['export'], ret)
227 @ddt.data({'output_str': '/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)',
228 'expected': {'foo': ['10.0.0.1', '10.0.0.2'],
229 'bar': ['10.0.0.1']}},
230 {'output_str': None, 'expected': {}})
231 @ddt.unpack
232 def test_get_export_dir_dict(self, output_str, expected):
233 self.mock_object(self._helper.gluster_manager,
234 'get_vol_option',
235 mock.Mock(return_value=output_str))
237 ret = self._helper._get_export_dir_dict()
239 self.assertEqual(expected, ret)
240 (self._helper.gluster_manager.get_vol_option.
241 assert_called_once_with(NFS_EXPORT_DIR))
243 @ddt.data({'delta': (['10.0.0.2'], []), 'extra_exports': {},
244 'new_exports': '/fakename(10.0.0.1|10.0.0.2)'},
245 {'delta': (['10.0.0.1'], []), 'extra_exports': {},
246 'new_exports': '/fakename(10.0.0.1)'},
247 {'delta': ([], ['10.0.0.2']), 'extra_exports': {},
248 'new_exports': '/fakename(10.0.0.1)'},
249 {'delta': ([], ['10.0.0.1']), 'extra_exports': {},
250 'new_exports': None},
251 {'delta': ([], ['10.0.0.1']),
252 'extra_exports': {'elsewhere': ['10.0.1.3']},
253 'new_exports': '/elsewhere(10.0.1.3)'})
254 @ddt.unpack
255 def test_update_access(self, delta, extra_exports, new_exports):
256 gluster_manager_attrs = {'path': '/fakename'}
257 gluster_manager_attrs.update(fake_gluster_manager_attrs)
258 gluster_mgr = mock.Mock(**gluster_manager_attrs)
259 helper = glusterfs.GlusterNFSHelper(
260 self._execute, self.fake_conf, gluster_manager=gluster_mgr)
261 export_dir_dict = {'fakename': ['10.0.0.1']}
262 export_dir_dict.update(extra_exports)
263 helper._get_export_dir_dict = mock.Mock(return_value=export_dir_dict)
264 _share = mock.Mock()
266 add_rules, delete_rules = (
267 map(lambda a: {'access_to': a}, r) for r in delta)
268 helper.update_access('/', _share, add_rules, delete_rules)
270 helper._get_export_dir_dict.assert_called_once_with()
271 gluster_mgr.set_vol_option.assert_called_once_with(NFS_EXPORT_DIR,
272 new_exports)
274 @ddt.data({}, {'elsewhere': '10.0.1.3'})
275 def test_update_access_disjoint(self, export_dir_dict):
276 gluster_manager_attrs = {'path': '/fakename'}
277 gluster_manager_attrs.update(fake_gluster_manager_attrs)
278 gluster_mgr = mock.Mock(**gluster_manager_attrs)
279 helper = glusterfs.GlusterNFSHelper(
280 self._execute, self.fake_conf, gluster_manager=gluster_mgr)
281 helper._get_export_dir_dict = mock.Mock(return_value=export_dir_dict)
282 _share = mock.Mock()
284 helper.update_access('/', _share, [], [{'access_to': '10.0.0.2'}])
286 helper._get_export_dir_dict.assert_called_once_with()
287 self.assertFalse(gluster_mgr.set_vol_option.called)
290@ddt.ddt
291class GlusterNFSVolHelperTestCase(test.TestCase):
292 """Tests GlusterNFSVolHelper."""
294 def setUp(self):
295 super(GlusterNFSVolHelperTestCase, self).setUp()
296 fake_utils.stub_out_utils_execute(self)
297 gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
298 self._execute = mock.Mock(return_value=('', ''))
299 self.fake_conf = config.Configuration(None)
300 self._helper = glusterfs.GlusterNFSVolHelper(
301 self._execute, self.fake_conf, gluster_manager=gluster_manager)
303 @ddt.data({'output_str': '10.0.0.1,10.0.0.2',
304 'expected': ['10.0.0.1', '10.0.0.2']},
305 {'output_str': None, 'expected': []})
306 @ddt.unpack
307 def test_get_vol_exports(self, output_str, expected):
308 self.mock_object(self._helper.gluster_manager,
309 'get_vol_option',
310 mock.Mock(return_value=output_str))
312 ret = self._helper._get_vol_exports()
314 self.assertEqual(expected, ret)
315 (self._helper.gluster_manager.get_vol_option.
316 assert_called_once_with(NFS_RPC_AUTH_ALLOW))
318 @ddt.data({'delta': (["10.0.0.1"], []), 'expected': "10.0.0.1,10.0.0.3"},
319 {'delta': (["10.0.0.2"], []),
320 'expected': "10.0.0.1,10.0.0.2,10.0.0.3"},
321 {'delta': ([], ["10.0.0.1"]), 'expected': "10.0.0.3"},
322 {'delta': ([], ["10.0.0.2"]), 'expected': "10.0.0.1,10.0.0.3"})
323 @ddt.unpack
324 def test_update_access(self, delta, expected):
325 self.mock_object(self._helper, '_get_vol_exports', mock.Mock(
326 return_value=["10.0.0.1", "10.0.0.3"]))
327 _share = mock.Mock()
329 add_rules, delete_rules = (
330 map(lambda a: {'access_to': a}, r) for r in delta)
331 self._helper.update_access("/", _share, add_rules, delete_rules)
333 self._helper._get_vol_exports.assert_called_once_with()
334 argseq = [(NFS_RPC_AUTH_ALLOW, expected), (NFS_RPC_AUTH_REJECT, None)]
335 self.assertEqual(
336 [mock.call(*a) for a in argseq],
337 self._helper.gluster_manager.set_vol_option.call_args_list)
339 def test_update_access_empty(self):
340 self.mock_object(self._helper, '_get_vol_exports', mock.Mock(
341 return_value=["10.0.0.1"]))
342 _share = mock.Mock()
344 self._helper.update_access("/", _share, [],
345 [{'access_to': "10.0.0.1"}])
347 self._helper._get_vol_exports.assert_called_once_with()
348 argseq = [(NFS_RPC_AUTH_ALLOW, None), (NFS_RPC_AUTH_REJECT, "*")]
349 self.assertEqual(
350 [mock.call(*a) for a in argseq],
351 self._helper.gluster_manager.set_vol_option.call_args_list)
354class GaneshaNFSHelperTestCase(test.TestCase):
355 """Tests GaneshaNFSHelper."""
357 def setUp(self):
358 super(GaneshaNFSHelperTestCase, self).setUp()
359 self.gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
360 self._execute = mock.Mock(return_value=('', ''))
361 self._root_execute = mock.Mock(return_value=('', ''))
362 self.access = fake_share.fake_access()
363 self.fake_conf = config.Configuration(None)
364 self.fake_template = {'key': 'value'}
365 self.share = fake_share.fake_share()
366 self.mock_object(glusterfs.ganesha_utils, 'RootExecutor',
367 mock.Mock(return_value=self._root_execute))
368 self.mock_object(glusterfs.ganesha.GaneshaNASHelper, '__init__',
369 mock.Mock())
370 socket.gethostname = mock.Mock(return_value='example.com')
371 self._helper = glusterfs.GaneshaNFSHelper(
372 self._execute, self.fake_conf,
373 gluster_manager=self.gluster_manager)
374 self._helper.tag = 'GLUSTER-Ganesha-localhost'
376 def test_init_local_ganesha_server(self):
377 glusterfs.ganesha_utils.RootExecutor.assert_called_once_with(
378 self._execute)
379 socket.gethostname.assert_has_calls([mock.call()])
380 glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
381 [mock.call(self._root_execute, self.fake_conf,
382 tag='GLUSTER-Ganesha-example.com')])
384 def test_get_export(self):
385 ret = self._helper.get_export(self.share)
387 self.assertEqual('example.com:/fakename--<access-id>', ret)
389 def test_init_remote_ganesha_server(self):
390 ssh_execute = mock.Mock(return_value=('', ''))
391 CONF.set_default('glusterfs_ganesha_server_ip', 'fakeip')
392 self.mock_object(glusterfs.ganesha_utils, 'SSHExecutor',
393 mock.Mock(return_value=ssh_execute))
394 glusterfs.GaneshaNFSHelper(
395 self._execute, self.fake_conf,
396 gluster_manager=self.gluster_manager)
397 glusterfs.ganesha_utils.SSHExecutor.assert_called_once_with(
398 'fakeip', 22, None, 'root', password=None, privatekey=None)
399 glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
400 [mock.call(ssh_execute, self.fake_conf,
401 tag='GLUSTER-Ganesha-fakeip')])
403 def test_init_helper(self):
404 ganeshelper = mock.Mock()
405 exptemp = mock.Mock()
407 def set_attributes(*a, **kw):
408 self._helper.ganesha = ganeshelper
409 self._helper.export_template = exptemp
411 self.mock_object(ganesha.GaneshaNASHelper, 'init_helper',
412 mock.Mock(side_effect=set_attributes))
413 self.assertEqual({}, glusterfs.GaneshaNFSHelper.shared_data)
415 self._helper.init_helper()
417 ganesha.GaneshaNASHelper.init_helper.assert_called_once_with()
418 self.assertEqual(ganeshelper, self._helper.ganesha)
419 self.assertEqual(exptemp, self._helper.export_template)
420 self.assertEqual({
421 'GLUSTER-Ganesha-localhost': {
422 'ganesha': ganeshelper,
423 'export_template': exptemp}},
424 glusterfs.GaneshaNFSHelper.shared_data)
426 other_helper = glusterfs.GaneshaNFSHelper(
427 self._execute, self.fake_conf,
428 gluster_manager=self.gluster_manager)
429 other_helper.tag = 'GLUSTER-Ganesha-localhost'
431 other_helper.init_helper()
433 self.assertEqual(ganeshelper, other_helper.ganesha)
434 self.assertEqual(exptemp, other_helper.export_template)
436 def test_default_config_hook(self):
437 fake_conf_dict = {'key': 'value1'}
438 mock_ganesha_utils_patch = mock.Mock()
440 def fake_patch_run(tmpl1, tmpl2):
441 mock_ganesha_utils_patch(
442 copy.deepcopy(tmpl1), tmpl2)
443 tmpl1.update(tmpl2)
445 self.mock_object(glusterfs.ganesha.GaneshaNASHelper,
446 '_default_config_hook',
447 mock.Mock(return_value=self.fake_template))
448 self.mock_object(glusterfs.ganesha_utils, 'path_from',
449 mock.Mock(return_value='/fakedir/glusterfs/conf'))
450 self.mock_object(self._helper, '_load_conf_dir',
451 mock.Mock(return_value=fake_conf_dict))
452 self.mock_object(glusterfs.ganesha_utils, 'patch',
453 mock.Mock(side_effect=fake_patch_run))
455 ret = self._helper._default_config_hook()
457 (glusterfs.ganesha.GaneshaNASHelper._default_config_hook.
458 assert_called_once_with())
459 glusterfs.ganesha_utils.path_from.assert_called_once_with(
460 glusterfs.__file__, 'conf')
461 self._helper._load_conf_dir.assert_called_once_with(
462 '/fakedir/glusterfs/conf')
463 glusterfs.ganesha_utils.patch.assert_called_once_with(
464 self.fake_template, fake_conf_dict)
465 self.assertEqual(fake_conf_dict, ret)
467 def test_fsal_hook(self):
468 self._helper.gluster_manager.path = '/fakename'
469 output = {
470 'Hostname': '127.0.0.1',
471 'Volume': 'testvol',
472 'Volpath': '/fakename'
473 }
475 ret = self._helper._fsal_hook('/fakepath', self.share, self.access)
477 self.assertEqual(output, ret)