Coverage for manila/tests/share/drivers/infortrend/test_infortrend_nas.py: 100%
242 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2019 Infortrend Technology, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from unittest import mock
18import ddt
19from oslo_config import cfg
21from manila import context
22from manila import exception
23from manila.share import configuration
24from manila.share.drivers.infortrend import driver
25from manila.share.drivers.infortrend import infortrend_nas
26from manila import test
27from manila.tests.share.drivers.infortrend import fake_infortrend_manila_data
28from manila.tests.share.drivers.infortrend import fake_infortrend_nas_data
# Global oslo.config object; setUp() installs the Infortrend defaults on it.
CONF = cfg.CONF

# Canned ``_execute`` reply used by the mocks in this module: rc 0, no data.
SUCCEED = (0, [])
35@ddt.ddt
36class InfortrendNASDriverTestCase(test.TestCase):
def __init__(self, *args, **kwargs):
    """Create the admin context and the fake data providers.

    All positional and keyword arguments are forwarded unchanged to the
    parent ``TestCase`` constructor.
    """
    super(InfortrendNASDriverTestCase, self).__init__(*args, **kwargs)
    # Context handed to every driver entry point that takes one.
    self._ctxt = context.get_admin_context()
    # Fake share objects (manila side) and fake CLI output (NAS side).
    self.m_data = fake_infortrend_manila_data.InfortrendManilaTestData()
    self.nas_data = fake_infortrend_nas_data.InfortrendNASTestData()
def setUp(self):
    """Install the Infortrend option defaults, then run the base setUp."""
    # Applied in this exact order, before the parent setUp() is invoked.
    option_defaults = (
        ('driver_handles_share_servers', False),
        ('infortrend_nas_ip', '172.27.1.1'),
        ('infortrend_nas_user', 'fake_user'),
        ('infortrend_nas_password', 'fake_password'),
        ('infortrend_nas_ssh_key', 'fake_sshkey'),
        ('infortrend_share_pools', 'share-pool-01'),
        ('infortrend_share_channels', '0,1'),
    )
    for option_name, option_value in option_defaults:
        CONF.set_default(option_name, option_value)

    self.fake_conf = configuration.Configuration(None)
    super(InfortrendNASDriverTestCase, self).setUp()
def _get_driver(self, fake_conf, init_dict=False):
    """Instantiate the driver under test and keep handles on the instance.

    :param fake_conf: configuration object passed to the driver.
    :param init_dict: when true, also fill ``pool_dict`` and
        ``channel_dict`` on the NAS backend object with fixed fake values.
    """
    self._driver = driver.InfortrendNASDriver(configuration=fake_conf)
    self._iftnas = self._driver.ift_nas
    self.pool_id = ['6541BAFB2E6C57B6']
    self.pool_path = ['/share-pool-01/LV-1/']

    if not init_dict:
        return

    pool_entry = {'id': self.pool_id[0], 'path': self.pool_path[0]}
    self._iftnas.pool_dict = {'share-pool-01': pool_entry}

    channel_ips = self.nas_data.fake_channel_ip
    self._iftnas.channel_dict = {'0': channel_ips[0], '1': channel_ips[1]}
def test_no_login_ssh_key_and_pass(self):
    """Driver creation fails when both password and SSH key are None."""
    for credential_option in ('infortrend_nas_password',
                              'infortrend_nas_ssh_key'):
        self.fake_conf.set_default(credential_option, None)

    self.assertRaises(
        exception.InvalidParameterValue, self._get_driver, self.fake_conf)
def test_parser_with_service_status(self):
    """``_parser`` yields rc 0 and the expected NFS service record."""
    self._get_driver(self.fake_conf)
    nfs_record = {
        'displayName': 'NFS',
        'state_time': '2017-05-04 14:19:53',
        'enabled': True,
        'cpu_rate': '0.0',
        'mem_rate': '0.0',
        'state': 'exited',
        'type': 'share',
    }
    expect_service_status = [{'A': {'NFS': nfs_record}}]

    raw_status = self.nas_data.fake_service_status_data
    rc, service_status = self._iftnas._parser(raw_status)

    self.assertEqual(0, rc)
    self.assertDictListMatch(expect_service_status, service_status)
