Coverage for manila/tests/share/drivers/inspur/instorage/test_instorage.py: 100%
677 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2019 Inspur Corp.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""
17Share driver test for Inspur InStorage
18"""
20from unittest import mock
22import ddt
23from oslo_concurrency import processutils
24from oslo_config import cfg
25import paramiko
27from manila import context
28from manila import exception
29from manila.share import driver
30from manila.share.drivers.inspur.instorage import cli_helper
31from manila.share.drivers.inspur.instorage import instorage
32from manila import ssh_utils
33from manila import test
34from manila.tests import fake_share
35from manila import utils as manila_utils
37CONF = cfg.CONF
class FakeConfig:
    """Minimal stand-in for the configuration object handed to the driver.

    Three attributes are fixed; every other option takes the value of the
    keyword argument of the same name, falling back to the default listed
    in ``__init__``. Positional arguments are accepted and ignored.
    """

    def __init__(self, *args, **kwargs):
        # Fixed values: these cannot be overridden through kwargs.
        self.driver_handles_share_servers = False
        self.share_driver = 'fake_share_driver_name'
        self.share_backend_name = 'fake_instorage'

        # Built on every call so that mutable defaults (the pool list) are
        # never shared between FakeConfig instances.
        defaults = {
            'instorage_nas_ip': 'some_ip',
            'instorage_nas_port': 'some_port',
            'instorage_nas_login': 'username',
            'instorage_nas_password': 'password',
            'instorage_nas_pools': ['fakepool'],
            'network_config_group': 'fake_network_config_group',
            'admin_network_config_group': 'fake_admin_network_config_group',
            'config_group': 'fake_config_group',
            'reserved_share_percentage': 0,
            'reserved_share_from_snapshot_percentage': 0,
            'reserved_share_extend_percentage': 0,
            'max_over_subscription_ratio': 0,
            'filter_function': None,
            'goodness_function': None,
        }
        for option, default in defaults.items():
            setattr(self, option, kwargs.get(option, default))

    def safe_get(self, key):
        """Return the attribute named ``key``.

        Raises AttributeError when no such attribute exists.
        """
        return getattr(self, key)

    def append_config_values(self, *args, **kwargs):
        """Accept any arguments and do nothing."""
        pass
@ddt.ddt
class InStorageShareDriverTestCase(test.TestCase):
    """Exercise InStorageShareDriver with InStorageAssistant mocked out."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._ctxt = context.get_admin_context()
        self.configuration = FakeConfig()
        self.share = fake_share.fake_share()
        self.share_instance = fake_share.fake_share_instance(
            self.share, host='H@B#P'
        )

    def setUp(self):
        # Order kept as in the original: patch the option check, build the
        # driver, then run the base-class setUp.
        self.mock_object(instorage.CONF, '_check_required_opts')
        self.driver = instorage.InStorageShareDriver(
            configuration=self.configuration
        )
        super().setUp()

    def _patch_assistant(self, method_name, **mock_kwargs):
        """Replace an InStorageAssistant method with a Mock and return it."""
        return self.mock_object(
            instorage.InStorageAssistant,
            method_name,
            mock.Mock(**mock_kwargs)
        )

    def test_check_for_setup_error_failed_no_nodes(self):
        self._patch_assistant('get_nodes_info', return_value={})

        self.assertRaises(
            exception.ShareBackendException,
            self.driver.check_for_setup_error
        )

    def test_check_for_setup_error_failed_pool_invalid(self):
        self._patch_assistant('get_nodes_info', return_value={'node1': {}})
        # The configured pool ('fakepool') is absent from this list.
        self._patch_assistant('get_available_pools', return_value=['pool0'])

        self.assertRaises(
            exception.InvalidParameterValue,
            self.driver.check_for_setup_error
        )

    def test_check_for_setup_error_success(self):
        mock_gni = self._patch_assistant(
            'get_nodes_info', return_value={'node1': {}}
        )
        mock_gap = self._patch_assistant(
            'get_available_pools', return_value=['fakepool', 'pool0']
        )

        self.driver.check_for_setup_error()

        mock_gni.assert_called_once()
        mock_gap.assert_called_once()

    def test__update_share_stats(self):
        pool_attr = {
            'pool0': {
                'pool_name': 'pool0',
                'total_capacity_gb': 110,
                'free_capacity_gb': 100,
                'allocated_capacity_gb': 10,
                'reserved_percentage': 0,
                'reserved_snapshot_percentage': 0,
                'reserved_share_extend_percentage': 0,
                'qos': False,
                'dedupe': False,
                'compression': False,
                'thin_provisioning': False,
                'max_over_subscription_ratio': 0
            }
        }
        mock_gpa = self._patch_assistant(
            'get_pools_attr', return_value=pool_attr
        )
        mock_uss = mock.Mock()
        self.mock_object(driver.ShareDriver, '_update_share_stats', mock_uss)

        self.driver._update_share_stats()

        mock_gpa.assert_called_once_with(['fakepool'])
        expected_stats = {
            'share_backend_name': 'fake_instorage',
            'vendor_name': 'INSPUR',
            'driver_version': '1.0.0',
            'storage_protocol': 'NFS_CIFS',
            'reserved_percentage': 0,
            'reserved_snapshot_percentage': 0,
            'reserved_share_extend_percentage': 0,
            'max_over_subscription_ratio': 0,
            'snapshot_support': False,
            'create_share_from_snapshot_support': False,
            'revert_to_snapshot_support': False,
            'qos': False,
            'total_capacity_gb': 110,
            'free_capacity_gb': 100,
            'pools': [pool_attr['pool0']]
        }
        mock_uss.assert_called_once_with(expected_stats)

    # Keyword names avoid shadowing the ``id`` builtin; ddt.unpack passes
    # the dict entries as keyword arguments, so keys and parameters match.
    @ddt.data(
        {'share_id': 'abc-123', 'expected': 'abc123'},
        {'share_id': '123-abc', 'expected': 'B23abc'})
    @ddt.unpack
    def test_generate_share_name(self, share_id, expected):
        ret = self.driver.generate_share_name({'id': share_id})
        self.assertEqual(expected, ret)

    def test_get_network_allocations_number(self):
        ret = self.driver.get_network_allocations_number()
        self.assertEqual(0, ret)

    def test_create_share(self):
        mock_cs = self.mock_object(
            instorage.InStorageAssistant, 'create_share'
        )
        mock_gel = self._patch_assistant(
            'get_export_locations', return_value=['fake_export_location']
        )

        ret = self.driver.create_share(self._ctxt, self.share_instance)

        self.assertEqual(['fake_export_location'], ret)
        mock_cs.assert_called_once_with('fakeinstanceid', 'P', 1, 'fake_proto')
        mock_gel.assert_called_once_with('fakeinstanceid', 'fake_proto')

    def test_delete_share(self):
        mock_ds = self.mock_object(
            instorage.InStorageAssistant, 'delete_share'
        )

        self.driver.delete_share(self._ctxt, self.share_instance)

        mock_ds.assert_called_once_with('fakeinstanceid', 'fake_proto')

    def test_extend_share(self):
        mock_es = self.mock_object(
            instorage.InStorageAssistant, 'extend_share'
        )

        self.driver.extend_share(self.share_instance, 3)

        mock_es.assert_called_once_with('fakeinstanceid', 3)

    def test_ensure_share(self):
        mock_gel = self._patch_assistant(
            'get_export_locations', return_value=['fake_export_location']
        )

        ret = self.driver.ensure_share(self._ctxt, self.share_instance)

        self.assertEqual(['fake_export_location'], ret)
        mock_gel.assert_called_once_with('fakeinstanceid', 'fake_proto')

    def test_update_access(self):
        mock_ua = self.mock_object(
            instorage.InStorageAssistant, 'update_access'
        )

        self.driver.update_access(
            self._ctxt, self.share_instance, [], [], [], [])

        mock_ua.assert_called_once_with(
            'fakeinstanceid', 'fake_proto', [], [], []
        )
