Coverage for manila/tests/share/drivers/qnap/test_qnap.py: 99%
654 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016 QNAP Systems, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from http import client as http_client
17import time
18from unittest import mock
20import ddt
21from defusedxml import ElementTree as etree
22from eventlet import greenthread
23from oslo_config import cfg
25from manila import exception
26from manila.share.drivers.qnap import api
27from manila.share.drivers.qnap import qnap
28from manila.share import share_types
29from manila import test
30from manila.tests import fake_share
31from manila.tests.share.drivers.qnap import fakes
34CONF = cfg.CONF
37def create_configuration(management_url, qnap_share_ip, qnap_nas_login,
38 qnap_nas_password, qnap_poolname):
39 """Create configuration."""
40 configuration = mock.Mock()
41 configuration.qnap_management_url = management_url
42 configuration.qnap_share_ip = qnap_share_ip
43 configuration.qnap_nas_login = qnap_nas_login
44 configuration.qnap_nas_password = qnap_nas_password
45 configuration.qnap_poolname = qnap_poolname
46 configuration.safe_get.return_value = False
47 return configuration
50class QnapShareDriverBaseTestCase(test.TestCase):
51 """Base Class for the QnapShareDriver Tests."""
53 def setUp(self):
54 """Setup the Qnap Driver Base TestCase."""
55 super(QnapShareDriverBaseTestCase, self).setUp()
56 self.driver = None
57 self.share_api = None
59 def _do_setup(self, management_url, share_ip, nas_login,
60 nas_password, poolname, **kwargs):
61 """Config do setup configurations."""
62 self.driver = qnap.QnapShareDriver(
63 configuration=create_configuration(
64 management_url,
65 share_ip,
66 nas_login,
67 nas_password,
68 poolname),
69 private_storage=kwargs.get('private_storage'))
70 self.driver.do_setup('context')
73@ddt.ddt
74class QnapShareDriverLoginTestCase(QnapShareDriverBaseTestCase):
75 """Tests do_setup api."""
77 def setUp(self):
78 """Setup the Qnap Share Driver login TestCase."""
79 super(QnapShareDriverLoginTestCase, self).setUp()
80 self.mock_object(http_client, 'HTTPConnection')
81 self.mock_object(http_client, 'HTTPSConnection')
83 @ddt.unpack
84 @ddt.data({'mng_url': 'http://1.2.3.4:8080', 'port': '8080', 'ssl': False},
85 {'mng_url': 'https://1.2.3.4:443', 'port': '443', 'ssl': True})
86 def test_do_setup_positive(self, mng_url, port, ssl):
87 """Test do_setup with http://1.2.3.4:8080."""
88 fake_login_response = fakes.FakeLoginResponse()
89 fake_get_basic_info_response_es = (
90 fakes.FakeGetBasicInfoResponseEs_1_1_3())
91 if ssl:
92 mock_connection = http_client.HTTPSConnection
93 else:
94 mock_connection = http_client.HTTPConnection
95 mock_connection.return_value.getresponse.side_effect = [
96 fake_login_response,
97 fake_get_basic_info_response_es,
98 fake_login_response]
100 self._do_setup(mng_url, '1.2.3.4', 'admin',
101 'qnapadmin', 'Storage Pool 1')
103 self.assertEqual(
104 mng_url,
105 self.driver.configuration.qnap_management_url)
106 self.assertEqual(
107 '1.2.3.4', self.driver.configuration.qnap_share_ip)
108 self.assertEqual(
109 'admin', self.driver.configuration.qnap_nas_login)
110 self.assertEqual(
111 'qnapadmin', self.driver.configuration.qnap_nas_password)
112 self.assertEqual(
113 'Storage Pool 1', self.driver.configuration.qnap_poolname)
114 self.assertEqual('fakeSid', self.driver.api_executor.sid)
115 self.assertEqual('admin', self.driver.api_executor.username)
116 self.assertEqual('qnapadmin', self.driver.api_executor.password)
117 self.assertEqual('1.2.3.4', self.driver.api_executor.ip)
118 self.assertEqual(port, self.driver.api_executor.port)
119 self.assertEqual(ssl, self.driver.api_executor.ssl)
121 @ddt.data(fakes.FakeGetBasicInfoResponseTs_4_3_0(),
122 fakes.FakeGetBasicInfoResponseTesTs_4_3_0(),
123 fakes.FakeGetBasicInfoResponseTesEs_1_1_3())
124 def test_do_setup_positive_with_diff_nas(self, fake_basic_info):
125 """Test do_setup with different NAS model."""
126 fake_login_response = fakes.FakeLoginResponse()
127 mock_connection = http_client.HTTPSConnection
128 mock_connection.return_value.getresponse.side_effect = [
129 fake_login_response,
130 fake_basic_info,
131 fake_login_response]
133 self._do_setup('https://1.2.3.4:443', '1.2.3.4', 'admin',
134 'qnapadmin', 'Storage Pool 1')
136 self.assertEqual('fakeSid', self.driver.api_executor.sid)
137 self.assertEqual('admin', self.driver.api_executor.username)
138 self.assertEqual('qnapadmin', self.driver.api_executor.password)
139 self.assertEqual('1.2.3.4', self.driver.api_executor.ip)
140 self.assertEqual('443', self.driver.api_executor.port)
141 self.assertTrue(self.driver.api_executor.ssl)
143 @ddt.data({
144 'fake_basic_info': fakes.FakeGetBasicInfoResponseTs_4_3_0(),
145 'expect_result': api.QnapAPIExecutorTS
146 }, {
147 'fake_basic_info': fakes.FakeGetBasicInfoResponseTesTs_4_3_0(),
148 'expect_result': api.QnapAPIExecutorTS
149 }, {
150 'fake_basic_info': fakes.FakeGetBasicInfoResponseTesEs_1_1_3(),
151 'expect_result': api.QnapAPIExecutor
152 }, {
153 'fake_basic_info': fakes.FakeGetBasicInfoResponseTesEs_2_0_0(),
154 'expect_result': api.QnapAPIExecutor
155 }, {
156 'fake_basic_info': fakes.FakeGetBasicInfoResponseTesEs_2_1_0(),
157 'expect_result': api.QnapAPIExecutor
158 }, {
159 'fake_basic_info': fakes.FakeGetBasicInfoResponseEs_1_1_3(),
160 'expect_result': api.QnapAPIExecutor
161 }, {
162 'fake_basic_info': fakes.FakeGetBasicInfoResponseEs_2_0_0(),
163 'expect_result': api.QnapAPIExecutor
164 }, {
165 'fake_basic_info': fakes.FakeGetBasicInfoResponseEs_2_1_0(),
166 'expect_result': api.QnapAPIExecutor
167 })
168 @ddt.unpack
169 def test_create_api_executor(self, fake_basic_info, expect_result):
170 """Test do_setup with different NAS model."""
171 fake_login_response = fakes.FakeLoginResponse()
172 mock_connection = http_client.HTTPSConnection
173 mock_connection.return_value.getresponse.side_effect = [
174 fake_login_response,
175 fake_basic_info,
176 fake_login_response]
177 self._do_setup('https://1.2.3.4:443', '1.2.3.4', 'admin',
178 'qnapadmin', 'Storage Pool 1')
179 self.assertIsInstance(self.driver.api_executor, expect_result)
181 @ddt.data({
182 'fake_basic_info': fakes.FakeGetBasicInfoResponseTs_4_0_0(),
183 'expect_result': exception.ShareBackendException
184 }, {
185 'fake_basic_info': fakes.FakeGetBasicInfoResponseTesTs_4_0_0(),
186 'expect_result': exception.ShareBackendException
187 }, {
188 'fake_basic_info': fakes.FakeGetBasicInfoResponseTesEs_1_1_1(),
189 'expect_result': exception.ShareBackendException
190 }, {
191 'fake_basic_info': fakes.FakeGetBasicInfoResponseTesEs_2_2_0(),
192 'expect_result': exception.ShareBackendException
193 }, {
194 'fake_basic_info': fakes.FakeGetBasicInfoResponseEs_1_1_1(),
195 'expect_result': exception.ShareBackendException
196 }, {
197 'fake_basic_info': fakes.FakeGetBasicInfoResponseEs_2_2_0(),
198 'expect_result': exception.ShareBackendException
199 })
200 @ddt.unpack
201 def test_create_api_executor_negative(self,
202 fake_basic_info, expect_result):
203 """Test do_setup with different NAS model."""
204 fake_login_response = fakes.FakeLoginResponse()
205 mock_connection = http_client.HTTPSConnection
206 mock_connection.return_value.getresponse.side_effect = [
207 fake_login_response,
208 fake_basic_info,
209 fake_login_response]
210 self.assertRaises(
211 exception.ShareBackendException,
212 self._do_setup,
213 'https://1.2.3.4:443',
214 '1.2.3.4',
215 'admin',
216 'qnapadmin',
217 'Storage Pool 1')
219 def test_do_setup_with_exception(self):
220 """Test do_setup with exception."""
221 fake_login_response = fakes.FakeLoginResponse()
222 fake_get_basic_info_response_error = (
223 fakes.FakeGetBasicInfoResponseError())
224 mock_connection = http_client.HTTPSConnection
225 mock_connection.return_value.getresponse.side_effect = [
226 fake_login_response,
227 fake_get_basic_info_response_error,
228 fake_login_response]
230 self.driver = qnap.QnapShareDriver(
231 configuration=create_configuration(
232 'https://1.2.3.4:443', '1.2.3.4', 'admin',
233 'qnapadmin', 'Pool1'))
234 self.assertRaises(
235 exception.ShareBackendException,
236 self.driver.do_setup,
237 context='context')
239 def test_check_for_setup_error(self):
240 """Test do_setup with exception."""
241 self.driver = qnap.QnapShareDriver(
242 configuration=create_configuration(
243 'https://1.2.3.4:443', '1.2.3.4', 'admin',
244 'qnapadmin', 'Pool1'))
245 self.assertRaises(
246 exception.ShareBackendException,
247 self.driver.check_for_setup_error)
250@ddt.ddt
251class QnapShareDriverTestCase(QnapShareDriverBaseTestCase):
252 """Tests share driver functions."""
254 def setUp(self):
255 """Setup the Qnap Driver Base TestCase."""
256 super(QnapShareDriverTestCase, self).setUp()
257 self.mock_object(qnap.QnapShareDriver, '_create_api_executor')
258 self.share = fake_share.fake_share(
259 share_proto='NFS',
260 id='shareId',
261 display_name='fakeDisplayName',
262 export_locations=[{'path': '1.2.3.4:/share/fakeShareName'}],
263 host='QnapShareDriver',
264 size=10)
266 def get_share_info_return_value(self):
267 """Return the share info form get_share_info method."""
268 root = etree.fromstring(fakes.FAKE_RES_DETAIL_DATA_SHARE_INFO)
270 share_list = root.find('Volume_Info')
271 share_info_tree = share_list.findall('row')
272 for share in share_info_tree: 272 ↛ exitline 272 didn't return from function 'get_share_info_return_value' because the loop on line 272 didn't complete
273 return share
275 def get_snapshot_info_return_value(self):
276 """Return the snapshot info form get_snapshot_info method."""
277 root = etree.fromstring(fakes.FAKE_RES_DETAIL_DATA_SNAPSHOT)
279 snapshot_list = root.find('SnapshotList')
280 snapshot_info_tree = snapshot_list.findall('row')
281 for snapshot in snapshot_info_tree: 281 ↛ exitline 281 didn't return from function 'get_snapshot_info_return_value' because the loop on line 281 didn't complete
282 return snapshot
284 def get_specific_volinfo_return_value(self):
285 """Return the volume info form get_specific_volinfo method."""
286 root = etree.fromstring(fakes.FAKE_RES_DETAIL_DATA_VOLUME_INFO)
288 volume_list = root.find('Volume_Info')
289 volume_info_tree = volume_list.findall('row')
290 for volume in volume_info_tree: 290 ↛ exitline 290 didn't return from function 'get_specific_volinfo_return_value' because the loop on line 290 didn't complete
291 return volume
293 def get_specific_poolinfo_return_value(self):
294 """Get specific pool info."""
295 root = etree.fromstring(fakes.FAKE_RES_DETAIL_DATA_SPECIFIC_POOL_INFO)
297 pool_list = root.find('Pool_Index')
298 pool_info_tree = pool_list.findall('row')
299 for pool in pool_info_tree: 299 ↛ exitline 299 didn't return from function 'get_specific_poolinfo_return_value' because the loop on line 299 didn't complete
300 return pool
302 def get_host_list_return_value(self):
303 """Get host list."""
304 root = etree.fromstring(fakes.FAKE_RES_DETAIL_DATA_GET_HOST_LIST)
306 hosts = []
307 host_list = root.find('host_list')
308 host_tree = host_list.findall('host')
309 for host in host_tree:
310 hosts.append(host)
312 return hosts
314 @ddt.data({
315 'fake_extra_spec': {},
316 'expect_extra_spec': {
317 'qnap_thin_provision': True,
318 'qnap_compression': True,
319 'qnap_deduplication': False,
320 'qnap_ssd_cache': False
321 }
322 }, {
323 'fake_extra_spec': {
324 'thin_provisioning': u'true',
325 'compression': u'true',
326 'qnap_ssd_cache': u'true'
327 },
328 'expect_extra_spec': {
329 'qnap_thin_provision': True,
330 'qnap_compression': True,
331 'qnap_deduplication': False,
332 'qnap_ssd_cache': True
333 }
334 }, {
335 'fake_extra_spec': {
336 'thin_provisioning': u'<is> False',
337 'compression': u'<is> True',
338 'qnap_ssd_cache': u'<is> True'
339 },
340 'expect_extra_spec': {
341 'qnap_thin_provision': False,
342 'qnap_compression': True,
343 'qnap_deduplication': False,
344 'qnap_ssd_cache': True
345 }
346 }, {
347 'fake_extra_spec': {
348 'thin_provisioning': u'true',
349 'dedupe': u'<is> True',
350 'qnap_ssd_cache': u'False'
351 },
352 'expect_extra_spec': {
353 'qnap_thin_provision': True,
354 'qnap_compression': True,
355 'qnap_deduplication': True,
356 'qnap_ssd_cache': False
357 }
358 }, {
359 'fake_extra_spec': {
360 'thin_provisioning': u'<is> False',
361 'compression': u'false',
362 'dedupe': u'<is> False',
363 'qnap_ssd_cache': u'<is> False'
364 },
365 'expect_extra_spec': {
366 'qnap_thin_provision': False,
367 'qnap_compression': False,
368 'qnap_deduplication': False,
369 'qnap_ssd_cache': False
370 }
371 })
372 @ddt.unpack
373 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
374 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
375 def test_create_share_positive(
376 self,
377 mock_gen_random_name,
378 mock_get_location_path,
379 fake_extra_spec, expect_extra_spec):
380 """Test create share."""
381 mock_api_executor = qnap.QnapShareDriver._create_api_executor
382 mock_api_executor.return_value.get_share_info.side_effect = [
383 None, self.get_share_info_return_value()]
384 mock_gen_random_name.return_value = 'fakeShareName'
385 mock_api_executor.return_value.create_share.return_value = (
386 'fakeCreateShareId')
387 mock_get_location_path.return_value = None
388 mock_private_storage = mock.Mock()
389 self.mock_object(greenthread, 'sleep')
390 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
391 'qnapadmin', 'Storage Pool 1',
392 private_storage=mock_private_storage)
393 self.mock_object(share_types, 'get_extra_specs_from_share',
394 mock.Mock(return_value=fake_extra_spec))
395 self.driver.create_share('context', self.share)
397 mock_api_return = mock_api_executor.return_value
398 expected_call_list = [
399 mock.call('Storage Pool 1', vol_label='fakeShareName'),
400 mock.call('Storage Pool 1', vol_label='fakeShareName')]
401 self.assertEqual(
402 expected_call_list,
403 mock_api_return.get_share_info.call_args_list)
404 mock_api_executor.return_value.create_share.assert_called_once_with(
405 self.share,
406 self.driver.configuration.qnap_poolname,
407 'fakeShareName',
408 'NFS',
409 **expect_extra_spec)
410 mock_get_location_path.assert_called_once_with(
411 'fakeShareName', 'NFS', '1.2.3.4', 'fakeNo')
413 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
414 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
415 def test_create_share_negative_share_exist(
416 self,
417 mock_gen_random_name,
418 mock_get_location_path):
419 """Test create share."""
420 mock_api_executor = qnap.QnapShareDriver._create_api_executor
421 mock_api_executor.return_value.get_share_info.return_value = (
422 self.get_share_info_return_value())
423 mock_gen_random_name.return_value = 'fakeShareName'
424 mock_get_location_path.return_value = None
425 mock_private_storage = mock.Mock()
426 self.mock_object(time, 'sleep')
427 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
428 'qnapadmin', 'Storage Pool 1',
429 private_storage=mock_private_storage)
430 self.mock_object(share_types, 'get_extra_specs_from_share',
431 mock.Mock(return_value={}))
433 self.assertRaises(
434 exception.ShareBackendException,
435 self.driver.create_share,
436 context='context',
437 share=self.share)
439 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
440 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
441 def test_create_share_negative_create_fail(
442 self,
443 mock_gen_random_name,
444 mock_get_location_path):
445 """Test create share."""
446 mock_api_executor = qnap.QnapShareDriver._create_api_executor
447 mock_api_executor.return_value.get_share_info.return_value = None
448 mock_gen_random_name.return_value = 'fakeShareName'
449 mock_get_location_path.return_value = None
450 mock_private_storage = mock.Mock()
451 self.mock_object(time, 'sleep')
452 self.mock_object(greenthread, 'sleep')
453 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
454 'qnapadmin', 'Storage Pool 1',
455 private_storage=mock_private_storage)
456 self.mock_object(share_types, 'get_extra_specs_from_share',
457 mock.Mock(return_value={}))
459 self.assertRaises(
460 exception.ShareBackendException,
461 self.driver.create_share,
462 context='context',
463 share=self.share)
465 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
466 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
467 def test_create_share_negative_configutarion(
468 self,
469 mock_gen_random_name,
470 mock_get_location_path):
471 """Test create share."""
472 mock_api_executor = qnap.QnapShareDriver._create_api_executor
473 mock_api_executor.return_value.get_share_info.side_effect = [
474 None, self.get_share_info_return_value()]
475 mock_gen_random_name.return_value = 'fakeShareName'
476 mock_get_location_path.return_value = None
477 mock_private_storage = mock.Mock()
479 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
480 'qnapadmin', 'Storage Pool 1',
481 private_storage=mock_private_storage)
482 self.mock_object(share_types, 'get_extra_specs_from_share',
483 mock.Mock(return_value={
484 'dedupe': 'true',
485 'thin_provisioning': 'false'}))
487 self.assertRaises(
488 exception.InvalidExtraSpec,
489 self.driver.create_share,
490 context='context',
491 share=self.share)
493 def test_delete_share_positive(self):
494 """Test delete share with fake_share."""
495 mock_api_executor = qnap.QnapShareDriver._create_api_executor
496 mock_api_executor.return_value.get_share_info.return_value = (
497 self.get_share_info_return_value())
498 mock_api_executor.return_value.delete_share.return_value = (
499 'fakeCreateShareId')
500 mock_private_storage = mock.Mock()
501 mock_private_storage.get.return_value = 'fakeVolNo'
503 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
504 'qnapadmin', 'Storage Pool 1',
505 private_storage=mock_private_storage)
506 self.driver.delete_share('context', self.share, share_server=None)
508 mock_api_executor.return_value.get_share_info.assert_called_once_with(
509 'Storage Pool 1', vol_no='fakeVolNo')
510 mock_api_executor.return_value.delete_share.assert_called_once_with(
511 'fakeNo')
513 def test_delete_share_no_volid(self):
514 """Test delete share with fake_share and no volID."""
515 mock_private_storage = mock.Mock()
516 mock_private_storage.get.return_value = None
518 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
519 'qnapadmin', 'Storage Pool 1',
520 private_storage=mock_private_storage)
521 self.driver.delete_share('context', self.share, share_server=None)
523 mock_private_storage.get.assert_called_once_with(
524 'shareId', 'volID')
526 def test_delete_share_no_delete_share(self):
527 """Test delete share with fake_share."""
528 mock_api_executor = qnap.QnapShareDriver._create_api_executor
529 mock_api_executor.return_value.get_share_info.return_value = None
530 mock_api_executor.return_value.delete_share.return_value = (
531 'fakeCreateShareId')
532 mock_private_storage = mock.Mock()
533 mock_private_storage.get.return_value = 'fakeVolNo'
535 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
536 'qnapadmin', 'Storage Pool 1',
537 private_storage=mock_private_storage)
538 self.driver.delete_share('context', self.share, share_server=None)
540 mock_api_executor.return_value.get_share_info.assert_called_once_with(
541 'Storage Pool 1', vol_no='fakeVolNo')
543 def test_extend_share(self):
544 """Test extend share with fake_share."""
545 mock_api_executor = qnap.QnapShareDriver._create_api_executor
546 mock_api_executor.return_value.get_share_info.return_value = (
547 self.get_share_info_return_value())
548 mock_api_executor.return_value.edit_share.return_value = None
549 mock_private_storage = mock.Mock()
550 mock_private_storage.get.side_effect = [
551 'fakeVolName',
552 'True',
553 'True',
554 'False',
555 'False']
557 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
558 'qnapadmin', 'Storage Pool 1',
559 private_storage=mock_private_storage)
560 self.driver.extend_share(self.share, 100, share_server=None)
562 expect_share_dict = {
563 'sharename': 'fakeVolName',
564 'old_sharename': 'fakeVolName',
565 'new_size': 100,
566 'thin_provision': True,
567 'compression': True,
568 'deduplication': False,
569 'ssd_cache': False,
570 'share_proto': 'NFS'
571 }
572 mock_api_executor.return_value.edit_share.assert_called_once_with(
573 expect_share_dict)
575 def test_extend_share_without_share_name(self):
576 """Test extend share without share name."""
577 mock_private_storage = mock.Mock()
578 mock_private_storage.get.return_value = None
580 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
581 'qnapadmin', 'Storage Pool 1',
582 private_storage=mock_private_storage)
583 self.assertRaises(
584 exception.ShareResourceNotFound,
585 self.driver.extend_share,
586 share=self.share,
587 new_size=100,
588 share_server=None)
590 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
591 def test_create_snapshot(
592 self,
593 mock_gen_random_name):
594 """Test create snapshot with fake_snapshot."""
595 fake_snapshot = fakes.SnapshotClass(
596 10, 'fakeShareName@fakeSnapshotName')
598 mock_gen_random_name.return_value = 'fakeSnapshotName'
599 mock_api_executor = qnap.QnapShareDriver._create_api_executor
600 mock_api_executor.return_value.get_snapshot_info.side_effect = [
601 None, self.get_snapshot_info_return_value()]
602 mock_api_executor.return_value.create_snapshot_api.return_value = (
603 'fakeCreateShareId')
604 mock_private_storage = mock.Mock()
605 mock_private_storage.get.return_value = 'fakeVolId'
607 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
608 'qnapadmin', 'Storage Pool 1',
609 private_storage=mock_private_storage)
610 self.driver.create_snapshot(
611 'context', fake_snapshot, share_server=None)
613 mock_api_return = mock_api_executor.return_value
614 expected_call_list = [
615 mock.call(volID='fakeVolId', snapshot_name='fakeSnapshotName'),
616 mock.call(volID='fakeVolId', snapshot_name='fakeSnapshotName')]
617 self.assertEqual(
618 expected_call_list,
619 mock_api_return.get_snapshot_info.call_args_list)
621 mock_api_return.create_snapshot_api.assert_called_once_with(
622 'fakeVolId', 'fakeSnapshotName')
624 def test_create_snapshot_without_volid(self):
625 """Test create snapshot with fake_snapshot."""
626 fake_snapshot = fakes.SnapshotClass(10, None)
628 mock_private_storage = mock.Mock()
629 mock_private_storage.get.return_value = None
631 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
632 'qnapadmin', 'Storage Pool 1',
633 private_storage=mock_private_storage)
634 self.assertRaises(
635 exception.ShareResourceNotFound,
636 self.driver.create_snapshot,
637 context='context',
638 snapshot=fake_snapshot,
639 share_server=None)
641 def test_delete_snapshot(self):
642 """Test delete snapshot with fakeSnapshot."""
643 fake_snapshot = fakes.SnapshotClass(
644 10, 'fakeShareName@fakeSnapshotName')
646 mock_api_executor = qnap.QnapShareDriver._create_api_executor
647 mock_api_executor.return_value.delete_snapshot_api.return_value = (
648 'fakeCreateShareId')
649 mock_private_storage = mock.Mock()
650 mock_private_storage.get.return_value = 'fakeSnapshotId'
652 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
653 'qnapadmin', 'Storage Pool 1',
654 private_storage=mock_private_storage)
655 self.driver.delete_snapshot(
656 'context', fake_snapshot, share_server=None)
658 mock_api_return = mock_api_executor.return_value
659 mock_api_return.delete_snapshot_api.assert_called_once_with(
660 'fakeShareName@fakeSnapshotName')
662 def test_delete_snapshot_without_snapshot_id(self):
663 """Test delete snapshot with fakeSnapshot and no snapshot id."""
664 fake_snapshot = fakes.SnapshotClass(10, None)
666 mock_private_storage = mock.Mock()
667 mock_private_storage.get.return_value = None
669 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
670 'qnapadmin', 'Storage Pool 1',
671 private_storage=mock_private_storage)
672 self.driver.delete_snapshot(
673 'context', fake_snapshot, share_server=None)
675 mock_private_storage.get.assert_called_once_with(
676 'fakeSnapshotId', 'snapshot_id')
678 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
679 @mock.patch('manila.share.API')
680 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
681 def test_create_share_from_snapshot(
682 self,
683 mock_gen_random_name,
684 mock_share_api,
685 mock_get_location_path):
686 """Test create share from snapshot."""
687 fake_snapshot = fakes.SnapshotClass(
688 10, 'fakeShareName@fakeSnapshotName')
690 mock_api_executor = qnap.QnapShareDriver._create_api_executor
691 mock_gen_random_name.return_value = 'fakeShareName'
692 mock_api_executor.return_value.get_share_info.side_effect = [
693 None, self.get_share_info_return_value()]
694 mock_private_storage = mock.Mock()
695 mock_private_storage.get.return_value = 'fakeSnapshotId'
696 mock_share_api.return_value.get.return_value = {'size': 10}
698 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
699 'qnapadmin', 'Storage Pool 1',
700 private_storage=mock_private_storage)
701 self.driver.create_share_from_snapshot(
702 'context', self.share, fake_snapshot, share_server=None)
704 mock_gen_random_name.assert_called_once_with(
705 'share')
706 mock_api_return = mock_api_executor.return_value
707 expected_call_list = [
708 mock.call('Storage Pool 1', vol_label='fakeShareName'),
709 mock.call('Storage Pool 1', vol_label='fakeShareName')]
710 self.assertEqual(
711 expected_call_list,
712 mock_api_return.get_share_info.call_args_list)
713 mock_api_return.clone_snapshot.assert_called_once_with(
714 'fakeShareName@fakeSnapshotName', 'fakeShareName', 10)
716 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
717 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
718 def test_create_share_from_snapshot_diff_size(
719 self,
720 mock_gen_random_name,
721 mock_get_location_path):
722 """Test create share from snapshot."""
723 fake_snapshot = fakes.SnapshotClass(
724 10, 'fakeShareName@fakeSnapshotName')
726 mock_gen_random_name.return_value = 'fakeShareName'
727 mock_api_executor = qnap.QnapShareDriver._create_api_executor
728 mock_api_executor.return_value.get_share_info.side_effect = [
729 None, self.get_share_info_return_value()]
730 mock_private_storage = mock.Mock()
731 mock_private_storage.get.side_effect = [
732 'True',
733 'True',
734 'False',
735 'False',
736 'fakeVolName']
738 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
739 'qnapadmin', 'Storage Pool 1',
740 private_storage=mock_private_storage)
741 self.driver.create_share_from_snapshot(
742 'context', self.share, fake_snapshot, share_server=None)
744 mock_gen_random_name.assert_called_once_with(
745 'share')
746 mock_api_return = mock_api_executor.return_value
747 expected_call_list = [
748 mock.call('Storage Pool 1', vol_label='fakeShareName'),
749 mock.call('Storage Pool 1', vol_label='fakeShareName')]
750 self.assertEqual(
751 expected_call_list,
752 mock_api_return.get_share_info.call_args_list)
753 mock_api_return.clone_snapshot.assert_called_once_with(
754 'fakeShareName@fakeSnapshotName', 'fakeShareName', 10)
756 def test_create_share_from_snapshot_without_snapshot_id(self):
757 """Test create share from snapshot."""
758 fake_snapshot = fakes.SnapshotClass(10, None)
760 mock_private_storage = mock.Mock()
761 mock_private_storage.get.return_value = None
763 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
764 'qnapadmin', 'Storage Pool 1',
765 private_storage=mock_private_storage)
766 self.assertRaises(
767 exception.SnapshotResourceNotFound,
768 self.driver.create_share_from_snapshot,
769 context='context',
770 share=self.share,
771 snapshot=fake_snapshot,
772 share_server=None)
774 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
775 @mock.patch('manila.share.API')
776 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
777 def test_create_share_from_snapshot_negative_name_exist(
778 self,
779 mock_gen_random_name,
780 mock_share_api,
781 mock_get_location_path):
782 """Test create share from snapshot."""
783 fake_snapshot = fakes.SnapshotClass(
784 10, 'fakeShareName@fakeSnapshotName')
786 mock_api_executor = qnap.QnapShareDriver._create_api_executor
787 mock_gen_random_name.return_value = 'fakeShareName'
788 mock_api_executor.return_value.get_share_info.return_value = (
789 self.get_share_info_return_value())
790 mock_private_storage = mock.Mock()
791 mock_private_storage.get.return_value = 'fakeSnapshotId'
792 mock_share_api.return_value.get.return_value = {'size': 10}
793 self.mock_object(time, 'sleep')
794 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
795 'qnapadmin', 'Storage Pool 1',
796 private_storage=mock_private_storage)
797 self.assertRaises(
798 exception.ShareBackendException,
799 self.driver.create_share_from_snapshot,
800 context='context',
801 share=self.share,
802 snapshot=fake_snapshot,
803 share_server=None)
805 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
806 @mock.patch('manila.share.API')
807 @mock.patch.object(qnap.QnapShareDriver, '_gen_random_name')
808 def test_create_share_from_snapshot_negative_clone_fail(
809 self,
810 mock_gen_random_name,
811 mock_share_api,
812 mock_get_location_path):
813 """Test create share from snapshot."""
814 fake_snapshot = fakes.SnapshotClass(
815 10, 'fakeShareName@fakeSnapshotName')
817 mock_api_executor = qnap.QnapShareDriver._create_api_executor
818 mock_gen_random_name.return_value = 'fakeShareName'
819 mock_api_executor.return_value.get_share_info.return_value = None
820 mock_private_storage = mock.Mock()
821 mock_private_storage.get.return_value = 'fakeSnapshotId'
822 mock_share_api.return_value.get.return_value = {'size': 10}
823 self.mock_object(time, 'sleep')
824 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
825 'qnapadmin', 'Storage Pool 1',
826 private_storage=mock_private_storage)
827 self.assertRaises(
828 exception.ShareBackendException,
829 self.driver.create_share_from_snapshot,
830 context='context',
831 share=self.share,
832 snapshot=fake_snapshot,
833 share_server=None)
835 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
836 @mock.patch.object(qnap.QnapShareDriver, '_allow_access')
837 @ddt.data('fakeHostName', 'fakeHostNameNotMatch')
838 def test_update_access_allow_access(
839 self, fakeHostName, mock_allow_access,
840 mock_get_timestamp_from_vol_name):
841 """Test update access with allow access rules."""
842 mock_private_storage = mock.Mock()
843 mock_private_storage.get.return_value = 'fakeVolName'
844 mock_api_executor = qnap.QnapShareDriver._create_api_executor
845 mock_api_executor.return_value.get_host_list.return_value = (
846 self.get_host_list_return_value())
847 mock_api_executor.return_value.set_nfs_access.return_value = None
848 mock_api_executor.return_value.delete_host.return_value = None
849 mock_allow_access.return_value = None
850 mock_get_timestamp_from_vol_name.return_value = fakeHostName
852 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
853 'qnapadmin', 'Storage Pool 1',
854 private_storage=mock_private_storage)
855 self.driver.update_access(
856 'context', self.share, 'access_rules',
857 None, None, None, share_server=None)
859 mock_api_executor.return_value.set_nfs_access.assert_called_once_with(
860 'fakeVolName', 2, 'all')
862 @mock.patch.object(qnap.QnapShareDriver, '_allow_access')
863 @mock.patch.object(qnap.QnapShareDriver, '_deny_access')
864 def test_update_access_deny_and_allow_access(
865 self,
866 mock_deny_access,
867 mock_allow_access):
868 """Test update access with deny and allow access rules."""
869 mock_private_storage = mock.Mock()
870 mock_private_storage.get.return_value = 'fakeVolName'
871 mock_deny_access.return_value = None
872 mock_allow_access.return_value = None
874 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
875 'qnapadmin', 'Storage Pool 1',
876 private_storage=mock_private_storage)
877 delete_rules = []
878 delete_rules.append('access1')
879 add_rules = []
880 add_rules.append('access1')
881 update_rules = []
882 self.driver.update_access(
883 'context', self.share, None,
884 add_rules, delete_rules, update_rules, share_server=None)
886 mock_deny_access.assert_called_once_with(
887 'context', self.share, 'access1', None)
888 mock_allow_access.assert_called_once_with(
889 'context', self.share, 'access1', None)
891 def test_update_access_without_volname(self):
892 """Test update access without volName."""
893 mock_private_storage = mock.Mock()
894 mock_private_storage.get.return_value = None
896 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
897 'qnapadmin', 'Storage Pool 1',
898 private_storage=mock_private_storage)
899 self.assertRaises(
900 exception.ShareResourceNotFound,
901 self.driver.update_access,
902 context='context',
903 share=self.share,
904 access_rules='access_rules',
905 add_rules=None,
906 delete_rules=None,
907 update_rules=None,
908 share_server=None)
910 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
911 def test_manage_existing_nfs(
912 self,
913 mock_get_location_path):
914 """Test manage existing."""
915 mock_api_executor = qnap.QnapShareDriver._create_api_executor
916 mock_api_executor.return_value.get_share_info.return_value = (
917 self.get_share_info_return_value())
918 mock_private_storage = mock.Mock()
919 mock_private_storage.update.return_value = None
920 mock_private_storage.get.side_effect = [
921 'fakeVolId',
922 'fakeVolName']
923 mock_api_executor.return_value.get_specific_volinfo.return_value = (
924 self.get_specific_volinfo_return_value())
925 mock_api_executor.return_value.get_share_info.return_value = (
926 self.get_share_info_return_value())
927 mock_get_location_path.return_value = None
929 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
930 'qnapadmin', 'Storage Pool 1',
931 private_storage=mock_private_storage)
932 self.mock_object(share_types, 'get_extra_specs_from_share',
933 mock.Mock(return_value={}))
934 self.driver.manage_existing(self.share, 'driver_options')
936 mock_api_return = mock_api_executor.return_value
937 mock_api_return.get_share_info.assert_called_once_with(
938 'Storage Pool 1', vol_label='fakeShareName')
939 mock_api_return.get_specific_volinfo.assert_called_once_with(
940 'fakeNo')
941 mock_get_location_path.assert_called_once_with(
942 'fakeShareName', 'NFS', '1.2.3.4', 'fakeNo')
944 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
945 def test_manage_existing_nfs_negative_configutarion(
946 self,
947 mock_get_location_path):
948 """Test manage existing."""
949 mock_api_executor = qnap.QnapShareDriver._create_api_executor
950 mock_api_executor.return_value.get_share_info.return_value = (
951 self.get_share_info_return_value())
952 mock_private_storage = mock.Mock()
953 mock_private_storage.update.return_value = None
954 mock_private_storage.get.side_effect = [
955 'fakeVolId',
956 'fakeVolName']
957 mock_api_executor.return_value.get_specific_volinfo.return_value = (
958 self.get_specific_volinfo_return_value())
959 mock_api_executor.return_value.get_share_info.return_value = (
960 self.get_share_info_return_value())
961 mock_get_location_path.return_value = None
963 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
964 'qnapadmin', 'Storage Pool 1',
965 private_storage=mock_private_storage)
966 self.mock_object(share_types, 'get_extra_specs_from_share',
967 mock.Mock(return_value={
968 'dedupe': 'true',
969 'thin_provisioning': 'false'}))
971 self.assertRaises(
972 exception.InvalidExtraSpec,
973 self.driver.manage_existing,
974 share=self.share,
975 driver_options='driver_options')
977 def test_manage_invalid_protocol(self):
978 """Test manage existing."""
979 share = fake_share.fake_share(
980 share_proto='fakeProtocol',
981 id='fakeId',
982 display_name='fakeDisplayName',
983 export_locations=[{'path': ''}],
984 host='QnapShareDriver',
985 size=10)
987 mock_private_storage = mock.Mock()
989 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
990 'qnapadmin', 'Storage Pool 1',
991 private_storage=mock_private_storage)
992 self.assertRaises(
993 exception.InvalidInput,
994 self.driver.manage_existing,
995 share=share,
996 driver_options='driver_options')
998 def test_manage_existing_nfs_without_export_locations(self):
999 share = fake_share.fake_share(
1000 share_proto='NFS',
1001 id='fakeId',
1002 display_name='fakeDisplayName',
1003 export_locations=[{'path': ''}],
1004 host='QnapShareDriver',
1005 size=10)
1007 mock_private_storage = mock.Mock()
1009 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1010 'qnapadmin', 'Storage Pool 1',
1011 private_storage=mock_private_storage)
1012 self.assertRaises(
1013 exception.ShareBackendException,
1014 self.driver.manage_existing,
1015 share=share,
1016 driver_options='driver_options')
1018 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
1019 def test_manage_existing_nfs_ip_not_equel_share_ip(
1020 self,
1021 mock_get_location_path):
1022 """Test manage existing with nfs ip not equel to share ip."""
1023 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1024 mock_api_executor.return_value.get_share_info.return_value = (
1025 self.get_share_info_return_value())
1026 mock_private_storage = mock.Mock()
1027 mock_private_storage.update.return_value = None
1028 mock_private_storage.get.side_effect = [
1029 'fakeVolId',
1030 'fakeVolName']
1031 mock_api_executor.return_value.get_specific_volinfo.return_value = (
1032 self.get_specific_volinfo_return_value())
1033 mock_api_executor.return_value.get_share_info.return_value = (
1034 self.get_share_info_return_value())
1035 mock_get_location_path.return_value = None
1037 self._do_setup('http://1.2.3.4:8080', '1.1.1.1', 'admin',
1038 'qnapadmin', 'Storage Pool 1',
1039 private_storage=mock_private_storage)
1040 self.assertRaises(
1041 exception.ShareBackendException,
1042 self.driver.manage_existing,
1043 share=self.share,
1044 driver_options='driver_options')
1046 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
1047 def test_manage_existing_nfs_without_existing_share(
1048 self,
1049 mock_get_location_path):
1050 """Test manage existing nfs without existing share."""
1051 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1052 mock_api_executor.return_value.get_share_info.return_value = (
1053 self.get_share_info_return_value())
1054 mock_private_storage = mock.Mock()
1055 mock_private_storage.update.return_value = None
1056 mock_private_storage.get.side_effect = [
1057 'fakeVolId',
1058 'fakeVolName']
1059 mock_api_executor.return_value.get_specific_volinfo.return_value = (
1060 self.get_specific_volinfo_return_value())
1061 mock_api_executor.return_value.get_share_info.return_value = (
1062 None)
1063 mock_get_location_path.return_value = None
1065 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1066 'qnapadmin', 'Storage Pool 1',
1067 private_storage=mock_private_storage)
1068 self.assertRaises(
1069 exception.ManageInvalidShare,
1070 self.driver.manage_existing,
1071 share=self.share,
1072 driver_options='driver_options')
1074 def test_unmanage(self):
1075 """Test unmanage."""
1076 mock_private_storage = mock.Mock()
1077 mock_private_storage.delete.return_value = None
1079 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1080 'qnapadmin', 'Storage Pool 1',
1081 private_storage=mock_private_storage)
1082 self.driver.unmanage(self.share)
1084 mock_private_storage.delete.assert_called_once_with(
1085 'shareId')
1087 @mock.patch.object(qnap.QnapShareDriver, '_get_location_path')
1088 def test_manage_existing_snapshot(
1089 self,
1090 mock_get_location_path):
1091 """Test manage existing snapshot snapshot."""
1092 fake_snapshot = fakes.SnapshotClass(
1093 10, 'fakeShareName@fakeSnapshotName')
1095 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1096 mock_api_executor.return_value.get_share_info.return_value = (
1097 self.get_share_info_return_value())
1098 mock_api_executor.return_value.get_snapshot_info.return_value = (
1099 self.get_snapshot_info_return_value())
1100 mock_private_storage = mock.Mock()
1101 mock_private_storage.update.return_value = None
1102 mock_private_storage.get.side_effect = [
1103 'fakeVolId', 'fakeVolName']
1105 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1106 'qnapadmin', 'Storage Pool 1',
1107 private_storage=mock_private_storage)
1108 self.driver.manage_existing_snapshot(fake_snapshot, 'driver_options')
1110 mock_api_return = mock_api_executor.return_value
1111 mock_api_return.get_share_info.assert_called_once_with(
1112 'Storage Pool 1', vol_no='fakeVolId')
1113 fake_metadata = {
1114 'snapshot_id': 'fakeShareName@fakeSnapshotName'}
1115 mock_private_storage.update.assert_called_once_with(
1116 'fakeSnapshotId', fake_metadata)
1118 def test_unmanage_snapshot(self):
1119 """Test unmanage snapshot."""
1120 fake_snapshot = fakes.SnapshotClass(
1121 10, 'fakeShareName@fakeSnapshotName')
1123 mock_private_storage = mock.Mock()
1124 mock_private_storage.delete.return_value = None
1126 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1127 'qnapadmin', 'Storage Pool 1',
1128 private_storage=mock_private_storage)
1129 self.driver.unmanage_snapshot(fake_snapshot)
1131 mock_private_storage.delete.assert_called_once_with(
1132 'fakeSnapshotId')
1134 @ddt.data(
1135 {'expect_result': 'manila-shr-fake_time', 'test_string': 'share'},
1136 {'expect_result': 'manila-snp-fake_time', 'test_string': 'snapshot'},
1137 {'expect_result': 'manila-hst-fake_time', 'test_string': 'host'},
1138 {'expect_result': 'manila-fake_time', 'test_string': ''})
1139 @ddt.unpack
1140 @mock.patch('oslo_utils.timeutils.utcnow')
1141 def test_gen_random_name(
1142 self, mock_utcnow, expect_result, test_string):
1143 """Test gen random name."""
1144 mock_private_storage = mock.Mock()
1145 mock_utcnow.return_value.strftime.return_value = 'fake_time'
1147 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1148 'qnapadmin', 'Storage Pool 1',
1149 private_storage=mock_private_storage)
1151 self.assertEqual(
1152 expect_result, self.driver._gen_random_name(test_string))
1154 def test_get_location_path(self):
1155 """Test get location path name."""
1156 mock_private_storage = mock.Mock()
1157 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1158 mock_api_executor.return_value.get_share_info.return_value = (
1159 self.get_share_info_return_value())
1160 mock_api_executor.return_value.get_specific_volinfo.return_value = (
1161 self.get_specific_volinfo_return_value())
1163 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1164 'qnapadmin', 'Storage Pool 1',
1165 private_storage=mock_private_storage)
1167 location = 'fakeIp:fakeMountPath'
1168 expect_result = {
1169 'path': location,
1170 'is_admin_only': False,
1171 }
1172 self.assertEqual(
1173 expect_result, self.driver._get_location_path(
1174 'fakeShareName', 'NFS', 'fakeIp', 'fakeVolId'))
1176 self.assertRaises(
1177 exception.InvalidInput,
1178 self.driver._get_location_path,
1179 share_name='fakeShareName',
1180 share_proto='fakeProto',
1181 ip='fakeIp',
1182 vol_id='fakeVolId')
1184 def test_update_share_stats(self):
1185 """Test update share stats."""
1186 mock_private_storage = mock.Mock()
1187 mock_api_return = (
1188 qnap.QnapShareDriver._create_api_executor.return_value)
1189 mock_api_return.get_specific_poolinfo.return_value = (
1190 self.get_specific_poolinfo_return_value())
1191 mock_api_return.get_share_info.return_value = (
1192 self.get_share_info_return_value())
1193 mock_api_return.get_specific_volinfo.return_value = (
1194 self.get_specific_volinfo_return_value())
1196 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1197 'qnapadmin', 'Storage Pool 1',
1198 private_storage=mock_private_storage)
1199 self.driver._update_share_stats()
1201 mock_api_return.get_specific_poolinfo.assert_called_once_with(
1202 self.driver.configuration.qnap_poolname)
1204 def test_get_vol_host(self):
1205 """Test get manila host IPV4s."""
1206 mock_private_storage = mock.Mock()
1208 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1209 'qnapadmin', 'Storage Pool 1',
1210 private_storage=mock_private_storage)
1212 expect_host_dict_ips = []
1213 host_list = self.get_host_list_return_value()
1214 for host in host_list:
1215 host_dict = {
1216 'index': host.find('index').text,
1217 'hostid': host.find('hostid').text,
1218 'name': host.find('name').text,
1219 'ipv4': [host.find('netaddrs').find('ipv4').text]
1220 }
1221 expect_host_dict_ips.append(host_dict)
1223 self.assertEqual(
1224 expect_host_dict_ips, self.driver._get_vol_host(
1225 host_list, 'fakeHostName'))
1227 @mock.patch.object(qnap.QnapShareDriver, '_gen_host_name')
1228 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
1229 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1230 def test_allow_access_ro(
1231 self,
1232 mock_check_share_access,
1233 mock_get_timestamp_from_vol_name,
1234 mock_gen_host_name):
1235 """Test allow_access with access type ro."""
1236 fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
1238 mock_private_storage = mock.Mock()
1239 mock_private_storage.get.return_value = 'fakeVolName'
1240 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1241 mock_api_executor.return_value.get_host_list.return_value = []
1242 mock_get_timestamp_from_vol_name.return_value = 'fakeHostName'
1243 mock_gen_host_name.return_value = 'manila-fakeHostName-ro'
1244 mock_api_executor.return_value.add_host.return_value = None
1245 mock_api_executor.return_value.set_nfs_access.return_value = None
1247 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1248 'qnapadmin', 'Storage Pool 1',
1249 private_storage=mock_private_storage)
1250 self.driver._allow_access(
1251 'context', self.share, fake_access, share_server=None)
1253 mock_check_share_access.assert_called_once_with(
1254 'NFS', 'fakeAccessType')
1255 mock_api_executor.return_value.add_host.assert_called_once_with(
1256 'manila-fakeHostName-ro', 'fakeIp')
1258 @mock.patch.object(qnap.QnapShareDriver, '_gen_host_name')
1259 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
1260 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1261 def test_allow_access_ro_with_hostlist(
1262 self,
1263 mock_check_share_access,
1264 mock_get_timestamp_from_vol_name,
1265 mock_gen_host_name):
1266 """Test allow_access_ro_with_hostlist."""
1267 host_dict_ips = []
1268 for host in self.get_host_list_return_value():
1269 if host.find('netaddrs/ipv4').text is not None: 1269 ↛ 1268line 1269 didn't jump to line 1268 because the condition on line 1269 was always true
1270 host_dict = {
1271 'index': host.find('index').text,
1272 'hostid': host.find('hostid').text,
1273 'name': host.find('name').text,
1274 'ipv4': [host.find('netaddrs').find('ipv4').text]}
1275 host_dict_ips.append(host_dict)
1277 for host in host_dict_ips:
1278 fake_access_to = host['ipv4']
1279 fake_access = fakes.AccessClass(
1280 'fakeAccessType', 'ro', fake_access_to)
1282 mock_private_storage = mock.Mock()
1283 mock_private_storage.get.return_value = 'fakeVolName'
1284 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1285 mock_api_executor.return_value.get_host_list.return_value = (
1286 self.get_host_list_return_value())
1287 mock_get_timestamp_from_vol_name.return_value = 'fakeHostName'
1288 mock_gen_host_name.return_value = 'manila-fakeHostName'
1289 mock_api_executor.return_value.set_nfs_access.return_value = None
1291 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1292 'qnapadmin', 'Storage Pool 1',
1293 private_storage=mock_private_storage)
1294 self.driver._allow_access(
1295 'context', self.share, fake_access, share_server=None)
1297 mock_check_share_access.assert_called_once_with(
1298 'NFS', 'fakeAccessType')
1300 @mock.patch.object(qnap.QnapShareDriver, '_gen_host_name')
1301 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
1302 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1303 def test_allow_access_rw_with_hostlist_invalid_access(
1304 self,
1305 mock_check_share_access,
1306 mock_get_timestamp_from_vol_name,
1307 mock_gen_host_name):
1308 """Test allow_access_rw_invalid_access."""
1309 host_dict_ips = []
1310 for host in self.get_host_list_return_value():
1311 if host.find('netaddrs/ipv4').text is not None: 1311 ↛ 1310line 1311 didn't jump to line 1310 because the condition on line 1311 was always true
1312 host_dict = {
1313 'index': host.find('index').text,
1314 'hostid': host.find('hostid').text,
1315 'name': host.find('name').text,
1316 'ipv4': [host.find('netaddrs').find('ipv4').text]}
1317 host_dict_ips.append(host_dict)
1319 for host in host_dict_ips:
1320 fake_access_to = host['ipv4']
1321 fake_access = fakes.AccessClass(
1322 'fakeAccessType', 'rw', fake_access_to)
1324 mock_private_storage = mock.Mock()
1325 mock_private_storage.get.return_value = 'fakeVolName'
1326 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1327 mock_api_executor.return_value.get_host_list.return_value = (
1328 self.get_host_list_return_value())
1329 mock_get_timestamp_from_vol_name.return_value = 'fakeHostName'
1330 mock_gen_host_name.return_value = 'manila-fakeHostName-rw'
1332 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1333 'qnapadmin', 'Storage Pool 1',
1334 private_storage=mock_private_storage)
1336 self.assertRaises(
1337 exception.InvalidShareAccess,
1338 self.driver._allow_access,
1339 context='context',
1340 share=self.share,
1341 access=fake_access,
1342 share_server=None)
1344 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
1345 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1346 def test_allow_access_rw(
1347 self,
1348 mock_check_share_access,
1349 mock_get_timestamp_from_vol_name):
1350 """Test allow_access with access type rw."""
1351 fake_access = fakes.AccessClass('fakeAccessType', 'rw', 'fakeIp')
1353 mock_private_storage = mock.Mock()
1354 mock_private_storage.get.return_value = 'fakeVolName'
1355 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1356 mock_api_executor.return_value.get_host_list.return_value = []
1357 mock_get_timestamp_from_vol_name.return_value = 'fakeHostName'
1358 mock_api_executor.return_value.add_host.return_value = None
1359 mock_api_executor.return_value.set_nfs_access.return_value = None
1361 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1362 'qnapadmin', 'Storage Pool 1',
1363 private_storage=mock_private_storage)
1364 self.driver._allow_access(
1365 'context', self.share, fake_access, share_server=None)
1367 mock_check_share_access.assert_called_once_with(
1368 'NFS', 'fakeAccessType')
1369 mock_api_executor.return_value.add_host.assert_called_once_with(
1370 'manila-fakeHostName-rw', 'fakeIp')
1372 @mock.patch.object(qnap.QnapShareDriver, '_gen_host_name')
1373 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1374 def test_allow_access_ro_without_hostlist(
1375 self,
1376 mock_check_share_access,
1377 mock_gen_host_name):
1378 """Test allow access without host list."""
1379 fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
1381 mock_private_storage = mock.Mock()
1383 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1384 mock_api_executor.return_value.get_host_list.return_value = None
1385 mock_gen_host_name.return_value = 'fakeHostName'
1386 mock_api_executor.return_value.add_host.return_value = None
1387 mock_api_executor.return_value.set_nfs_access.return_value = None
1389 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1390 'qnapadmin', 'Storage Pool 1',
1391 private_storage=mock_private_storage)
1392 share_name = self.driver._gen_random_name('share')
1393 mock_private_storage.get.return_value = share_name
1394 self.driver._allow_access(
1395 'context', self.share, fake_access, share_server=None)
1397 mock_check_share_access.assert_called_once_with(
1398 'NFS', 'fakeAccessType')
1399 mock_api_executor.return_value.add_host.assert_called_once_with(
1400 'fakeHostName', 'fakeIp')
1402 @mock.patch.object(qnap.QnapShareDriver, '_get_vol_host')
1403 @mock.patch.object(qnap.QnapShareDriver, '_gen_host_name')
1404 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
1405 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1406 def test_deny_access_with_hostlist(
1407 self,
1408 mock_check_share_access,
1409 mock_get_timestamp_from_vol_name,
1410 mock_gen_host_name,
1411 mock_get_vol_host):
1413 """Test deny access."""
1414 host_dict_ips = []
1415 for host in self.get_host_list_return_value():
1416 if host.find('netaddrs/ipv4').text is not None: 1416 ↛ 1415line 1416 didn't jump to line 1415 because the condition on line 1416 was always true
1417 host_dict = {
1418 'index': host.find('index').text,
1419 'hostid': host.find('hostid').text,
1420 'name': host.find('name').text,
1421 'ipv4': [host.find('netaddrs').find('ipv4').text]}
1422 host_dict_ips.append(host_dict)
1424 for host in host_dict_ips:
1425 fake_access_to = host['ipv4'][0]
1426 fake_access = fakes.AccessClass('fakeAccessType', 'ro', fake_access_to)
1428 mock_private_storage = mock.Mock()
1429 mock_private_storage.get.return_value = 'vol_name'
1431 mock_api_return = (
1432 qnap.QnapShareDriver._create_api_executor.return_value)
1433 mock_api_return.get_host_list.return_value = (
1434 self.get_host_list_return_value())
1435 mock_get_timestamp_from_vol_name.return_value = 'fakeTimeStamp'
1436 mock_gen_host_name.return_value = 'manila-fakeHostName'
1437 mock_get_vol_host.return_value = host_dict_ips
1438 mock_api_return.add_host.return_value = None
1439 mock_api_return.set_nfs_access.return_value = None
1441 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1442 'qnapadmin', 'Storage Pool 1',
1443 private_storage=mock_private_storage)
1444 self.driver._deny_access(
1445 'context', self.share, fake_access, share_server=None)
1447 mock_check_share_access.assert_called_once_with(
1448 'NFS', 'fakeAccessType')
1450 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
1451 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1452 def test_deny_access_with_hostlist_not_equel_access_to(
1453 self,
1454 mock_check_share_access,
1455 mock_get_timestamp_from_vol_name):
1456 """Test deny access."""
1457 fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
1459 mock_private_storage = mock.Mock()
1460 mock_private_storage.get.return_value = 'vol_name'
1461 mock_api_return = (
1462 qnap.QnapShareDriver._create_api_executor.return_value)
1463 mock_api_return.get_host_list.return_value = (
1464 self.get_host_list_return_value())
1465 mock_api_return.add_host.return_value = None
1467 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1468 'qnapadmin', 'Storage Pool 1',
1469 private_storage=mock_private_storage)
1470 self.driver._deny_access(
1471 'context', self.share, fake_access, share_server=None)
1473 mock_check_share_access.assert_called_once_with(
1474 'NFS', 'fakeAccessType')
1476 @mock.patch.object(qnap.QnapShareDriver, '_get_timestamp_from_vol_name')
1477 @mock.patch.object(qnap.QnapShareDriver, '_check_share_access')
1478 def test_deny_access_without_hostlist(
1479 self,
1480 mock_check_share_access,
1481 mock_get_timestamp_from_vol_name):
1482 """Test deny access without hostlist."""
1483 fake_access = fakes.AccessClass('fakeAccessType', 'ro', 'fakeIp')
1485 mock_private_storage = mock.Mock()
1486 mock_private_storage.get.return_value = 'fakeVolName'
1487 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1488 mock_api_executor.return_value.get_host_list.return_value = None
1489 mock_get_timestamp_from_vol_name.return_value = 'fakeHostName'
1490 mock_api_executor.return_value.add_host.return_value = None
1491 mock_api_executor.return_value.set_nfs_access.return_value = None
1493 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1494 'qnapadmin', 'Storage Pool 1',
1495 private_storage=mock_private_storage)
1496 self.driver._deny_access(
1497 'context', self.share, fake_access, share_server=None)
1499 mock_check_share_access.assert_called_once_with(
1500 'NFS', 'fakeAccessType')
1502 @ddt.data('NFS', 'CIFS', 'proto')
1503 def test_check_share_access(self, test_proto):
1504 """Test check_share_access."""
1505 mock_private_storage = mock.Mock()
1506 mock_api_executor = qnap.QnapShareDriver._create_api_executor
1507 mock_api_executor.return_value.get_host_list.return_value = None
1508 mock_api_executor.return_value.add_host.return_value = None
1509 mock_api_executor.return_value.set_nfs_access.return_value = None
1511 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1512 'qnapadmin', 'Storage Pool 1',
1513 private_storage=mock_private_storage)
1514 self.assertRaises(
1515 exception.InvalidShareAccess,
1516 self.driver._check_share_access,
1517 share_proto=test_proto,
1518 access_type='notser')
1520 def test_get_ts_model_pool_id(self):
1521 """Test get ts model pool id."""
1522 mock_private_storage = mock.Mock()
1524 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
1525 'qnapadmin', '1',
1526 private_storage=mock_private_storage)
1527 self.assertEqual('1', self.driver._get_ts_model_pool_id('1'))