Coverage for manila/tests/share/drivers/zfsonlinux/test_utils.py: 100%
213 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016 Mirantis, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import time
17from unittest import mock
19import ddt
20from oslo_config import cfg
22from manila import exception
23from manila.share.drivers.ganesha import utils as ganesha_utils
24from manila.share.drivers.zfsonlinux import utils as zfs_utils
25from manila import test
27CONF = cfg.CONF
def get_fake_configuration(*args, **kwargs):
    """Build a stand-in configuration class for the ZFSonLinux tests.

    Every keyword argument whose name matches one of the known options
    overrides that option's default value; other keyword arguments and all
    positional arguments are ignored.

    :returns: a freshly created ``FakeConfig`` class (not an instance) whose
        class attributes hold the option values, plus a mocked
        ``append_config_values`` attribute.
    """
    # (option name, default value) pairs, in the order they are exposed.
    option_defaults = (
        ("zfs_use_ssh", False),
        ("zfs_share_export_ip", "240.241.242.243"),
        ("zfs_service_ip", "240.241.242.244"),
        ("ssh_conn_timeout", 123),
        ("zfs_ssh_username", 'fake_username'),
        ("zfs_ssh_user_password", 'fake_pass'),
        ("zfs_ssh_private_key_path", '/fake/path'),
    )
    attributes = {
        name: kwargs.get(name, default) for name, default in option_defaults
    }
    attributes["append_config_values"] = mock.Mock()
    return type("FakeConfig", (object, ), attributes)
class FakeShareDriver(zfs_utils.ExecuteMixin):
    """Minimal driver stand-in used to exercise ``zfs_utils.ExecuteMixin``."""

    def __init__(self, *args, **kwargs):
        # Attach the fake configuration before initialising the mixin; the
        # same arguments are forwarded to both calls.
        fake_configuration = get_fake_configuration(*args, **kwargs)
        self.configuration = fake_configuration
        self.init_execute_mixin(*args, **kwargs)