class FakeSSH:
    """Context-manager stand-in for an SSH connection object."""

    def __enter__(self):
        """Return this object as the value bound by ``with ... as``."""
        return self

    def __exit__(self, exec_type, exec_val, exec_tb):
        """Never suppress an exception raised inside the ``with`` body.

        Returning a false value makes the ``with`` statement re-raise the
        in-flight exception itself. The earlier bare ``raise`` only worked
        while an exception was actively being handled and failed with
        RuntimeError when ``__exit__`` was called directly.
        """
        return False
class FakeSSHPool:
    """Stand-in for an SSH pool that always hands out the same object."""

    def __init__(self, ssh):
        # Object returned, unchanged, by every item() call.
        self.fakessh = ssh

    def item(self):
        """Return the object given to the constructor."""
        return self.fakessh
class SSHRunnerTestCase(test.TestCase):
    """Tests for cli_helper.SSHRunner: ``__call__`` and ``_ssh_execute``."""

    def setUp(self):
        self.fakessh = FakeSSH()
        self.fakePool = FakeSSHPool(self.fakessh)
        super().setUp()

    @staticmethod
    def _new_runner():
        """Build the SSHRunner used by every test in this class."""
        return cli_helper.SSHRunner(
            '127.0.0.1', '22', 'fakeuser', 'fakepassword'
        )

    @staticmethod
    def _assert_pool_created(mock_sshpool):
        """Check that SSHPool was built once with the expected arguments."""
        mock_sshpool.assert_called_once_with(
            '127.0.0.1', '22', 60, 'fakeuser',
            password='fakepassword',
            privatekey=None,
            min_size=1,
            max_size=10
        )

    def test___call___success(self):
        mock_csi = self.mock_object(manila_utils, 'check_ssh_injection')
        mock_sshpool = mock.Mock(return_value=self.fakePool)
        self.mock_object(ssh_utils, 'SSHPool', mock_sshpool)
        mock_se = mock.Mock(return_value='fake_value')
        self.mock_object(cli_helper.SSHRunner, '_ssh_execute', mock_se)

        runner = self._new_runner()
        ret = runner(['mcsinq', 'lsvdisk'])

        mock_csi.assert_called_once_with(['mcsinq', 'lsvdisk'])
        self._assert_pool_created(mock_sshpool)
        mock_se.assert_called_once_with(
            self.fakePool,
            'mcsinq lsvdisk',
            True,
            1
        )
        self.assertEqual('fake_value', ret)

    def test___call___ssh_pool_failed(self):
        mock_csi = self.mock_object(manila_utils, 'check_ssh_injection')
        mock_sshpool = mock.Mock(side_effect=paramiko.SSHException())
        self.mock_object(ssh_utils, 'SSHPool', mock_sshpool)

        runner = self._new_runner()

        self.assertRaises(paramiko.SSHException, runner, ['mcsinq', 'lsvdisk'])
        mock_csi.assert_called_once_with(['mcsinq', 'lsvdisk'])

    def test___call___ssh_exec_failed(self):
        mock_csi = self.mock_object(manila_utils, 'check_ssh_injection')
        mock_sshpool = mock.Mock(return_value=self.fakePool)
        self.mock_object(ssh_utils, 'SSHPool', mock_sshpool)
        # Named so that it does not shadow the imported ``exception`` module.
        exec_error = processutils.ProcessExecutionError()
        mock_se = mock.Mock(side_effect=exec_error)
        self.mock_object(cli_helper.SSHRunner, '_ssh_execute', mock_se)

        runner = self._new_runner()

        self.assertRaises(
            processutils.ProcessExecutionError,
            runner,
            ['mcsinq', 'lsvdisk']
        )
        mock_csi.assert_called_once_with(['mcsinq', 'lsvdisk'])
        self._assert_pool_created(mock_sshpool)

    def test__ssh_execute_success(self):
        mock_se = mock.Mock(return_value='fake_value')
        self.mock_object(processutils, 'ssh_execute', mock_se)

        runner = self._new_runner()
        ret = runner._ssh_execute(self.fakePool, 'mcsinq lsvdisk')

        mock_se.assert_called_once_with(
            self.fakessh,
            'mcsinq lsvdisk',
            check_exit_code=True
        )
        self.assertEqual('fake_value', ret)

    @mock.patch('time.sleep')
    def test__ssh_execute_success_run_again(self, mock_sleep):
        # First attempt raises, second attempt returns a value.
        mock_se = mock.Mock(side_effect=[Exception(), 'fake_value'])
        self.mock_object(processutils, 'ssh_execute', mock_se)

        runner = self._new_runner()
        ret = runner._ssh_execute(
            self.fakePool,
            'mcsinq lsvdisk',
            check_exit_code=True,
            attempts=2
        )

        attempt = mock.call(
            self.fakessh, 'mcsinq lsvdisk', check_exit_code=True
        )
        mock_se.assert_has_calls([attempt, attempt])
        mock_sleep.assert_called_once_with(1)
        self.assertEqual('fake_value', ret)

    @mock.patch('time.sleep')
    def test__ssh_execute_failed_exec_failed(self, mock_sleep):
        # Named so that it does not shadow the imported ``exception`` module.
        exec_error = Exception()
        exec_error.exit_code = '1'
        exec_error.stdout = 'fake_stdout'
        exec_error.stderr = 'fake_stderr'
        exec_error.cmd = 'fake_cmd_list'
        mock_se = mock.Mock(side_effect=exec_error)
        self.mock_object(processutils, 'ssh_execute', mock_se)

        runner = self._new_runner()

        self.assertRaises(
            processutils.ProcessExecutionError,
            runner._ssh_execute,
            self.fakePool,
            'mcsinq lsvdisk',
            check_exit_code=True,
            attempts=1
        )
        mock_se.assert_called_once_with(
            self.fakessh,
            'mcsinq lsvdisk',
            check_exit_code=True
        )
        mock_sleep.assert_called_once_with(1)

    @mock.patch('time.sleep')
    def test__ssh_execute_failed_exec_failed_exception_error(self, mock_sleep):
        # A plain Exception without exit_code/stdout/stderr/cmd attributes.
        mock_se = mock.Mock(side_effect=Exception())
        self.mock_object(processutils, 'ssh_execute', mock_se)

        runner = self._new_runner()

        self.assertRaises(
            processutils.ProcessExecutionError,
            runner._ssh_execute,
            self.fakePool,
            'mcsinq lsvdisk',
            check_exit_code=True,
            attempts=1
        )
        mock_se.assert_called_once_with(
            self.fakessh,
            'mcsinq lsvdisk',
            check_exit_code=True
        )
        mock_sleep.assert_called_once_with(1)