def test_parser_with_folder_status(self):
    """``_parser`` yields rc 0 and both expected folder records."""
    self._get_driver(self.fake_conf)
    # Fields that carry the same value in both expected records.
    shared_fields = {
        'utility': '1.00',
        'share': False,
        'worm': '',
        'fsType': 'xfs',
        'owner': 'A',
        'readOnly': False,
        'mounted': True,
    }
    first_folder = dict(
        shared_fields,
        used='33886208',
        subshare=True,
        free='321931374592',
        modifyTime='2017-04-27 16:16',
        # Pool path with its trailing slash dropped.
        directory=self.pool_path[0][:-1],
        volumeId=self.pool_id[0],
        size='321965260800',
    )
    second_folder = dict(
        shared_fields,
        used='33779712',
        subshare=False,
        free='107287973888',
        modifyTime='2017-04-27 15:45',
        directory='/share-pool-02/LV-1',
        volumeId='147A8FB67DA39914',
        size='107321753600',
    )
    expect_folder_status = [first_folder, second_folder]

    raw_status = self.nas_data.fake_folder_status_data
    rc, folder_status = self._iftnas._parser(raw_status)

    self.assertEqual(0, rc)
    self.assertDictListMatch(expect_folder_status, folder_status)
def test_ensure_service_on(self):
    """The last command issued by ``_ensure_service_on`` restarts nfs."""
    self._get_driver(self.fake_conf)
    nfs_off_reply = (0, self.nas_data.fake_nfs_status_off)
    execute_stub = mock.Mock(side_effect=[nfs_off_reply, SUCCEED])
    self._iftnas._execute = execute_stub

    self._iftnas._ensure_service_on('nfs')

    execute_stub.assert_called_with(['service', 'restart', 'nfs'])
def test_check_channels_status(self):
    """``_check_channels_status`` fills ``channel_dict`` for channels 0/1."""
    self._get_driver(self.fake_conf)
    channel_ips = self.nas_data.fake_channel_ip
    expect_channel_dict = {'0': channel_ips[0], '1': channel_ips[1]}
    channel_reply = (0, self.nas_data.fake_get_channel_status())
    self._iftnas._execute = mock.Mock(return_value=channel_reply)

    self._iftnas._check_channels_status()

    self.assertDictEqual(expect_channel_dict, self._iftnas.channel_dict)
@mock.patch.object(infortrend_nas.LOG, 'warning')
def test_channel_status_down(self, log_warning):
    """A 'DOWN' channel status produces exactly one warning log call."""
    self._get_driver(self.fake_conf)
    down_reply = (0, self.nas_data.fake_get_channel_status('DOWN'))
    self._iftnas._execute = mock.Mock(return_value=down_reply)

    self._iftnas._check_channels_status()

    self.assertEqual(1, log_warning.call_count)
@mock.patch.object(infortrend_nas.LOG, 'error')
def test_invalid_channel(self, log_error):
    """Configuring channels '0, 6' makes the channel check raise."""
    # LOG.error is patched out for the test; the mock itself is not
    # inspected afterwards.
    self.fake_conf.set_default('infortrend_share_channels', '0, 6')
    self._get_driver(self.fake_conf)
    channel_reply = (0, self.nas_data.fake_get_channel_status())
    self._iftnas._execute = mock.Mock(return_value=channel_reply)

    self.assertRaises(
        exception.InfortrendNASException, self._iftnas._check_channels_status)
def test_check_pools_setup(self):
    """``_check_pools_setup`` fills ``pool_dict`` for 'share-pool-01'."""
    self._get_driver(self.fake_conf)
    pool_entry = {'id': self.pool_id[0], 'path': self.pool_path[0]}
    expect_pool_dict = {'share-pool-01': pool_entry}
    folder_reply = (0, self.nas_data.fake_folder_status)
    self._iftnas._execute = mock.Mock(return_value=folder_reply)

    self._iftnas._check_pools_setup()

    self.assertDictEqual(expect_pool_dict, self._iftnas.pool_dict)