@ddt.ddt
class ExecuteMixinTestCase(test.TestCase):
    """Unit tests for ``zfs_utils.ExecuteMixin`` driven via FakeShareDriver."""

    def setUp(self):
        super().setUp()
        # Replace the SSH executor class so the tests can inspect how (and
        # whether) it gets instantiated.
        self.ssh_executor = self.mock_object(ganesha_utils, 'SSHExecutor')
        self.driver = FakeShareDriver()

    def test_init(self):
        """Without ``zfs_use_ssh`` no SSH executor is created."""
        self.assertIsNone(self.driver.ssh_executor)
        self.assertEqual(0, self.ssh_executor.call_count)

    def test_init_ssh(self):
        """With ``zfs_use_ssh`` the executor is built from the config."""
        driver = FakeShareDriver(zfs_use_ssh=True)

        self.assertIsNotNone(driver.ssh_executor)
        self.ssh_executor.assert_called_once_with(
            ip=driver.configuration.zfs_service_ip,
            port=22,
            conn_timeout=driver.configuration.ssh_conn_timeout,
            login=driver.configuration.zfs_ssh_username,
            password=driver.configuration.zfs_ssh_user_password,
            privatekey=driver.configuration.zfs_ssh_private_key_path,
            max_size=10,
        )

    def test_execute_with_provided_executor(self):
        """An explicitly passed ``executor`` is the only one invoked."""
        self.mock_object(self.driver, '_execute')
        fake_executor = mock.Mock()

        self.driver.execute('fake', '--foo', '--bar', executor=fake_executor)

        self.assertFalse(self.driver._execute.called)
        self.assertFalse(self.ssh_executor.called)
        fake_executor.assert_called_once_with('fake', '--foo', '--bar')

    def test_local_shell_execute(self):
        """Commands go to the local ``_execute`` when SSH is disabled."""
        self.mock_object(self.driver, '_execute')

        self.driver.execute('fake', '--foo', '--bar')

        self.assertEqual(0, self.ssh_executor.call_count)
        self.driver._execute.assert_called_once_with(
            'fake', '--foo', '--bar')

    def test_local_shell_execute_with_sudo(self):
        """A leading ``sudo`` is dropped in favour of ``run_as_root``."""
        self.mock_object(self.driver, '_execute')

        self.driver.execute('sudo', 'fake', '--foo', '--bar')

        self.assertEqual(0, self.ssh_executor.call_count)
        self.driver._execute.assert_called_once_with(
            'fake', '--foo', '--bar', run_as_root=True)

    def test_ssh_execute(self):
        """Commands go to the SSH executor instance when SSH is enabled."""
        driver = FakeShareDriver(zfs_use_ssh=True)

        self.mock_object(driver, '_execute')

        driver.execute('fake', '--foo', '--bar')

        self.assertEqual(0, driver._execute.call_count)
        self.ssh_executor.return_value.assert_called_once_with(
            'fake', '--foo', '--bar')

    def test_ssh_execute_with_sudo(self):
        """Over SSH a leading ``sudo`` also becomes ``run_as_root``."""
        driver = FakeShareDriver(zfs_use_ssh=True)

        self.mock_object(driver, '_execute')

        driver.execute('sudo', 'fake', '--foo', '--bar')

        self.assertEqual(0, driver._execute.call_count)
        self.ssh_executor.return_value.assert_called_once_with(
            'fake', '--foo', '--bar', run_as_root=True)

    def test_execute_with_retry(self):
        """One failed attempt is followed by a successful retry."""
        # Patch out sleeping so the retry does not slow the test down.
        self.mock_object(time, 'sleep')
        self.mock_object(self.driver, 'execute', mock.Mock(
            side_effect=[exception.ProcessExecutionError('FAKE'), None]))
        self.driver.execute_with_retry('foo', 'bar')

        self.assertEqual(2, self.driver.execute.call_count)
        self.driver.execute.assert_has_calls(
            [mock.call('foo', 'bar'), mock.call('foo', 'bar')])

    def test_execute_with_retry_exceeded(self):
        """A permanently failing command is re-raised after 36 attempts."""
        self.mock_object(time, 'sleep')
        self.mock_object(self.driver, 'execute', mock.Mock(
            side_effect=exception.ProcessExecutionError('FAKE')))

        self.assertRaises(
            exception.ProcessExecutionError,
            self.driver.execute_with_retry,
            'foo', 'bar',
        )

        self.assertEqual(36, self.driver.execute.call_count)

    @ddt.data(True, False)
    def test__get_option(self, pool_level):
        """``zpool get`` or ``zfs get`` is run and the VALUE is returned."""
        out = """NAME PROPERTY VALUE SOURCE\n
foo_resource_name bar_option_name some_value local"""
        self.mock_object(
            self.driver, '_execute', mock.Mock(return_value=(out, '')))
        res_name = 'foo_resource_name'
        opt_name = 'bar_option_name'

        result = self.driver._get_option(
            res_name, opt_name, pool_level=pool_level)

        self.assertEqual('some_value', result)
        self.driver._execute.assert_called_once_with(
            'zpool' if pool_level else 'zfs', 'get', opt_name, res_name,
            run_as_root=True)

    def test_parse_zfs_answer(self):
        """Tabular output becomes one dict per row keyed by the header."""
        not_parsed_str = """NAME PROPERTY VALUE SOURCE\n
foo_res opt_1 bar local
foo_res opt_2 foo default
foo_res opt_3 some_value local"""
        expected = [
            {'NAME': 'foo_res', 'PROPERTY': 'opt_1', 'VALUE': 'bar',
             'SOURCE': 'local'},
            {'NAME': 'foo_res', 'PROPERTY': 'opt_2', 'VALUE': 'foo',
             'SOURCE': 'default'},
            {'NAME': 'foo_res', 'PROPERTY': 'opt_3', 'VALUE': 'some_value',
             'SOURCE': 'local'},
        ]

        result = self.driver.parse_zfs_answer(not_parsed_str)

        self.assertEqual(expected, result)

    def test_parse_zfs_answer_empty(self):
        """An empty answer parses to an empty list."""
        result = self.driver.parse_zfs_answer('')

        self.assertEqual([], result)

    def test_get_zpool_option(self):
        """``get_zpool_option`` delegates to ``_get_option`` with True."""
        self.mock_object(self.driver, '_get_option')
        zpool_name = 'foo_resource_name'
        opt_name = 'bar_option_name'

        result = self.driver.get_zpool_option(zpool_name, opt_name)

        self.assertEqual(self.driver._get_option.return_value, result)
        self.driver._get_option.assert_called_once_with(
            zpool_name, opt_name, True)

    def test_get_zfs_option(self):
        """``get_zfs_option`` delegates to ``_get_option`` with False."""
        self.mock_object(self.driver, '_get_option')
        dataset_name = 'foo_resource_name'
        opt_name = 'bar_option_name'

        result = self.driver.get_zfs_option(dataset_name, opt_name)

        self.assertEqual(self.driver._get_option.return_value, result)
        self.driver._get_option.assert_called_once_with(
            dataset_name, opt_name, False)

    def test_zfs(self):
        """``zfs`` runs ``sudo zfs ...`` through plain ``execute``."""
        self.mock_object(self.driver, 'execute')
        self.mock_object(self.driver, 'execute_with_retry')

        self.driver.zfs('foo', 'bar')

        self.assertEqual(0, self.driver.execute_with_retry.call_count)
        self.driver.execute.assert_called_once_with(
            'sudo', 'zfs', 'foo', 'bar')

    def test_zfs_with_retry(self):
        """``zfs_with_retry`` runs ``sudo zfs ...`` through the retry path."""
        self.mock_object(self.driver, 'execute')
        self.mock_object(self.driver, 'execute_with_retry')

        self.driver.zfs_with_retry('foo', 'bar')

        self.assertEqual(0, self.driver.execute.call_count)
        self.driver.execute_with_retry.assert_called_once_with(
            'sudo', 'zfs', 'foo', 'bar')