class CLIParserTestCase(test.TestCase):
    """Tests for cli_helper.CLIParser in both of its input layouts."""

    def test_cliparser_with_header(self):
        # Layout: one header row, then one delimited row per record.
        cmdlist = ['mcsinq', 'lsnasportip', '-delim', '!']
        response = '\n'.join([
            'head1!head2',
            'r1c1!r1c2',
            'r2c1!r2c2'
        ])

        ret = cli_helper.CLIParser(
            response, cmdlist, delim='!', with_header=True
        )

        self.assertEqual(2, len(ret))
        self.assertEqual('r1c1', ret[0]['head1'])
        self.assertEqual('r1c2', ret[0]['head2'])
        self.assertEqual('r2c1', ret[1]['head1'])
        self.assertEqual('r2c2', ret[1]['head2'])

        # The result is also iterable, record by record.
        pairs = [(row['head1'], row['head2']) for row in ret]
        self.assertEqual([('r1c1', 'r1c2'), ('r2c1', 'r2c2')], pairs)

    def test_cliparser_without_header(self):
        # Layout: "key!value" lines, records separated by an empty line.
        cmdlist = ['mcsinq', 'lsnasportip', '-delim', '!']
        response = '\n'.join([
            'head1!p1v1',
            'head2!p1v2',
            '',
            'head1!p2v1',
            'head2!p2v2'
        ])

        ret = cli_helper.CLIParser(
            response, cmdlist, delim='!', with_header=False
        )

        self.assertEqual(2, len(ret))
        self.assertEqual('p1v1', ret[0]['head1'])
        self.assertEqual('p1v2', ret[0]['head2'])
        self.assertEqual('p2v1', ret[1]['head1'])
        self.assertEqual('p2v2', ret[1]['head2'])