def test_unknow_pools_setup(self):
    """Configuring pools 'chengwei, share-pool-01' makes the check raise."""
    configured_pools = 'chengwei, share-pool-01'
    self.fake_conf.set_default('infortrend_share_pools', configured_pools)
    self._get_driver(self.fake_conf)
    folder_reply = (0, self.nas_data.fake_folder_status)
    self._iftnas._execute = mock.Mock(return_value=folder_reply)

    self.assertRaises(
        exception.InfortrendNASException, self._iftnas._check_pools_setup)
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_get_pool_quota_used(self, mock_execute):
    """``_get_pool_quota_used`` issues ``fquota status`` and returns bytes.

    With the fake fquota data the expected value is 201466179584.
    """
    self._get_driver(self.fake_conf, init_dict=True)
    mock_execute.return_value = (0, self.nas_data.fake_fquota_status)

    pool_quota = self._iftnas._get_pool_quota_used('share-pool-01')

    expected_command = [
        'fquota', 'status', self.pool_id[0], 'LV-1', '-t', 'folder']
    mock_execute.assert_called_with(expected_command)
    self.assertEqual(201466179584, pool_quota)
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_create_share_nfs(self, mock_execute):
    """``create_share`` returns one NFS export location per channel IP."""
    self._get_driver(self.fake_conf, init_dict=True)
    fake_share_name = self.m_data.fake_share_nfs['id'].replace('-', '')
    export_suffix = ':/share-pool-01/LV-1/' + fake_share_name
    expect_locations = [
        self.nas_data.fake_channel_ip[idx] + export_suffix
        for idx in (0, 1)
    ]
    mock_execute.side_effect = [
        SUCCEED,  # create folder
        SUCCEED,  # set size
        (0, self.nas_data.fake_get_share_status_nfs()),  # check proto
        SUCCEED,  # enable proto
        (0, self.nas_data.fake_get_channel_status()),  # update channel
    ]

    locations = self._driver.create_share(
        self._ctxt, self.m_data.fake_share_nfs)

    self.assertEqual(expect_locations, locations)
    enable_nfs_command = [
        'share', self.pool_path[0] + fake_share_name, 'nfs', 'on']
    mock_execute.assert_any_call(enable_nfs_command)
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_create_share_cifs(self, mock_execute):
    """``create_share`` returns one CIFS export location per channel IP."""
    self._get_driver(self.fake_conf, init_dict=True)
    fake_share_name = self.m_data.fake_share_cifs['id'].replace('-', '')
    expect_locations = [
        '\\\\' + self.nas_data.fake_channel_ip[idx] + '\\' + fake_share_name
        for idx in (0, 1)
    ]
    mock_execute.side_effect = [
        SUCCEED,  # create folder
        SUCCEED,  # set size
        (0, self.nas_data.fake_get_share_status_cifs()),  # check proto
        SUCCEED,  # enable proto
        (0, self.nas_data.fake_get_channel_status()),  # update channel
    ]

    locations = self._driver.create_share(
        self._ctxt, self.m_data.fake_share_cifs)

    self.assertEqual(expect_locations, locations)
    enable_cifs_command = [
        'share', self.pool_path[0] + fake_share_name,
        'cifs', 'on', '-n', fake_share_name,
    ]
    mock_execute.assert_any_call(enable_cifs_command)
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_delete_share_nfs(self, mock_execute):
    """``delete_share`` on the NFS share issues ``folder options ... -d``."""
    self._get_driver(self.fake_conf, init_dict=True)
    fake_share_name = self.m_data.fake_share_nfs['id'].replace('-', '')
    mock_execute.side_effect = [
        (0, self.nas_data.fake_subfolder_data),  # pagelist folder
        SUCCEED,  # delete folder
    ]

    self._driver.delete_share(self._ctxt, self.m_data.fake_share_nfs)

    delete_command = [
        'folder', 'options', self.pool_id[0], 'LV-1', '-d', fake_share_name]
    mock_execute.assert_any_call(delete_command)
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_delete_share_cifs(self, mock_execute):
    """``delete_share`` on the CIFS share issues ``folder options ... -d``."""
    self._get_driver(self.fake_conf, init_dict=True)
    fake_share_name = self.m_data.fake_share_cifs['id'].replace('-', '')
    mock_execute.side_effect = [
        (0, self.nas_data.fake_subfolder_data),  # pagelist folder
        SUCCEED,  # delete folder
    ]

    self._driver.delete_share(self._ctxt, self.m_data.fake_share_cifs)

    delete_command = [
        'folder', 'options', self.pool_id[0], 'LV-1', '-d', fake_share_name]
    mock_execute.assert_any_call(delete_command)