@ddt.ddt
class NFSviaZFSHelperTestCase(test.TestCase):
    """Unit tests for ``zfs_utils.NFSviaZFSHelper``."""

    def setUp(self):
        super().setUp()
        configuration = get_fake_configuration()
        self.out = "fake_out"
        # Every shell command issued by the helper is routed to this mock.
        self.mock_object(
            zfs_utils.utils, "execute", mock.Mock(return_value=(self.out, "")))
        self.helper = zfs_utils.NFSviaZFSHelper(configuration)

    def test_init(self):
        """Constructing the helper probes for and runs ``exportfs``."""
        zfs_utils.utils.execute.assert_has_calls([
            mock.call("which", "exportfs"),
            mock.call("exportfs", run_as_root=True),
        ])

    def test_verify_setup_exportfs_not_installed(self):
        """Empty ``which exportfs`` output raises ZFSonLinuxException."""
        zfs_utils.utils.execute.reset_mock()
        zfs_utils.utils.execute.side_effect = [('', '')]

        self.assertRaises(
            exception.ZFSonLinuxException, self.helper.verify_setup)

        zfs_utils.utils.execute.assert_called_once_with("which", "exportfs")

    def test_verify_setup_error_calling_exportfs(self):
        """A failing ``exportfs`` call propagates ProcessExecutionError."""
        zfs_utils.utils.execute.reset_mock()
        zfs_utils.utils.execute.side_effect = [
            ('fake_out', ''), exception.ProcessExecutionError('Fake')]

        self.assertRaises(
            exception.ProcessExecutionError, self.helper.verify_setup)

        zfs_utils.utils.execute.assert_has_calls([
            mock.call("which", "exportfs"),
            mock.call("exportfs", run_as_root=True),
        ])

    def test_is_kernel_version_true(self):
        """A successful ``modinfo zfs`` yields True."""
        # Drop the stored value so the property has to be evaluated again.
        delattr(self.helper, '_is_kernel_version')
        zfs_utils.utils.execute.reset_mock()

        self.assertTrue(self.helper.is_kernel_version)

        zfs_utils.utils.execute.assert_has_calls([
            mock.call("modinfo", "zfs"),
        ])

    def test_is_kernel_version_false(self):
        """A failing ``modinfo zfs`` yields False."""
        delattr(self.helper, '_is_kernel_version')
        zfs_utils.utils.execute.reset_mock()
        zfs_utils.utils.execute.side_effect = (
            exception.ProcessExecutionError('Fake'))

        self.assertFalse(self.helper.is_kernel_version)

        zfs_utils.utils.execute.assert_has_calls([
            mock.call("modinfo", "zfs"),
        ])

    def test_is_kernel_version_second_call(self):
        """Reading the property twice runs ``modinfo zfs`` only once."""
        delattr(self.helper, '_is_kernel_version')
        zfs_utils.utils.execute.reset_mock()

        self.assertTrue(self.helper.is_kernel_version)
        self.assertTrue(self.helper.is_kernel_version)

        # ``assert_has_calls`` would also pass if the command were executed
        # on both reads; requiring exactly one call is what actually checks
        # that the second read reuses the stored value.
        zfs_utils.utils.execute.assert_called_once_with("modinfo", "zfs")

    def test_create_exports(self):
        """``create_exports`` returns whatever ``get_exports`` returns."""
        self.mock_object(self.helper, 'get_exports')

        result = self.helper.create_exports('foo')

        self.assertEqual(
            self.helper.get_exports.return_value, result)

    def test_get_exports(self):
        """One export per configured IP; the service IP is admin-only."""
        self.mock_object(
            self.helper, 'get_zfs_option', mock.Mock(return_value='fake_mp'))
        expected = [
            {
                "path": "%s:fake_mp" % ip,
                "metadata": {},
                "is_admin_only": is_admin_only,
            } for ip, is_admin_only in (
                (self.helper.configuration.zfs_share_export_ip, False),
                (self.helper.configuration.zfs_service_ip, True))
        ]

        result = self.helper.get_exports('foo')

        self.assertEqual(expected, result)
        self.helper.get_zfs_option.assert_called_once_with(
            'foo', 'mountpoint', executor=None)

    def test_remove_exports(self):
        """A dataset with ``sharenfs`` set gets it switched off."""
        zfs_utils.utils.execute.reset_mock()
        self.mock_object(
            self.helper, 'get_zfs_option', mock.Mock(return_value='bar'))

        self.helper.remove_exports('foo')

        self.helper.get_zfs_option.assert_called_once_with(
            'foo', 'sharenfs', executor=None)
        zfs_utils.utils.execute.assert_called_once_with(
            'zfs', 'set', 'sharenfs=off', 'foo', run_as_root=True)

    def test_remove_exports_that_absent(self):
        """Nothing is executed when ``sharenfs`` is already ``off``."""
        zfs_utils.utils.execute.reset_mock()
        self.mock_object(
            self.helper, 'get_zfs_option', mock.Mock(return_value='off'))

        self.helper.remove_exports('foo')

        self.helper.get_zfs_option.assert_called_once_with(
            'foo', 'sharenfs', executor=None)
        self.assertEqual(0, zfs_utils.utils.execute.call_count)

    # Each data tuple is (result of ``modinfo zfs``, expected ``sharenfs``
    # argument, ``make_all_ro`` flag). The first two cases have modinfo
    # succeed, the last two have it fail, and the expected option syntax
    # differs between the two groups.
    @ddt.data(
        (('fake_modinfo_result', ''),
         ('sharenfs=rw=1.1.1.1:3.3.3.0/255.255.255.0,no_root_squash,'
          'ro=2.2.2.2,no_root_squash'), False),
        (('fake_modinfo_result', ''),
         ('sharenfs=ro=1.1.1.1:2.2.2.2:3.3.3.0/255.255.255.0,no_root_squash'),
         True),
        (exception.ProcessExecutionError('Fake'),
         ('sharenfs=1.1.1.1:rw,no_root_squash 3.3.3.0/255.255.255.0:rw,'
          'no_root_squash 2.2.2.2:ro,no_root_squash'), False),
        (exception.ProcessExecutionError('Fake'),
         ('sharenfs=1.1.1.1:ro,no_root_squash 2.2.2.2:ro,'
          'no_root_squash 3.3.3.0/255.255.255.0:ro,no_root_squash'), True),
    )
    @ddt.unpack
    def test_update_access_rw_and_ro(self, modinfo_response, access_str,
                                     make_all_ro):
        """Access rules are applied and deleted IP rules are unexported."""
        delattr(self.helper, '_is_kernel_version')
        zfs_utils.utils.execute.reset_mock()
        dataset_name = 'zpoolz/foo_dataset_name/fake'
        # Canned results, consumed in order by the successive commands.
        zfs_utils.utils.execute.side_effect = [
            modinfo_response,
            ("""NAME USED AVAIL REFER MOUNTPOINT\n
%(dn)s 2.58M 14.8G 27.5K /%(dn)s\n
%(dn)s_some_other 3.58M 15.8G 28.5K /%(dn)s\n
            """ % {'dn': dataset_name}, ''),
            ('fake_set_opt_result', ''),
            ("""NAME PROPERTY VALUE SOURCE\n
%s mountpoint /%s default\n
            """ % (dataset_name, dataset_name), ''),
            ('fake_1_result', ''),
            ('fake_2_result', ''),
            ('fake_3_result', ''),
            ('fake_4_result', ''),
            ('fake_5_result', ''),
        ]
        access_rules = [
            {'access_type': 'ip', 'access_level': 'rw',
             'access_to': '1.1.1.1'},
            {'access_type': 'ip', 'access_level': 'ro',
             'access_to': '2.2.2.2'},
            {'access_type': 'ip', 'access_level': 'rw',
             'access_to': '3.3.3.0/24'},
        ]
        delete_rules = [
            {'access_type': 'ip', 'access_level': 'rw',
             'access_to': '4.4.4.4'},
            {'access_type': 'ip', 'access_level': 'ro',
             'access_to': '5.5.5.5/32'},
            {'access_type': 'ip', 'access_level': 'ro',
             'access_to': '5.5.5.6/16'},
            {'access_type': 'ip', 'access_level': 'ro',
             'access_to': '5.5.5.7/0'},
            {'access_type': 'user', 'access_level': 'rw',
             'access_to': '6.6.6.6'},
            {'access_type': 'user', 'access_level': 'ro',
             'access_to': '7.7.7.7'},
        ]

        self.helper.update_access(
            dataset_name, access_rules, [], delete_rules,
            make_all_ro=make_all_ro)

        # Only the four 'ip' delete rules are expected to produce an
        # ``exportfs -u`` call, with prefix lengths written as netmasks.
        zfs_utils.utils.execute.assert_has_calls([
            mock.call('modinfo', 'zfs'),
            mock.call('zfs', 'list', '-r', 'zpoolz', run_as_root=True),
            mock.call(
                'zfs', 'set',
                access_str,
                dataset_name, run_as_root=True),
            mock.call(
                'zfs', 'get', 'mountpoint', dataset_name, run_as_root=True),
            mock.call(
                'exportfs', '-u', '4.4.4.4:/%s' % dataset_name,
                run_as_root=True),
            mock.call(
                'exportfs', '-u', '5.5.5.5:/%s' % dataset_name,
                run_as_root=True),
            mock.call(
                'exportfs', '-u', '5.5.5.6/255.255.0.0:/%s' % dataset_name,
                run_as_root=True),
            mock.call(
                'exportfs', '-u', '5.5.5.7/0.0.0.0:/%s' % dataset_name,
                run_as_root=True),
        ])

    def test_update_access_dataset_not_found(self):
        """A dataset missing from ``zfs list`` output logs one warning."""
        self.mock_object(zfs_utils.LOG, 'warning')
        zfs_utils.utils.execute.reset_mock()
        dataset_name = 'zpoolz/foo_dataset_name/fake'
        zfs_utils.utils.execute.side_effect = [
            ('fake_modinfo_result', ''),
            ('fake_dataset_not_found_result', ''),
            ('fake_set_opt_result', ''),
        ]
        access_rules = [
            {'access_type': 'ip', 'access_level': 'rw',
             'access_to': '1.1.1.1'},
            {'access_type': 'ip', 'access_level': 'ro',
             'access_to': '1.1.1.2'},
        ]

        self.helper.update_access(dataset_name, access_rules, [], [])

        zfs_utils.utils.execute.assert_has_calls([
            mock.call('zfs', 'list', '-r', 'zpoolz', run_as_root=True),
        ])
        zfs_utils.LOG.warning.assert_called_once_with(
            mock.ANY, {'name': dataset_name})

    # NOTE(review): ``first_execute_result`` is never used in the body, so
    # both ddt variants currently exercise the same scenario - confirm
    # whether it was meant to be part of ``side_effect``.
    @ddt.data(exception.ProcessExecutionError('Fake'), ('Ok', ''))
    def test_update_access_no_rules(self, first_execute_result):
        """An empty rule list switches ``sharenfs`` off for the dataset."""
        zfs_utils.utils.execute.reset_mock()
        dataset_name = 'zpoolz/foo_dataset_name/fake'
        zfs_utils.utils.execute.side_effect = [
            ("""NAME USED AVAIL REFER MOUNTPOINT\n
%s 2.58M 14.8G 27.5K /%s\n
            """ % (dataset_name, dataset_name), ''),
            ('fake_set_opt_result', ''),
        ]

        self.helper.update_access(dataset_name, [], [], [])

        zfs_utils.utils.execute.assert_has_calls([
            mock.call('zfs', 'list', '-r', 'zpoolz', run_as_root=True),
            mock.call('zfs', 'set', 'sharenfs=off', dataset_name,
                      run_as_root=True),
        ])

    @ddt.data('user', 'cert', 'cephx', '', 'fake', 'i', 'p')
    def test_update_access_not_ip_access_type(self, access_type):
        """Any access type other than 'ip' is rejected before executing."""
        zfs_utils.utils.execute.reset_mock()
        dataset_name = 'zpoolz/foo_dataset_name/fake'
        access_rules = [
            {'access_type': access_type, 'access_level': 'rw',
             'access_to': '1.1.1.1'},
            {'access_type': 'ip', 'access_level': 'ro',
             'access_to': '1.1.1.2'},
        ]

        self.assertRaises(
            exception.InvalidShareAccess,
            self.helper.update_access,
            dataset_name, access_rules, access_rules, [],
        )

        self.assertEqual(0, zfs_utils.utils.execute.call_count)

    @ddt.data('', 'r', 'o', 'w', 'fake', 'su')
    def test_update_access_neither_rw_nor_ro_access_level(self, access_level):
        """Any access level other than 'rw'/'ro' is rejected up front."""
        zfs_utils.utils.execute.reset_mock()
        dataset_name = 'zpoolz/foo_dataset_name/fake'
        access_rules = [
            {'access_type': 'ip', 'access_level': access_level,
             'access_to': '1.1.1.1'},
            {'access_type': 'ip', 'access_level': 'ro',
             'access_to': '1.1.1.2'},
        ]

        self.assertRaises(
            exception.InvalidShareAccess,
            self.helper.update_access,
            dataset_name, access_rules, access_rules, [],
        )

        self.assertEqual(0, zfs_utils.utils.execute.call_count)