@ddt.ddt
class InStorageSSHTestCase(test.TestCase):
    """Tests for cli_helper.InStorageSSH: command lists and parsed output."""

    def setUp(self):
        self.sshMock = mock.Mock()
        self.ssh = cli_helper.InStorageSSH(self.sshMock)
        super().setUp()

    def _respond(self, *rows):
        """Make the mocked runner return ``rows`` as stdout, empty stderr.

        Called with no rows this yields ``('', '')``.
        """
        self.sshMock.return_value = ('\n'.join(rows), '')

    @ddt.data(None, 'node1')
    def test_lsnode(self, node_id):
        if node_id:
            cmd = ['mcsinq', 'lsnode', '-delim', '!', node_id]
            self._respond('id!1', 'name!node1')
        else:
            cmd = ['mcsinq', 'lsnode', '-delim', '!']
            self._respond('id!name', '1!node1', '2!node2')

        ret = self.ssh.lsnode(node_id)

        self.sshMock.assert_called_once_with(cmd)
        self.assertEqual('node1', ret[0]['name'])
        if not node_id:
            self.assertEqual('node2', ret[1]['name'])

    @ddt.data(None, 'Pool0')
    def test_lsnaspool(self, pool_id):
        rows = ['pool_name!available_capacity', 'Pool0!2GB']
        cmd = ['mcsinq', 'lsnaspool', '-delim', '!']
        if pool_id is None:
            rows.append('Pool1!3GB')
        else:
            cmd.append(pool_id)
        self._respond(*rows)

        ret = self.ssh.lsnaspool(pool_id)

        self.sshMock.assert_called_once_with(cmd)
        self.assertEqual('Pool0', ret[0]['pool_name'])
        self.assertEqual('2GB', ret[0]['available_capacity'])
        if pool_id is None:
            self.assertEqual('Pool1', ret[1]['pool_name'])
            self.assertEqual('3GB', ret[1]['available_capacity'])

    @ddt.data({'node_name': 'node1', 'fsname': 'fs1'},
              {'node_name': 'node1', 'fsname': None},
              {'node_name': None, 'fsname': 'fs1'},
              {'node_name': None, 'fsname': None})
    @ddt.unpack
    def test_lsfs(self, node_name, fsname):
        self._respond(
            'pool_name!fs_name!total_capacity!used_capacity',
            'pool0!fs0!10GB!1GB',
            'pool1!fs1!8GB!3GB'
        )

        if fsname and not node_name:
            # A filesystem name without a node name is rejected.
            self.assertRaises(exception.InvalidParameterValue,
                              self.ssh.lsfs,
                              node_name=node_name,
                              fsname=fsname)
            return

        ret = self.ssh.lsfs(node_name, fsname)

        cmdlist = ['mcsinq', 'lsfs', '-delim', '!']
        if node_name and fsname:
            cmdlist += ['-node', '"node1"', '-name', '"fs1"']
        elif node_name:
            cmdlist += ['-node', '"node1"']
        else:
            cmdlist.append('-all')

        self.sshMock.assert_called_once_with(cmdlist)
        self.assertEqual('pool0', ret[0]['pool_name'])
        self.assertEqual('fs0', ret[0]['fs_name'])
        self.assertEqual('10GB', ret[0]['total_capacity'])
        self.assertEqual('1GB', ret[0]['used_capacity'])
        self.assertEqual('pool1', ret[1]['pool_name'])
        self.assertEqual('fs1', ret[1]['fs_name'])
        self.assertEqual('8GB', ret[1]['total_capacity'])
        self.assertEqual('3GB', ret[1]['used_capacity'])

    def test_addfs(self):
        self._respond()

        self.ssh.addfs('fsname', 'fake_pool', 1, 'node1')

        cmdlist = ['mcsop', 'addfs', '-name', '"fsname"',
                   '-pool', '"fake_pool"', '-size', '1g', '-node', '"node1"']
        self.sshMock.assert_called_once_with(cmdlist)

    def test_rmfs(self):
        self._respond()

        self.ssh.rmfs('fsname')

        cmdlist = ['mcsop', 'rmfs', '-name', '"fsname"']
        self.sshMock.assert_called_once_with(cmdlist)

    def test_expandfs(self):
        self._respond()

        self.ssh.expandfs('fsname', 2)

        cmdlist = ['mcsop', 'expandfs', '-name', '"fsname"', '-size', '2g']
        self.sshMock.assert_called_once_with(cmdlist)

    def test_lsnasdir(self):
        self._respond('parent_dir!name', '/fs/test_01!share_01')

        ret = self.ssh.lsnasdir('/fs/test_01')

        cmdlist = ['mcsinq', 'lsnasdir', '-delim', '!', '"/fs/test_01"']
        self.sshMock.assert_called_once_with(cmdlist)
        self.assertEqual('/fs/test_01', ret[0]['parent_dir'])
        self.assertEqual('share_01', ret[0]['name'])

    def test_addnasdir(self):
        self._respond()

        self.ssh.addnasdir('/fs/test_01/share_01')

        cmdlist = ['mcsop', 'addnasdir', '"/fs/test_01/share_01"']
        self.sshMock.assert_called_once_with(cmdlist)

    def test_chnasdir(self):
        self._respond()

        self.ssh.chnasdir('/fs/test_01/share_01', '/fs/test_01/share_02')

        cmdlist = ['mcsop', 'chnasdir', '-oldpath', '"/fs/test_01/share_01"',
                   '-newpath', '"/fs/test_01/share_02"']
        self.sshMock.assert_called_once_with(cmdlist)

    def test_rmnasdir(self):
        self._respond()

        self.ssh.rmnasdir('/fs/test_01/share_01')

        cmdlist = ['mcsop', 'rmnasdir', '"/fs/test_01/share_01"']
        self.sshMock.assert_called_once_with(cmdlist)

    def test_rmnfs(self):
        self._respond()

        self.ssh.rmnfs('/fs/test_01/share_01')

        cmdlist = ['mcsop', 'rmnfs', '"/fs/test_01/share_01"']
        self.sshMock.assert_called_once_with(cmdlist)

    @ddt.data(None, '/fs/test_01')
    def test_lsnfslist(self, prefix):
        cmdlist = ['mcsinq', 'lsnfslist', '-delim', '!']
        if prefix:
            cmdlist.append('"/fs/test_01"')
        self._respond(
            'path',
            '/fs/test_01/share_01',
            '/fs/test_01/share_02'
        )

        ret = self.ssh.lsnfslist(prefix)

        self.sshMock.assert_called_once_with(cmdlist)
        self.assertEqual('/fs/test_01/share_01', ret[0]['path'])
        self.assertEqual('/fs/test_01/share_02', ret[1]['path'])

    def test_lsnfsinfo(self):
        cmdlist = [
            'mcsinq', 'lsnfsinfo', '-delim', '!', '"/fs/test_01/share_01"'
        ]
        self._respond(
            'ip!mask!rights!root_squash!all_squash',
            '192.168.1.0!255.255.255.0!rw!root_squash!all_squash'
        )

        ret = self.ssh.lsnfsinfo('/fs/test_01/share_01')

        self.sshMock.assert_called_once_with(cmdlist)
        self.assertEqual('192.168.1.0', ret[0]['ip'])
        self.assertEqual('255.255.255.0', ret[0]['mask'])
        self.assertEqual('rw', ret[0]['rights'])

    def test_addnfsclient(self):
        self._respond()
        cmdlist = [
            'mcsop', 'addnfsclient', '-path', '"/fs/test_01/share_01"',
            '-client', '192.168.1.0/255.255.255.0:rw:ALL_SQUASH:ROOT_SQUASH'
        ]

        self.ssh.addnfsclient(
            '/fs/test_01/share_01',
            '192.168.1.0/255.255.255.0:rw:ALL_SQUASH:ROOT_SQUASH'
        )

        self.sshMock.assert_called_once_with(cmdlist)

    def test_chnfsclient(self):
        self._respond()
        cmdlist = [
            'mcsop', 'chnfsclient', '-path', '"/fs/test_01/share_01"',
            '-client', '192.168.1.0/255.255.255.0:rw:ALL_SQUASH:ROOT_SQUASH'
        ]

        self.ssh.chnfsclient(
            '/fs/test_01/share_01',
            '192.168.1.0/255.255.255.0:rw:ALL_SQUASH:ROOT_SQUASH'
        )

        self.sshMock.assert_called_once_with(cmdlist)

    def test_rmnfsclient(self):
        self._respond()
        # Only the address/mask part of the client spec is expected in the
        # issued command; the rights suffix passed below is not.
        cmdlist = [
            'mcsop', 'rmnfsclient', '-path', '"/fs/test_01/share_01"',
            '-client', '192.168.1.0/255.255.255.0'
        ]

        self.ssh.rmnfsclient(
            '/fs/test_01/share_01',
            '192.168.1.0/255.255.255.0:rw:ALL_SQUASH:ROOT_SQUASH'
        )

        self.sshMock.assert_called_once_with(cmdlist)

    @ddt.data(None, 'cifs')
    def test_lscifslist(self, name_filter):
        cmdlist = ['mcsinq', 'lscifslist', '-delim', '!']
        if name_filter:
            cmdlist.append('"%s"' % name_filter)
        self._respond('name!path', 'cifs!/fs/test_01/share_01')

        ret = self.ssh.lscifslist(name_filter)

        self.sshMock.assert_called_once_with(cmdlist)
        self.assertEqual('cifs', ret[0]['name'])
        self.assertEqual('/fs/test_01/share_01', ret[0]['path'])

    def test_lscifsinfo(self):
        cmdlist = ['mcsinq', 'lscifsinfo', '-delim', '!', '"cifs"']
        self._respond(
            'path!oplocks!type!name!rights',
            '/fs/test_01/share_01!on!LU!user1!rw'
        )

        ret = self.ssh.lscifsinfo('cifs')

        self.sshMock.assert_called_once_with(cmdlist)
        self.assertEqual('/fs/test_01/share_01', ret[0]['path'])
        self.assertEqual('on', ret[0]['oplocks'])
        self.assertEqual('LU', ret[0]['type'])
        self.assertEqual('user1', ret[0]['name'])
        self.assertEqual('rw', ret[0]['rights'])

    def test_addcifs(self):
        self._respond()
        cmdlist = [
            'mcsop', 'addcifs', '-name', 'cifs',
            '-path', '/fs/test_01/share_01', '-oplocks', 'off'
        ]

        self.ssh.addcifs('cifs', '/fs/test_01/share_01', 'off')

        self.sshMock.assert_called_once_with(cmdlist)

    def test_rmcifs(self):
        self._respond()
        cmdlist = ['mcsop', 'rmcifs', 'cifs']

        self.ssh.rmcifs('cifs')

        self.sshMock.assert_called_once_with(cmdlist)

    def test_chcifs(self):
        self._respond()
        cmdlist = ['mcsop', 'chcifs', '-name', 'cifs', '-oplocks', 'off']

        self.ssh.chcifs('cifs', 'off')

        self.sshMock.assert_called_once_with(cmdlist)

    def test_addcifsuser(self):
        self._respond()
        cmdlist = [
            'mcsop', 'addcifsuser', '-name', 'cifs', '-rights', 'LU:user1:rw'
        ]

        self.ssh.addcifsuser('cifs', 'LU:user1:rw')

        self.sshMock.assert_called_once_with(cmdlist)

    def test_chcifsuser(self):
        self._respond()
        cmdlist = [
            'mcsop', 'chcifsuser', '-name', 'cifs', '-rights', 'LU:user1:rw'
        ]

        self.ssh.chcifsuser('cifs', 'LU:user1:rw')

        self.sshMock.assert_called_once_with(cmdlist)

    def test_rmcifsuser(self):
        self._respond()
        # The trailing ':rw' of the rights spec is not expected in the
        # issued command.
        cmdlist = [
            'mcsop', 'rmcifsuser', '-name', 'cifs', '-rights', 'LU:user1'
        ]

        self.ssh.rmcifsuser('cifs', 'LU:user1:rw')

        self.sshMock.assert_called_once_with(cmdlist)

    def test_lsnasportip(self):
        cmdlist = ['mcsinq', 'lsnasportip', '-delim', '!']
        self._respond(
            'node_name!id!ip!mask!gw!link_state',
            'node1!1!192.168.10.1!255.255.255.0!192.168.10.254!active',
            'node2!1!192.168.10.2!255.255.255.0!192.168.10.254!inactive'
        )

        ret = self.ssh.lsnasportip()

        self.sshMock.assert_called_once_with(cmdlist)
        self.assertEqual('node1', ret[0]['node_name'])
        self.assertEqual('1', ret[0]['id'])
        self.assertEqual('192.168.10.1', ret[0]['ip'])
        self.assertEqual('255.255.255.0', ret[0]['mask'])
        self.assertEqual('192.168.10.254', ret[0]['gw'])
        self.assertEqual('active', ret[0]['link_state'])
        self.assertEqual('node2', ret[1]['node_name'])
        self.assertEqual('1', ret[1]['id'])
        self.assertEqual('192.168.10.2', ret[1]['ip'])
        self.assertEqual('255.255.255.0', ret[1]['mask'])
        self.assertEqual('192.168.10.254', ret[1]['gw'])
        self.assertEqual('inactive', ret[1]['link_state'])