@mock.patch.object(infortrend_nas.LOG, 'warning')
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_delete_non_exist_share(self, mock_execute, log_warning):
    """Deleting the non-existent fake share logs exactly one warning."""
    self._get_driver(self.fake_conf, init_dict=True)
    # Only the folder listing is answered; no further reply is queued.
    mock_execute.side_effect = [
        (0, self.nas_data.fake_subfolder_data),  # pagelist folder
    ]

    missing_share = self.m_data.fake_non_exist_share
    self._driver.delete_share(self._ctxt, missing_share)

    self.assertEqual(1, log_warning.call_count)
def test_get_pool(self):
    """``get_pool`` returns 'share-pool-01' for the fake NFS share."""
    self._get_driver(self.fake_conf, init_dict=True)

    pool_name = self._driver.get_pool(self.m_data.fake_share_nfs)

    self.assertEqual('share-pool-01', pool_name)
def test_get_pool_without_host(self):
    """``get_pool`` returns 'share-pool-01' for the share with no host."""
    self._get_driver(self.fake_conf, init_dict=True)
    subfolder_reply = (0, self.nas_data.fake_subfolder_data)
    self._iftnas._execute = mock.Mock(return_value=subfolder_reply)

    pool_name = self._driver.get_pool(self.m_data.fake_share_cifs_no_host)

    self.assertEqual('share-pool-01', pool_name)
def test_ensure_share_nfs(self):
    """``ensure_share`` returns one NFS export location per channel IP."""
    self._get_driver(self.fake_conf, init_dict=True)
    share_name = self.m_data.fake_share_nfs['id'].replace('-', '')
    share_path = self.pool_path[0] + share_name
    expect_locations = [
        self.nas_data.fake_channel_ip[idx] + ':' + share_path
        for idx in (0, 1)
    ]
    channel_reply = (0, self.nas_data.fake_get_channel_status())
    self._iftnas._execute = mock.Mock(return_value=channel_reply)

    locations = self._driver.ensure_share(
        self._ctxt, self.m_data.fake_share_nfs)

    self.assertEqual(expect_locations, locations)
def test_ensure_share_cifs(self):
    """``ensure_share`` returns one CIFS export location per channel IP."""
    self._get_driver(self.fake_conf, init_dict=True)
    share_name = self.m_data.fake_share_cifs['id'].replace('-', '')
    expect_locations = [
        '\\\\' + self.nas_data.fake_channel_ip[idx] + '\\' + share_name
        for idx in (0, 1)
    ]
    channel_reply = (0, self.nas_data.fake_get_channel_status())
    self._iftnas._execute = mock.Mock(return_value=channel_reply)

    locations = self._driver.ensure_share(
        self._ctxt, self.m_data.fake_share_cifs)

    self.assertEqual(expect_locations, locations)