855@ddt.ddt
856class InStorageAssistantTestCase(test.TestCase):
857 def setUp(self):
858 self.sshMock = mock.Mock()
859 self.assistant = instorage.InStorageAssistant(self.sshMock)
860 super(InStorageAssistantTestCase, self).setUp()
862 def tearDown(self):
863 super(InStorageAssistantTestCase, self).tearDown()
865 @ddt.data(
866 {'size': '1000MB', 'gb_size': 1},
867 {'size': '3GB', 'gb_size': 3},
868 {'size': '4TB', 'gb_size': 4096},
869 {'size': '5PB', 'gb_size': 5242880})
870 @ddt.unpack
871 def test_size_to_gb(self, size, gb_size):
872 ret = self.assistant.size_to_gb(size)
873 self.assertEqual(gb_size, ret)
875 def test_get_available_pools(self):
876 response_for_lsnaspool = ('\n'.join([
877 'pool_name!available_capacity',
878 'pool0!100GB',
879 'pool1!150GB'
880 ]), '')
881 cmdlist = ['mcsinq', 'lsnaspool', '-delim', '!']
882 self.sshMock.return_value = response_for_lsnaspool
884 ret = self.assistant.get_available_pools()
886 pools = ['pool0', 'pool1']
887 self.assertEqual(pools, ret)
888 self.sshMock.assert_called_once_with(cmdlist)
890 def test_get_pools_attr(self):
891 response_for_lsfs = ('\n'.join([
892 'pool_name!fs_name!total_capacity!used_capacity',
893 'pool0!fs0!10GB!1GB',
894 'pool1!fs1!8GB!3GB'
895 ]), '')
896 call_for_lsfs = mock.call(['mcsinq', 'lsfs', '-delim', '!', '-all'])
897 response_for_lsnaspool = ('\n'.join([
898 'pool_name!available_capacity',
899 'pool0!100GB',
900 'pool1!150GB'
901 ]), '')
902 call_for_lsnaspool = mock.call(['mcsinq', 'lsnaspool', '-delim', '!'])
903 self.sshMock.side_effect = [
904 response_for_lsfs,
905 response_for_lsnaspool
906 ]
908 ret = self.assistant.get_pools_attr(['pool0'])
909 pools = {
910 'pool0': {
911 'pool_name': 'pool0',
912 'total_capacity_gb': 110,
913 'free_capacity_gb': 100,
914 'allocated_capacity_gb': 10,
915 'qos': False,
916 'reserved_percentage': 0,
917 'reserved_snapshot_percentage': 0,
918 'reserved_share_extend_percentage': 0,
919 'dedupe': False,
920 'compression': False,
921 'thin_provisioning': False,
922 'max_over_subscription_ratio': 0
923 }
924 }
925 self.assertEqual(pools, ret)
926 self.sshMock.assert_has_calls([call_for_lsfs, call_for_lsnaspool])
928 def test_get_nodes_info(self):
929 response_for_lsnasportip = ('\n'.join([
930 'node_name!id!ip!mask!gw!link_state',
931 'node1!1!192.168.10.1!255.255.255.0!192.168.10.254!active',
932 'node2!1!192.168.10.2!255.255.255.0!192.168.10.254!inactive',
933 'node1!2!!!!inactive',
934 'node2!2!!!!inactive'
935 ]), '')
936 call_for_lsnasportip = mock.call([
937 'mcsinq', 'lsnasportip', '-delim', '!'
938 ])
939 self.sshMock.side_effect = [response_for_lsnasportip]
941 ret = self.assistant.get_nodes_info()
942 nodes = {
943 'node1': {
944 '1': {
945 'node_name': 'node1',
946 'id': '1',
947 'ip': '192.168.10.1',
948 'mask': '255.255.255.0',
949 'gw': '192.168.10.254',
950 'link_state': 'active'
951 }
952 },
953 'node2': {
954 '1': {
955 'node_name': 'node2',
956 'id': '1',
957 'ip': '192.168.10.2',
958 'mask': '255.255.255.0',
959 'gw': '192.168.10.254',
960 'link_state': 'inactive'
961 }
962 }
963 }
964 self.assertEqual(nodes, ret)
965 self.sshMock.assert_has_calls([call_for_lsnasportip])
967 @ddt.data(
968 {'name': '1' * 30, 'fsname': '1' * 30},
969 {'name': '1' * 40, 'fsname': '1' * 32})
970 @ddt.unpack
971 def test_get_fsname_by_name(self, name, fsname):
972 ret = self.assistant.get_fsname_by_name(name)
974 self.assertEqual(fsname, ret)
976 @ddt.data(
977 {'name': '1' * 30, 'dirname': '1' * 30},
978 {'name': '1' * 40, 'dirname': '1' * 32})
979 @ddt.unpack
980 def test_get_dirsname_by_name(self, name, dirname):
981 ret = self.assistant.get_dirname_by_name(name)
983 self.assertEqual(dirname, ret)
985 @ddt.data(
986 {'name': '1' * 30, 'dirpath': '/fs/' + '1' * 30 + '/' + '1' * 30},
987 {'name': '1' * 40, 'dirpath': '/fs/' + '1' * 32 + '/' + '1' * 32})
988 @ddt.unpack
989 def test_get_dirpath_by_name(self, name, dirpath):
990 ret = self.assistant.get_dirpath_by_name(name)
992 self.assertEqual(dirpath, ret)
994 @ddt.data('CIFS', 'NFS')
995 def test_create_share(self, proto):
996 response_for_lsnasportip = ('\n'.join([
997 'node_name!id!ip!mask!gw!link_state',
998 'node1!1!192.168.10.1!255.255.255.0!192.168.10.254!active'
999 ]), '')
1000 call_for_lsnasportip = mock.call([
1001 'mcsinq', 'lsnasportip', '-delim', '!'
1002 ])
1003 response_for_addfs = ('', '')
1004 call_for_addfs = mock.call([
1005 'mcsop', 'addfs', '-name', '"fakename"', '-pool', '"fakepool"',
1006 '-size', '10g', '-node', '"node1"'
1007 ])
1008 response_for_addnasdir = ('', '')
1009 call_for_addnasdir = mock.call([
1010 'mcsop', 'addnasdir', '"/fs/fakename/fakename"'
1011 ])
1012 response_for_addcifs = ('', '')
1013 call_for_addcifs = mock.call([
1014 'mcsop', 'addcifs', '-name', 'fakename',
1015 '-path', '/fs/fakename/fakename', '-oplocks', 'off'
1016 ])
1018 side_effect = [
1019 response_for_lsnasportip,
1020 response_for_addfs,
1021 response_for_addnasdir
1022 ]
1023 calls = [call_for_lsnasportip, call_for_addfs, call_for_addnasdir]
1024 if proto == 'CIFS':
1025 side_effect.append(response_for_addcifs)
1026 calls.append(call_for_addcifs)
1027 self.sshMock.side_effect = side_effect
1029 self.assistant.create_share('fakename', 'fakepool', 10, proto)
1031 self.sshMock.assert_has_calls(calls)
1033 @ddt.data(True, False)
1034 def test_check_share_exist(self, exist):
1035 response_for_lsfs = ('\n'.join([
1036 'pool_name!fs_name!total_capacity!used_capacity',
1037 'pool0!fs0!10GB!1GB',
1038 'pool1!fs1!8GB!3GB'
1039 ]), '')
1040 call_for_lsfs = mock.call([
1041 'mcsinq', 'lsfs', '-delim', '!', '-all'
1042 ])
1043 self.sshMock.side_effect = [
1044 response_for_lsfs
1045 ]
1047 share_name = 'fs0' if exist else 'fs2'
1049 ret = self.assistant.check_share_exist(share_name)
1051 self.assertEqual(exist, ret)
1052 self.sshMock.assert_has_calls([call_for_lsfs])
@ddt.data({'proto': 'CIFS', 'share_exist': False},
          {'proto': 'CIFS', 'share_exist': True},
          {'proto': 'NFS', 'share_exist': False},
          {'proto': 'NFS', 'share_exist': True})
@ddt.unpack
def test_delete_share(self, proto, share_exist):
    # The existence probe is stubbed; backend commands are only expected
    # when the probe says the share exists.
    exist_probe = self.mock_object(
        instorage.InStorageAssistant,
        'check_share_exist',
        mock.Mock(return_value=share_exist)
    )

    # Commands in their expected order. A CIFS share additionally has
    # its CIFS export removed before the directory and filesystem.
    expected_cmds = [
        ['mcsop', 'rmnasdir', '"/fs/fakename/fakename"'],
        ['mcsop', 'rmfs', '-name', '"fakename"']
    ]
    if proto == 'CIFS':
        expected_cmds = [['mcsop', 'rmcifs', 'fakename']] + expected_cmds

    # Each queued command answers with a pair of empty strings.
    self.sshMock.side_effect = [('', '') for _ in expected_cmds]

    self.assistant.delete_share('fakename', proto)

    exist_probe.assert_called_once_with('fakename')
    if not share_exist:
        self.sshMock.assert_not_called()
    else:
        self.sshMock.assert_has_calls(
            [mock.call(cmd) for cmd in expected_cmds]
        )
def test_extend_share(self):
    # fs0 is listed at 10GB; asking for 12 is expected to result in an
    # expandfs command carrying the 2g difference.
    lsfs_output = '\n'.join([
        'pool_name!fs_name!total_capacity!used_capacity',
        'pool0!fs0!10GB!1GB',
        'pool1!fs1!8GB!3GB'
    ])
    expected_cmds = [
        ['mcsinq', 'lsfs', '-delim', '!', '-all'],
        ['mcsop', 'expandfs', '-name', '"fs0"', '-size', '2g']
    ]
    self.sshMock.side_effect = [(lsfs_output, ''), ('', '')]

    self.assistant.extend_share('fs0', 12)

    self.sshMock.assert_has_calls(
        [mock.call(cmd) for cmd in expected_cmds]
    )