def test_extend_share(self):
    """``extend_share`` to 100 issues a single ``fquota create ... 100G``."""
    self._get_driver(self.fake_conf, init_dict=True)
    share_name = self.m_data.fake_share_nfs['id'].replace('-', '')
    execute_stub = mock.Mock(return_value=SUCCEED)
    self._iftnas._execute = execute_stub

    self._driver.extend_share(self.m_data.fake_share_nfs, 100)

    expected_command = [
        'fquota', 'create', self.pool_id[0], 'LV-1',
        share_name, '100G', '-t', 'folder',
    ]
    execute_stub.assert_called_once_with(expected_command)
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_shrink_share(self, mock_execute):
    """``shrink_share`` to 10 checks usage, then sets a 10G quota."""
    self._get_driver(self.fake_conf, init_dict=True)
    share_name = self.m_data.fake_share_nfs['id'].replace('-', '')
    mock_execute.side_effect = [
        (0, self.nas_data.fake_fquota_status),  # check used
        SUCCEED,
    ]

    self._driver.shrink_share(self.m_data.fake_share_nfs, 10)

    status_command = [
        'fquota', 'status', self.pool_id[0], 'LV-1', '-t', 'folder']
    resize_command = [
        'fquota', 'create', self.pool_id[0],
        'LV-1', share_name, '10G', '-t', 'folder',
    ]
    mock_execute.assert_has_calls(
        [mock.call(status_command), mock.call(resize_command)])
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_shrink_share_smaller_than_used_size(self, mock_execute):
    """Shrinking the fake CIFS share to 10 raises a data-loss error."""
    self._get_driver(self.fake_conf, init_dict=True)
    # Only the usage query is answered; no resize reply is queued.
    mock_execute.side_effect = [
        (0, self.nas_data.fake_fquota_status),  # check used
    ]

    self.assertRaises(
        exception.ShareShrinkingPossibleDataLoss,
        self._driver.shrink_share, self.m_data.fake_share_cifs, 10)
def test_get_share_size(self):
    """``_get_share_size`` reports 87.63 for 'test-folder-02'."""
    self._get_driver(self.fake_conf, init_dict=True)
    fquota_reply = (0, self.nas_data.fake_fquota_status)
    self._iftnas._execute = mock.Mock(return_value=fquota_reply)

    share_size = self._iftnas._get_share_size('', '', 'test-folder-02')

    self.assertEqual(87.63, share_size)
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_manage_existing_nfs(self, mock_execute):
    """``manage_existing`` on the NFS share returns size and locations.

    Also verifies the sequence of commands sent through ``_execute``.
    """
    self._get_driver(self.fake_conf, init_dict=True)
    share_name = (
        self.m_data.fake_share_for_manage_nfs['id'].replace('-', ''))
    origin_share_path = self.pool_path[0] + 'test-folder'
    export_share_path = self.pool_path[0] + share_name
    expect_result = {
        'size': 20.0,
        'export_locations': [
            self.nas_data.fake_channel_ip[idx] + ':' + export_share_path
            for idx in (0, 1)
        ],
    }
    mock_execute.side_effect = [
        (0, self.nas_data.fake_subfolder_data),  # pagelist folder
        (0, self.nas_data.fake_get_share_status_nfs()),  # check proto
        SUCCEED,  # enable nfs
        (0, self.nas_data.fake_fquota_status),  # get share size
        SUCCEED,  # rename share
        (0, self.nas_data.fake_get_channel_status()),  # update channel
    ]

    result = self._driver.manage_existing(
        self.m_data.fake_share_for_manage_nfs, {})

    self.assertEqual(expect_result, result)
    expected_commands = [
        ['pagelist', 'folder', self.pool_path[0]],
        ['share', 'status', '-f', origin_share_path],
        ['share', origin_share_path, 'nfs', 'on'],
        ['fquota', 'status', self.pool_id[0],
         origin_share_path.split('/')[3], '-t', 'folder'],
        ['folder', 'options', self.pool_id[0],
         'LV-1', '-k', 'test-folder', share_name],
        ['ifconfig', 'inet', 'show'],
    ]
    mock_execute.assert_has_calls(
        [mock.call(command) for command in expected_commands])
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_manage_existing_cifs(self, mock_execute):
    """``manage_existing`` on the CIFS share returns size and locations.

    Also verifies the sequence of commands sent through ``_execute``.
    """
    self._get_driver(self.fake_conf, init_dict=True)
    share_name = (
        self.m_data.fake_share_for_manage_cifs['id'].replace('-', ''))
    origin_share_path = self.pool_path[0] + 'test-folder-02'
    expect_result = {
        'size': 87.63,
        'export_locations': [
            '\\\\' + self.nas_data.fake_channel_ip[idx] + '\\' + share_name
            for idx in (0, 1)
        ],
    }
    mock_execute.side_effect = [
        (0, self.nas_data.fake_subfolder_data),  # pagelist folder
        (0, self.nas_data.fake_get_share_status_cifs()),  # check proto
        SUCCEED,  # enable cifs
        (0, self.nas_data.fake_fquota_status),  # get share size
        SUCCEED,  # rename share
        (0, self.nas_data.fake_get_channel_status()),  # update channel
    ]

    result = self._driver.manage_existing(
        self.m_data.fake_share_for_manage_cifs, {})

    self.assertEqual(expect_result, result)
    expected_commands = [
        ['pagelist', 'folder', self.pool_path[0]],
        ['share', 'status', '-f', origin_share_path],
        ['share', origin_share_path, 'cifs', 'on', '-n', share_name],
        ['fquota', 'status', self.pool_id[0],
         origin_share_path.split('/')[3], '-t', 'folder'],
        ['folder', 'options', self.pool_id[0],
         'LV-1', '-k', 'test-folder-02', share_name],
        ['ifconfig', 'inet', 'show'],
    ]
    mock_execute.assert_has_calls(
        [mock.call(command) for command in expected_commands])