@ddt.data('CIFS', 'NFS')
def test_get_export_locations(self, proto):
    # Canned backend answers, paired with the command expected to
    # trigger each of them, in call order.
    exchanges = [
        (
            ['mcsinq', 'lsnode', '-delim', '!'],
            [
                'id!name',
                '1!node1',
                '2!node2'
            ]
        ),
        (
            ['mcsinq', 'lsfs', '-delim', '!', '-node', '"node1"'],
            [
                'pool_name!fs_name!total_capacity!used_capacity',
                'pool0!fs0!10GB!1GB'
            ]
        ),
        (
            ['mcsinq', 'lsfs', '-delim', '!', '-node', '"node2"'],
            [
                'pool_name!fs_name!total_capacity!used_capacity',
                'pool1!fs1!10GB!1GB'
            ]
        ),
        (
            ['mcsinq', 'lsnasportip', '-delim', '!'],
            [
                'node_name!id!ip!mask!gw!link_state',
                'node1!1!192.168.10.1!255.255.255.0!192.168.10.254!active',
                'node1!2!192.168.10.2!255.255.255.0!192.168.10.254!active',
                'node1!3!!!!inactive',
                'node2!1!192.168.10.3!255.255.255.0!192.168.10.254!active',
                'node2!2!192.168.10.4!255.255.255.0!192.168.10.254!active',
                'node2!3!!!!inactive'
            ]
        )
    ]
    self.sshMock.side_effect = [
        ('\n'.join(rows), '') for _, rows in exchanges
    ]
    expected_calls = [mock.call(cmd) for cmd, _ in exchanges]

    # fs1 only appears in node2's listing, so only node2's two
    # addressed ports are expected in the result.
    if proto == 'CIFS':
        expected_paths = [
            '\\\\192.168.10.3\\fs1',
            '\\\\192.168.10.4\\fs1'
        ]
    else:
        expected_paths = [
            '192.168.10.3:/fs/fs1/fs1',
            '192.168.10.4:/fs/fs1/fs1'
        ]
    expected_locations = [
        {'path': path, 'is_admin_only': False, 'metadata': {}}
        for path in expected_paths
    ]

    actual_locations = self.assistant.get_export_locations('fs1', proto)

    self.assertEqual(expected_locations, actual_locations)
    self.sshMock.assert_has_calls(expected_calls)
def test_classify_nfs_client_spec_has_nfsinfo(self):
    # The backend currently exports the path to 192.168.1.0 and
    # 192.168.2.0; the requested spec names 192.168.2.0 and 192.168.3.0.
    # The classification must therefore add .3.0 and delete .1.0.
    response_for_lsnfslist = ('\n'.join([
        'path',
        '/fs/fs01/fs01'
    ]), '')
    call_for_lsnfslist = mock.call([
        'mcsinq', 'lsnfslist', '-delim', '!', '"/fs/fs01/fs01"'
    ])
    response_for_lsnfsinfo = ('\n'.join([
        'ip!mask!rights!all_squash!root_squash',
        '192.168.1.0!255.255.255.0!rw!all_squash!root_squash',
        '192.168.2.0!255.255.255.0!rw!all_squash!root_squash'
    ]), '')
    call_for_lsnfsinfo = mock.call([
        'mcsinq', 'lsnfsinfo', '-delim', '!', '"/fs/fs01/fs01"'
    ])
    self.sshMock.side_effect = [
        response_for_lsnfslist, response_for_lsnfsinfo
    ]
    calls = [call_for_lsnfslist, call_for_lsnfsinfo]

    client_spec = [
        '192.168.2.0/255.255.255.0:rw:all_squash:root_squash',
        '192.168.3.0/255.255.255.0:rw:all_squash:root_squash'
    ]
    add_spec, del_spec = self.assistant.classify_nfs_client_spec(
        client_spec, '/fs/fs01/fs01'
    )

    # Expected value goes first, matching the other tests in this file,
    # so a failure labels the reference and actual values correctly.
    self.assertEqual(
        ['192.168.3.0/255.255.255.0:rw:all_squash:root_squash'], add_spec
    )
    self.assertEqual(
        ['192.168.1.0/255.255.255.0:rw:all_squash:root_squash'], del_spec
    )
    self.sshMock.assert_has_calls(calls)
def test_classify_nfs_client_spec_has_no_nfsinfo(self):
    # With an empty lsnfslist answer, every requested spec must be
    # classified as an addition and nothing as a deletion, after exactly
    # one backend command.
    self.sshMock.return_value = ('', '')
    requested_spec = [
        '192.168.2.0/255.255.255.0:rw:all_squash:root_squash',
    ]

    to_add, to_delete = self.assistant.classify_nfs_client_spec(
        requested_spec, '/fs/fs01/fs01'
    )

    self.assertEqual(requested_spec, to_add)
    self.assertEqual([], to_delete)
    self.sshMock.assert_called_once_with(
        ['mcsinq', 'lsnfslist', '-delim', '!', '"/fs/fs01/fs01"']
    )
def test_access_rule_to_client_spec(self):
    # A read-write IP rule in CIDR form must be rendered as an
    # address/netmask client spec with the squash options appended.
    ip_rule = dict(
        access_type='ip',
        access_to='192.168.10.0/24',
        access_level='rw'
    )
    expected_spec = '192.168.10.0/255.255.255.0:rw:all_squash:root_squash'

    actual_spec = self.assistant.access_rule_to_client_spec(ip_rule)

    self.assertEqual(expected_spec, actual_spec)
def test_access_rule_to_client_spec_type_failed(self):
    # A 'user' rule cannot be turned into an NFS client spec and must be
    # rejected with ShareBackendException.
    user_rule = dict(
        access_type='user',
        access_to='test01',
        access_level='rw'
    )
    convert = self.assistant.access_rule_to_client_spec

    self.assertRaises(exception.ShareBackendException, convert, user_rule)
def test_access_rule_to_client_spec_ipversion_failed(self):
    # An IPv6 network as the rule target must be rejected with
    # ShareBackendException.
    ipv6_rule = dict(
        access_type='ip',
        access_to='2001:db8::/64',
        access_level='rw'
    )
    convert = self.assistant.access_rule_to_client_spec

    self.assertRaises(exception.ShareBackendException, convert, ipv6_rule)
@ddt.data(True, False)
def test_update_nfs_access(self, check_del_add):
    # Covers both invocation styles: explicit add/delete rule lists
    # (check_del_add=True) and a full access-rule list
    # (check_del_add=False). Either way one rmnfsclient and one
    # addnfsclient command are expected.
    def ip_rw_rule(network):
        return {
            'access_type': 'ip',
            'access_to': network,
            'access_level': 'rw'
        }

    expected_calls = [
        mock.call(
            ['mcsop', 'rmnfsclient', '-path', '"/fs/fs01/fs01"', '-client',
             '192.168.1.0/255.255.255.0']
        ),
        mock.call(
            ['mcsop', 'addnfsclient', '-path', '"/fs/fs01/fs01"', '-client',
             '192.168.3.0/255.255.255.0:rw:all_squash:root_squash']
        )
    ]

    # The classification step is stubbed to a fixed (add, delete) answer.
    classify_mock = mock.Mock(return_value=(
        ['192.168.3.0/255.255.255.0:rw:all_squash:root_squash'],
        ['192.168.1.0/255.255.255.0:rw:all_squash:root_squash']
    ))
    self.mock_object(
        self.assistant, 'classify_nfs_client_spec', classify_mock
    )
    self.sshMock.side_effect = [('', ''), ('', '')]

    if check_del_add:
        rules_to_add = [ip_rw_rule('192.168.3.0/24')]
        rules_to_delete = [
            ip_rw_rule('192.168.1.0/24'),
            ip_rw_rule('192.168.4.0/24')
        ]
        self.assistant.update_nfs_access(
            'fs01', [], rules_to_add, rules_to_delete
        )
        classify_mock.assert_called_once_with(
            [], '/fs/fs01/fs01'
        )
    else:
        all_rules = [
            ip_rw_rule('192.168.2.0/24'),
            ip_rw_rule('192.168.3.0/24')
        ]
        self.assistant.update_nfs_access('fs01', all_rules, [], [])
        classify_mock.assert_called_once_with(
            [
                '192.168.2.0/255.255.255.0:rw:all_squash:root_squash',
                '192.168.3.0/255.255.255.0:rw:all_squash:root_squash'
            ],
            '/fs/fs01/fs01'
        )

    self.sshMock.assert_has_calls(expected_calls)