def test_manage_existing_with_no_location(self):
    """``manage_existing`` raises for a share built with an empty path."""
    self._get_driver(self.fake_conf, init_dict=True)
    share_without_location = self.m_data._get_fake_share_for_manage('')

    self.assertRaises(
        exception.InfortrendNASException,
        self._driver.manage_existing, share_without_location, {})
@ddt.data('172.27.1.1:/share-pool-01/LV-1/test-folder',
          '172.27.112.223:/share-pool-01/LV-1/some-folder')
def test_manage_existing_wrong_ip_or_name(self, fake_share_path):
    """``manage_existing`` raises for each of the two supplied paths."""
    self._get_driver(self.fake_conf, init_dict=True)
    share_to_manage = self.m_data._get_fake_share_for_manage(fake_share_path)
    subfolder_reply = (0, self.nas_data.fake_subfolder_data)
    self._iftnas._execute = mock.Mock(return_value=subfolder_reply)

    self.assertRaises(
        exception.InfortrendNASException,
        self._driver.manage_existing, share_to_manage, {})
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_manage_existing_with_no_size_setting(self, mock_execute):
    """``manage_existing`` raises when the fquota reply has no settings."""
    self._get_driver(self.fake_conf, init_dict=True)
    no_quota_reply = (0, self.nas_data.fake_fquota_status_with_no_settings)
    mock_execute.side_effect = [
        (0, self.nas_data.fake_subfolder_data),  # pagelist folder
        (0, self.nas_data.fake_get_share_status_nfs()),  # check proto
        SUCCEED,  # enable nfs
        no_quota_reply,
    ]

    self.assertRaises(
        exception.InfortrendNASException,
        self._driver.manage_existing,
        self.m_data.fake_share_for_manage_nfs, {})
@ddt.data('NFS', 'CIFS')
@mock.patch.object(infortrend_nas.InfortrendNAS, '_execute')
def test_unmanage(self, protocol, mock_execute):
    """``unmanage`` issues exactly one ``pagelist folder`` command."""
    if protocol == 'NFS':
        share_to_unmanage = self.m_data.fake_share_nfs
    else:
        share_to_unmanage = self.m_data.fake_share_cifs
    self._get_driver(self.fake_conf, init_dict=True)
    mock_execute.side_effect = [
        (0, self.nas_data.fake_subfolder_data),  # pagelist folder
    ]

    self._driver.unmanage(share_to_unmanage)

    listing_command = ['pagelist', 'folder', self.pool_path[0]]
    mock_execute.assert_called_once_with(listing_command)
@mock.patch.object(infortrend_nas.LOG, 'warning')
def test_unmanage_share_not_exist(self, log_warning):
    """Unmanaging the manage-NFS fake share logs exactly one warning."""
    self._get_driver(self.fake_conf, init_dict=True)
    subfolder_reply = (0, self.nas_data.fake_subfolder_data)
    self._iftnas._execute = mock.Mock(return_value=subfolder_reply)

    self._driver.unmanage(self.m_data.fake_share_for_manage_nfs)

    self.assertEqual(1, log_warning.call_count)