def test_classify_cifs_rights(self):
    # The backend reports user1 and user2; the request names user2 and
    # user3. So user3 must be classified for addition and user1 for
    # deletion.
    lscifsinfo_rows = [
        'path!oplocks!type!name!rights',
        '/fs/fs01/fs01!on!LU!user1!rw',
        '/fs/fs01/fs01!on!LU!user2!rw'
    ]
    self.sshMock.return_value = ('\n'.join(lscifsinfo_rows), '')
    requested_rights = [
        'LU:user2:rw',
        'LU:user3:rw'
    ]

    to_add, to_delete = self.assistant.classify_cifs_rights(
        requested_rights, 'fs01'
    )

    self.sshMock.assert_called_once_with(
        ['mcsinq', 'lscifsinfo', '-delim', '!', '"fs01"']
    )
    self.assertEqual(['LU:user3:rw'], to_add)
    self.assertEqual(['LU:user1:rw'], to_delete)
def test_access_rule_to_rights(self):
    # A read-write 'user' rule must be rendered as 'LU:<name>:<level>'.
    user_rule = dict(
        access_type='user',
        access_to='test01',
        access_level='rw'
    )

    rights = self.assistant.access_rule_to_rights(user_rule)

    self.assertEqual('LU:test01:rw', rights)
def test_access_rule_to_rights_fail_type(self):
    # An 'ip' rule cannot be turned into CIFS rights and must be
    # rejected with ShareBackendException.
    ip_rule = dict(
        access_type='ip',
        access_to='192.168.1.0/24',
        access_level='rw'
    )
    convert = self.assistant.access_rule_to_rights

    self.assertRaises(exception.ShareBackendException, convert, ip_rule)
@ddt.data(True, False)
def test_update_cifs_access(self, check_del_add):
    # Covers both invocation styles: explicit add/delete rule lists
    # (check_del_add=True) and a full access-rule list
    # (check_del_add=False). Either way one rmcifsuser and one
    # addcifsuser command are expected.
    def user_rw_rule(user_name):
        return {
            'access_type': 'user',
            'access_to': user_name,
            'access_level': 'rw'
        }

    expected_calls = [
        mock.call(
            ['mcsop', 'rmcifsuser', '-name', 'fs01', '-rights', 'LU:user1']
        ),
        mock.call(
            ['mcsop', 'addcifsuser', '-name', 'fs01', '-rights', 'LU:user3:rw']
        )
    ]

    # The classification step is stubbed to a fixed (add, delete) answer.
    classify_mock = mock.Mock(
        return_value=(['LU:user3:rw'], ['LU:user1:rw'])
    )
    self.mock_object(self.assistant, 'classify_cifs_rights', classify_mock)
    self.sshMock.side_effect = [('', None), ('', None)]

    if check_del_add:
        rules_to_add = [user_rw_rule('user3')]
        rules_to_delete = [user_rw_rule('user1')]
        self.assistant.update_cifs_access(
            'fs01', [], rules_to_add, rules_to_delete
        )
    else:
        all_rules = [user_rw_rule('user2'), user_rw_rule('user3')]
        self.assistant.update_cifs_access('fs01', all_rules, [], [])
        # Only the full-list style is checked against the classifier.
        classify_mock.assert_called_once_with(
            ['LU:user2:rw', 'LU:user3:rw'], 'fs01'
        )

    self.sshMock.assert_has_calls(expected_calls)
def test_check_access_type(self):
    # check_access_type must be truthy when every supplied rule has the
    # requested type, and falsy when a rule of another type is mixed in.
    def rw_rule(access_type, access_to):
        return {
            'access_type': access_type,
            'access_to': access_to,
            'access_level': 'rw'
        }

    ip_rule_a = rw_rule('ip', '192.168.1.0/24')
    ip_rule_b = rw_rule('ip', '192.168.2.0/24')
    user_rule_a = rw_rule('user', 'user1')
    user_rule_b = rw_rule('user', 'user2')

    # (requested type, first rule list, second rule list, expected verdict)
    scenarios = [
        ('ip', [ip_rule_a], [ip_rule_b], True),
        ('user', [user_rule_a], [user_rule_b], True),
        ('ip', [ip_rule_a], [user_rule_a], False),
        ('user', [user_rule_a], [ip_rule_a], False)
    ]
    for access_type, first, second, should_match in scenarios:
        verdict = self.assistant.check_access_type(
            access_type, first, second
        )
        if should_match:
            self.assertTrue(verdict)
        else:
            self.assertFalse(verdict)
@ddt.data(
    {'proto': 'CIFS', 'ret': True},
    {'proto': 'CIFS', 'ret': False},
    {'proto': 'NFS', 'ret': True},
    {'proto': 'NFS', 'ret': False},
    {'proto': 'unknown', 'ret': True})
@ddt.unpack
def test_update_access(self, proto, ret):
    # update_access must dispatch to the protocol-specific updater,
    # reject unknown protocols before checking access types, and raise
    # InvalidShareAccess when the access-type check fails.
    cifs_updater = self.mock_object(
        self.assistant, 'update_cifs_access', mock.Mock()
    )
    nfs_updater = self.mock_object(
        self.assistant, 'update_nfs_access', mock.Mock()
    )
    type_check = self.mock_object(
        self.assistant, 'check_access_type', mock.Mock(return_value=ret)
    )
    call_args = ('fs01', proto, [], [], [])

    if proto == 'unknown':
        self.assertRaises(
            exception.ShareBackendException,
            self.assistant.update_access,
            *call_args
        )
        type_check.assert_not_called()
        return

    if ret is False:
        self.assertRaises(
            exception.InvalidShareAccess,
            self.assistant.update_access,
            *call_args
        )
        type_check.assert_called_once()
        return

    self.assistant.update_access(*call_args)

    if proto == 'CIFS':
        cifs_updater.assert_called_once_with('fs01', [], [], [])
        nfs_updater.assert_not_called()
    else:
        nfs_updater.assert_called_once_with('fs01', [], [], [])
        cifs_updater.assert_not_called()
    type_check.assert_called_once()