Coverage for manila/tests/share/drivers/qnap/test_api.py: 99%
236 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
# Copyright (c) 2016 QNAP Systems, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import base64
import ddt
from http import client as http_client
import time
from unittest import mock
from urllib import parse as urlparse

from manila import exception
from manila.share.drivers.qnap import qnap
from manila import test
from manila.tests import fake_share
from manila.tests.share.drivers.qnap import fakes


def create_configuration(management_url, qnap_share_ip, qnap_nas_login,
                         qnap_nas_password, qnap_poolname):
    """Create a mocked driver configuration.

    :param management_url: value exposed as ``qnap_management_url``.
    :param qnap_share_ip: value exposed as ``qnap_share_ip``.
    :param qnap_nas_login: value exposed as ``qnap_nas_login``.
    :param qnap_nas_password: value exposed as ``qnap_nas_password``.
    :param qnap_poolname: value exposed as ``qnap_poolname``.
    :returns: a ``mock.Mock`` carrying the attributes above, whose
        ``safe_get()`` returns ``False`` for every option name.
    """
    configuration = mock.Mock()
    configuration.qnap_management_url = management_url
    configuration.qnap_share_ip = qnap_share_ip
    configuration.qnap_nas_login = qnap_nas_login
    configuration.qnap_nas_password = qnap_nas_password
    configuration.qnap_poolname = qnap_poolname
    # Any option looked up through safe_get() reads back as False.
    configuration.safe_get.return_value = False
    return configuration


class QnapShareDriverBaseTestCase(test.TestCase):
    """Base Class for the QnapShareDriver Tests."""

    def setUp(self):
        """Setup the Qnap Driver Base TestCase."""
        super().setUp()
        # Populated by _do_setup(); tests that never call it see None.
        self.driver = None
        self.share_api = None

    def _do_setup(self, management_url, share_ip, nas_login,
                  nas_password, poolname, **kwargs):
        """Build a QnapShareDriver from the given settings and set it up.

        :param management_url: management URL put into the configuration.
        :param share_ip: share IP put into the configuration.
        :param nas_login: NAS login put into the configuration.
        :param nas_password: NAS password put into the configuration.
        :param poolname: pool name put into the configuration.
        :param kwargs: only ``private_storage`` is read; it is forwarded
            to the driver constructor (``None`` when absent).
        """
        configuration = create_configuration(
            management_url,
            share_ip,
            nas_login,
            nas_password,
            poolname)
        self.driver = qnap.QnapShareDriver(
            configuration=configuration,
            private_storage=kwargs.get('private_storage'))
        self.driver.do_setup('context')


66@ddt.ddt
67class QnapAPITestCase(QnapShareDriverBaseTestCase):
68 """Tests QNAP api functions."""
70 login_url = ('/cgi-bin/authLogin.cgi?')
71 get_basic_info_url = ('/cgi-bin/authLogin.cgi')
72 fake_password = 'qnapadmin'
74 def setUp(self):
75 """Setup the Qnap API TestCase."""
76 super(QnapAPITestCase, self).setUp()
77 fake_parms = {}
78 fake_parms['user'] = 'admin'
79 fake_parms['pwd'] = base64.b64encode(
80 self.fake_password.encode("utf-8"))
81 fake_parms['serviceKey'] = 1
82 sanitized_params = self._sanitize_params(fake_parms)
83 self.login_url = ('/cgi-bin/authLogin.cgi?%s' % sanitized_params)
84 self.mock_object(http_client, 'HTTPConnection')
85 self.share = fake_share.fake_share(
86 share_proto='NFS',
87 id='shareId',
88 display_name='fakeDisplayName',
89 export_locations=[{'path': '1.2.3.4:/share/fakeShareName'}],
90 host='QnapShareDriver',
91 size=10)
93 def _sanitize_params(self, params, doseq=False):
94 sanitized_params = {}
95 for key in params:
96 value = params[key]
97 if value is not None: 97 ↛ 95line 97 didn't jump to line 95 because the condition on line 97 was always true
98 if isinstance(value, list):
99 sanitized_params[key] = [str(v) for v in value]
100 else:
101 sanitized_params[key] = str(value)
103 sanitized_params = urlparse.urlencode(sanitized_params, doseq)
104 return sanitized_params
106 @ddt.data('fake_share_name', 'fakeLabel')
107 def test_create_share_api(self, fake_name):
108 """Test create share api."""
109 mock_http_connection = http_client.HTTPConnection
110 mock_http_connection.return_value.getresponse.side_effect = [
111 fakes.FakeLoginResponse(),
112 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
113 fakes.FakeLoginResponse(),
114 fakes.FakeCreateShareResponse()]
116 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
117 'qnapadmin', 'Storage Pool 1')
118 self.driver.api_executor.create_share(
119 self.share,
120 'Storage Pool 1',
121 fake_name,
122 'NFS',
123 qnap_deduplication=False,
124 qnap_compression=True,
125 qnap_thin_provision=True,
126 qnap_ssd_cache=False)
128 fake_params = {
129 'wiz_func': 'share_create',
130 'action': 'add_share',
131 'vol_name': fake_name,
132 'vol_size': '10' + 'GB',
133 'threshold': '80',
134 'dedup': 'off',
135 'compression': '1',
136 'thin_pro': '1',
137 'cache': '0',
138 'cifs_enable': '0',
139 'nfs_enable': '1',
140 'afp_enable': '0',
141 'ftp_enable': '0',
142 'encryption': '0',
143 'hidden': '0',
144 'oplocks': '1',
145 'sync': 'always',
146 'userrw0': 'admin',
147 'userrd_len': '0',
148 'userrw_len': '1',
149 'userno_len': '0',
150 'access_r': 'setup_users',
151 'path_type': 'auto',
152 'recycle_bin': '1',
153 'recycle_bin_administrators_only': '0',
154 'pool_name': 'Storage Pool 1',
155 'sid': 'fakeSid',
156 }
157 sanitized_params = self._sanitize_params(fake_params)
158 fake_url = ('/cgi-bin/wizReq.cgi?%s' % sanitized_params)
160 expected_call_list = [
161 mock.call('GET', self.login_url),
162 mock.call('GET', self.get_basic_info_url),
163 mock.call('GET', self.login_url),
164 mock.call('GET', fake_url)]
165 self.assertEqual(
166 expected_call_list,
167 mock_http_connection.return_value.request.call_args_list)
169 def test_api_delete_share(self):
170 """Test delete share api."""
171 mock_http_connection = http_client.HTTPConnection
172 mock_http_connection.return_value.getresponse.side_effect = [
173 fakes.FakeLoginResponse(),
174 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
175 fakes.FakeLoginResponse(),
176 fakes.FakeDeleteShareResponse()]
178 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
179 'qnapadmin', 'Storage Pool 1')
180 self.driver.api_executor.delete_share(
181 'fakeId')
183 fake_params = {
184 'func': 'volume_mgmt',
185 'vol_remove': '1',
186 'volumeID': 'fakeId',
187 'stop_service': 'no',
188 'sid': 'fakeSid',
189 }
190 sanitized_params = self._sanitize_params(fake_params)
191 fake_url = (
192 '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
194 expected_call_list = [
195 mock.call('GET', self.login_url),
196 mock.call('GET', self.get_basic_info_url),
197 mock.call('GET', self.login_url),
198 mock.call('GET', fake_url)]
199 self.assertEqual(
200 expected_call_list,
201 mock_http_connection.return_value.request.call_args_list)
203 def test_get_specific_poolinfo(self):
204 """Test get specific poolinfo api."""
205 mock_http_connection = http_client.HTTPConnection
206 mock_http_connection.return_value.getresponse.side_effect = [
207 fakes.FakeLoginResponse(),
208 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
209 fakes.FakeLoginResponse(),
210 fakes.FakeSpecificPoolInfoResponse()]
212 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
213 'qnapadmin', 'Storage Pool 1')
214 self.driver.api_executor.get_specific_poolinfo(
215 'fakePoolId')
217 fake_params = {
218 'store': 'poolInfo',
219 'func': 'extra_get',
220 'poolID': 'fakePoolId',
221 'Pool_Info': '1',
222 'sid': 'fakeSid',
223 }
224 sanitized_params = self._sanitize_params(fake_params)
225 fake_url = (
226 '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
228 expected_call_list = [
229 mock.call('GET', self.login_url),
230 mock.call('GET', self.get_basic_info_url),
231 mock.call('GET', self.login_url),
232 mock.call('GET', fake_url)]
233 self.assertEqual(
234 expected_call_list,
235 mock_http_connection.return_value.request.call_args_list)
237 @ddt.data({'pool_id': "Storage Pool 1"},
238 {'pool_id': "Storage Pool 1", 'vol_no': 'fakeNo'},
239 {'pool_id': "Storage Pool 1", 'vol_label': 'fakeShareName'})
240 def test_get_share_info(self, dict_parm):
241 """Test get share info api."""
242 mock_http_connection = http_client.HTTPConnection
243 mock_http_connection.return_value.getresponse.side_effect = [
244 fakes.FakeLoginResponse(),
245 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
246 fakes.FakeLoginResponse(),
247 fakes.FakeShareInfoResponse()]
249 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
250 'qnapadmin', 'Storage Pool 1')
251 self.driver.api_executor.get_share_info(**dict_parm)
253 fake_params = {
254 'store': 'poolVolumeList',
255 'poolID': 'Storage Pool 1',
256 'func': 'extra_get',
257 'Pool_Vol_Info': '1',
258 'sid': 'fakeSid',
259 }
260 sanitized_params = self._sanitize_params(fake_params)
261 fake_url = (
262 '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
264 expected_call_list = [
265 mock.call('GET', self.login_url),
266 mock.call('GET', self.get_basic_info_url),
267 mock.call('GET', self.login_url),
268 mock.call('GET', fake_url)]
269 self.assertEqual(
270 expected_call_list,
271 mock_http_connection.return_value.request.call_args_list)
273 def test_get_specific_volinfo(self):
274 """Test get specific volume info api."""
275 mock_http_connection = http_client.HTTPConnection
276 mock_http_connection.return_value.getresponse.side_effect = [
277 fakes.FakeLoginResponse(),
278 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
279 fakes.FakeLoginResponse(),
280 fakes.FakeSpecificVolInfoResponse()]
282 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
283 'qnapadmin', 'Storage Pool 1')
284 self.driver.api_executor.get_specific_volinfo(
285 'fakeNo')
287 fake_params = {
288 'store': 'volumeInfo',
289 'volumeID': 'fakeNo',
290 'func': 'extra_get',
291 'Volume_Info': '1',
292 'sid': 'fakeSid',
293 }
294 sanitized_params = self._sanitize_params(fake_params)
295 fake_url = (
296 '/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
298 expected_call_list = [
299 mock.call('GET', self.login_url),
300 mock.call('GET', self.get_basic_info_url),
301 mock.call('GET', self.login_url),
302 mock.call('GET', fake_url)]
303 self.assertEqual(
304 expected_call_list,
305 mock_http_connection.return_value.request.call_args_list)
307 def test_get_snapshot_info_es(self):
308 """Test get snapsho info api."""
309 mock_http_connection = http_client.HTTPConnection
310 mock_http_connection.return_value.getresponse.side_effect = [
311 fakes.FakeLoginResponse(),
312 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
313 fakes.FakeLoginResponse(),
314 fakes.FakeSnapshotInfoResponse()]
316 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
317 'qnapadmin', 'Storage Pool 1')
318 self.driver.api_executor.get_snapshot_info(
319 volID='volId', snapshot_name='fakeSnapshotName')
321 fake_params = {
322 'func': 'extra_get',
323 'volumeID': 'volId',
324 'snapshot_list': '1',
325 'snap_start': '0',
326 'snap_count': '100',
327 'sid': 'fakeSid',
328 }
329 sanitized_params = self._sanitize_params(fake_params)
330 fake_url = (
331 '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
333 expected_call_list = [
334 mock.call('GET', self.login_url),
335 mock.call('GET', self.get_basic_info_url),
336 mock.call('GET', self.login_url),
337 mock.call('GET', fake_url)]
338 self.assertEqual(
339 expected_call_list,
340 mock_http_connection.return_value.request.call_args_list)
342 def test_create_snapshot_api(self):
343 """Test create snapshot api."""
344 mock_http_connection = http_client.HTTPConnection
345 mock_http_connection.return_value.getresponse.side_effect = [
346 fakes.FakeLoginResponse(),
347 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
348 fakes.FakeLoginResponse(),
349 fakes.FakeCreateSnapshotResponse()]
351 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
352 'qnapadmin', 'Storage Pool 1')
353 self.driver.api_executor.create_snapshot_api(
354 'fakeVolumeId',
355 'fakeSnapshotName')
357 fake_params = {
358 'func': 'create_snapshot',
359 'volumeID': 'fakeVolumeId',
360 'snapshot_name': 'fakeSnapshotName',
361 'expire_min': '0',
362 'vital': '1',
363 'sid': 'fakeSid',
364 }
365 sanitized_params = self._sanitize_params(fake_params)
366 fake_url = (
367 '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
369 expected_call_list = [
370 mock.call('GET', self.login_url),
371 mock.call('GET', self.get_basic_info_url),
372 mock.call('GET', self.login_url),
373 mock.call('GET', fake_url)]
374 self.assertEqual(
375 expected_call_list,
376 mock_http_connection.return_value.request.call_args_list)
378 @ddt.data(fakes.FakeDeleteSnapshotResponse(),
379 fakes.FakeDeleteSnapshotResponseSnapshotNotExist(),
380 fakes.FakeDeleteSnapshotResponseShareNotExist())
381 def test_delete_snapshot_api(self, fakeDeleteSnapshotResponse):
382 """Test delete snapshot api."""
383 mock_http_connection = http_client.HTTPConnection
384 mock_http_connection.return_value.getresponse.side_effect = [
385 fakes.FakeLoginResponse(),
386 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
387 fakes.FakeLoginResponse(),
388 fakeDeleteSnapshotResponse]
390 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
391 'qnapadmin', 'Storage Pool 1')
392 self.driver.api_executor.delete_snapshot_api(
393 'fakeSnapshotId')
395 fake_params = {
396 'func': 'del_snapshots',
397 'snapshotID': 'fakeSnapshotId',
398 'sid': 'fakeSid',
399 }
400 sanitized_params = self._sanitize_params(fake_params)
401 fake_url = (
402 '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
404 expected_call_list = [
405 mock.call('GET', self.login_url),
406 mock.call('GET', self.get_basic_info_url),
407 mock.call('GET', self.login_url),
408 mock.call('GET', fake_url)]
409 self.assertEqual(
410 expected_call_list,
411 mock_http_connection.return_value.request.call_args_list)
413 def test_clone_snapshot_api(self):
414 """Test clone snapshot api."""
415 mock_http_connection = http_client.HTTPConnection
416 mock_http_connection.return_value.getresponse.side_effect = [
417 fakes.FakeLoginResponse(),
418 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
419 fakes.FakeLoginResponse(),
420 fakes.FakeDeleteSnapshotResponse()]
422 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
423 'qnapadmin', 'Storage Pool 1')
424 self.driver.api_executor.clone_snapshot(
425 'fakeSnapshotId',
426 'fakeNewShareName',
427 'fakeCloneSize')
429 fake_params = {
430 'func': 'clone_qsnapshot',
431 'by_vol': '1',
432 'snapshotID': 'fakeSnapshotId',
433 'new_name': 'fakeNewShareName',
434 'clone_size': '{}g'.format('fakeCloneSize'),
435 'sid': 'fakeSid',
436 }
437 sanitized_params = self._sanitize_params(fake_params)
438 fake_url = (
439 '/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
441 expected_call_list = [
442 mock.call('GET', self.login_url),
443 mock.call('GET', self.get_basic_info_url),
444 mock.call('GET', self.login_url),
445 mock.call('GET', fake_url)]
446 self.assertEqual(
447 expected_call_list,
448 mock_http_connection.return_value.request.call_args_list)
450 def test_edit_share_api(self):
451 """Test edit share api."""
452 mock_http_connection = http_client.HTTPConnection
453 mock_http_connection.return_value.getresponse.side_effect = [
454 fakes.FakeLoginResponse(),
455 fakes.FakeGetBasicInfoResponseTs_4_3_0(),
456 fakes.FakeLoginResponse(),
457 fakes.FakeCreateSnapshotResponse()]
459 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
460 'qnapadmin', 'Storage Pool 1')
461 expect_share_dict = {
462 "sharename": 'fakeVolId',
463 "old_sharename": 'fakeVolId',
464 "new_size": 100,
465 "deduplication": False,
466 "compression": True,
467 "thin_provision": True,
468 "ssd_cache": False,
469 "share_proto": "NFS"
470 }
471 self.driver.api_executor.edit_share(
472 expect_share_dict)
474 fake_params = {
475 'wiz_func': 'share_property',
476 'action': 'share_property',
477 'sharename': 'fakeVolId',
478 'old_sharename': 'fakeVolId',
479 'dedup': 'off',
480 'compression': '1',
481 'thin_pro': '1',
482 'cache': '0',
483 'cifs_enable': '0',
484 'nfs_enable': '1',
485 'afp_enable': '0',
486 'ftp_enable': '0',
487 'hidden': '0',
488 'oplocks': '1',
489 'sync': 'always',
490 'recycle_bin': '1',
491 'recycle_bin_administrators_only': '0',
492 'sid': 'fakeSid',
493 }
494 if expect_share_dict.get('new_size'): 494 ↛ 496line 494 didn't jump to line 496 because the condition on line 494 was always true
495 fake_params['vol_size'] = '100GB'
496 sanitized_params = self._sanitize_params(fake_params)
497 fake_url = (
498 '/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
500 expected_call_list = [
501 mock.call('GET', self.login_url),
502 mock.call('GET', self.get_basic_info_url),
503 mock.call('GET', self.login_url),
504 mock.call('GET', fake_url)]
505 self.assertEqual(
506 expected_call_list,
507 mock_http_connection.return_value.request.call_args_list)
509 @ddt.data(fakes.FakeGetHostListResponse(),
510 fakes.FakeGetNoHostListResponse())
511 def test_get_host_list(self, fakeGetHostListResponse):
512 """Test get host list api."""
513 mock_http_connection = http_client.HTTPConnection
514 mock_http_connection.return_value.getresponse.side_effect = [
515 fakes.FakeLoginResponse(),
516 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
517 fakes.FakeLoginResponse(),
518 fakeGetHostListResponse]
520 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
521 'qnapadmin', 'Storage Pool 1')
522 self.driver.api_executor.get_host_list()
524 fake_params = {
525 'module': 'hosts',
526 'func': 'get_hostlist',
527 'sid': 'fakeSid',
528 }
529 sanitized_params = self._sanitize_params(fake_params)
530 fake_url = (
531 ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s') %
532 sanitized_params)
534 expected_call_list = [
535 mock.call('GET', self.login_url),
536 mock.call('GET', self.get_basic_info_url),
537 mock.call('GET', self.login_url),
538 mock.call('GET', fake_url)]
539 self.assertEqual(
540 expected_call_list,
541 mock_http_connection.return_value.request.call_args_list)
543 def test_add_host(self):
544 """Test add host api."""
545 mock_http_connection = http_client.HTTPConnection
546 mock_http_connection.return_value.getresponse.side_effect = [
547 fakes.FakeLoginResponse(),
548 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
549 fakes.FakeLoginResponse(),
550 fakes.FakeGetHostListResponse()]
552 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
553 'qnapadmin', 'Storage Pool 1')
554 self.driver.api_executor.add_host(
555 'fakeHostName', 'fakeIpV4')
557 fake_params = {
558 'module': 'hosts',
559 'func': 'apply_addhost',
560 'name': 'fakeHostName',
561 'ipaddr_v4': 'fakeIpV4',
562 'sid': 'fakeSid',
563 }
564 sanitized_params = self._sanitize_params(fake_params)
565 fake_url = (
566 ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s') %
567 sanitized_params)
569 expected_call_list = [
570 mock.call('GET', self.login_url),
571 mock.call('GET', self.get_basic_info_url),
572 mock.call('GET', self.login_url),
573 mock.call('GET', fake_url)]
574 self.assertEqual(
575 expected_call_list,
576 mock_http_connection.return_value.request.call_args_list)
578 def test_edit_host(self):
579 """Test edit host api."""
580 mock_http_connection = http_client.HTTPConnection
581 mock_http_connection.return_value.getresponse.side_effect = [
582 fakes.FakeLoginResponse(),
583 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
584 fakes.FakeLoginResponse(),
585 fakes.FakeGetHostListResponse()]
587 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
588 'qnapadmin', 'Storage Pool 1')
589 self.driver.api_executor.edit_host(
590 'fakeHostName', ['fakeIpV4'])
592 fake_params = {
593 'module': 'hosts',
594 'func': 'apply_sethost',
595 'name': 'fakeHostName',
596 'ipaddr_v4': ['fakeIpV4'],
597 'sid': 'fakeSid',
598 }
599 sanitized_params = self._sanitize_params(fake_params, doseq=True)
600 fake_url = (
601 ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s') %
602 sanitized_params)
604 expected_call_list = [
605 mock.call('GET', self.login_url),
606 mock.call('GET', self.get_basic_info_url),
607 mock.call('GET', self.login_url),
608 mock.call('GET', fake_url)]
609 self.assertEqual(
610 expected_call_list,
611 mock_http_connection.return_value.request.call_args_list)
613 def test_delete_host(self):
614 """Test delete host api."""
615 mock_http_connection = http_client.HTTPConnection
616 mock_http_connection.return_value.getresponse.side_effect = [
617 fakes.FakeLoginResponse(),
618 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
619 fakes.FakeLoginResponse(),
620 fakes.FakeGetHostListResponse()]
622 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
623 'qnapadmin', 'Storage Pool 1')
624 self.driver.api_executor.delete_host('fakeHostName')
626 fake_params = {
627 'module': 'hosts',
628 'func': 'apply_delhost',
629 'host_name': 'fakeHostName',
630 'sid': 'fakeSid',
631 }
632 sanitized_params = self._sanitize_params(fake_params)
633 fake_url = (
634 ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s') %
635 sanitized_params)
637 expected_call_list = [
638 mock.call('GET', self.login_url),
639 mock.call('GET', self.get_basic_info_url),
640 mock.call('GET', self.login_url),
641 mock.call('GET', fake_url)]
642 self.assertEqual(
643 expected_call_list,
644 mock_http_connection.return_value.request.call_args_list)
646 @ddt.data(fakes.FakeGetHostListResponse())
647 def test_set_nfs_access(self, fakeGetHostListResponse):
648 """Test get host list api."""
649 mock_http_connection = http_client.HTTPConnection
650 mock_http_connection.return_value.getresponse.side_effect = [
651 fakes.FakeLoginResponse(),
652 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
653 fakes.FakeLoginResponse(),
654 fakeGetHostListResponse]
656 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
657 'qnapadmin', 'Storage Pool 1')
658 self.driver.api_executor.set_nfs_access(
659 'fakeShareName', 'fakeAccess', 'fakeHostName')
661 fake_params = {
662 'wiz_func': 'share_nfs_control',
663 'action': 'share_nfs_control',
664 'sharename': 'fakeShareName',
665 'access': 'fakeAccess',
666 'host_name': 'fakeHostName',
667 'sid': 'fakeSid',
668 }
669 sanitized_params = self._sanitize_params(fake_params)
670 fake_url = (
671 ('/cgi-bin/priv/privWizard.cgi?%s') %
672 sanitized_params)
674 expected_call_list = [
675 mock.call('GET', self.login_url),
676 mock.call('GET', self.get_basic_info_url),
677 mock.call('GET', self.login_url),
678 mock.call('GET', fake_url)]
679 self.assertEqual(
680 expected_call_list,
681 mock_http_connection.return_value.request.call_args_list)
683 def test_get_snapshot_info_ts_api(self):
684 """Test get snapshot info api."""
685 mock_http_connection = http_client.HTTPConnection
686 mock_http_connection.return_value.getresponse.side_effect = [
687 fakes.FakeLoginResponse(),
688 fakes.FakeGetBasicInfoResponseTs_4_3_0(),
689 fakes.FakeLoginResponse(),
690 fakes.FakeSnapshotInfoResponse()]
692 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
693 'qnapadmin', 'Storage Pool 1')
694 self.driver.api_executor.get_snapshot_info(
695 snapshot_name='fakeSnapshotName',
696 lun_index='fakeLunIndex')
698 fake_params = {
699 'func': 'extra_get',
700 'LUNIndex': 'fakeLunIndex',
701 'smb_snapshot_list': '1',
702 'smb_snapshot': '1',
703 'snapshot_list': '1',
704 'sid': 'fakeSid'}
706 sanitized_params = self._sanitize_params(fake_params)
707 fake_url = (
708 ('/cgi-bin/disk/snapshot.cgi?%s') %
709 sanitized_params)
711 expected_call_list = [
712 mock.call('GET', self.login_url),
713 mock.call('GET', self.get_basic_info_url),
714 mock.call('GET', self.login_url),
715 mock.call('GET', fake_url)]
716 self.assertEqual(
717 expected_call_list,
718 mock_http_connection.return_value.request.call_args_list)
720 @ddt.data(fakes.FakeAuthPassFailResponse(),
721 fakes.FakeEsResCodeNegativeResponse())
722 def test_api_create_share_with_fail_response(self, fake_fail_response):
723 """Test create share api with fail response."""
724 mock_http_connection = http_client.HTTPConnection
725 mock_http_connection.return_value.getresponse.side_effect = [
726 fakes.FakeLoginResponse(),
727 fakes.FakeGetBasicInfoResponseEs_1_1_3(),
728 fakes.FakeLoginResponse(),
729 fake_fail_response,
730 fake_fail_response,
731 fake_fail_response,
732 fake_fail_response,
733 fake_fail_response,
734 fake_fail_response,
735 fake_fail_response,
736 fake_fail_response,
737 fake_fail_response,
738 fake_fail_response]
740 self.mock_object(time, 'sleep')
741 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
742 'qnapadmin', 'Storage Pool 1')
743 self.assertRaises(
744 exception.ShareBackendException,
745 self.driver.api_executor.create_share,
746 share=self.share,
747 pool_name='Storage Pool 1',
748 create_share_name='fake_share_name',
749 share_proto='NFS',
750 qnap_deduplication=False,
751 qnap_compression=True,
752 qnap_thin_provision=True,
753 qnap_ssd_cache=False)
755 @ddt.unpack
756 @ddt.data(['self.driver.api_executor.get_share_info',
757 {'pool_id': 'fakeId'},
758 fakes.FakeAuthPassFailResponse(),
759 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
760 ['self.driver.api_executor.get_specific_volinfo',
761 {'vol_id': 'fakeId'},
762 fakes.FakeAuthPassFailResponse(),
763 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
764 ['self.driver.api_executor.create_snapshot_api',
765 {'volumeID': 'fakeVolumeId',
766 'snapshot_name': 'fakeSnapshotName'},
767 fakes.FakeAuthPassFailResponse(),
768 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
769 ['self.driver.api_executor.create_snapshot_api',
770 {'volumeID': 'fakeVolumeId',
771 'snapshot_name': 'fakeSnapshotName'},
772 fakes.FakeEsResCodeNegativeResponse(),
773 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
774 ['self.driver.api_executor.get_snapshot_info',
775 {'volID': 'volId'},
776 fakes.FakeAuthPassFailResponse(),
777 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
778 ['self.driver.api_executor.get_snapshot_info',
779 {'volID': 'volId'},
780 fakes.FakeResultNegativeResponse(),
781 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
782 ['self.driver.api_executor.get_specific_poolinfo',
783 {'pool_id': 'Storage Pool 1'},
784 fakes.FakeAuthPassFailResponse(),
785 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
786 ['self.driver.api_executor.get_specific_poolinfo',
787 {'pool_id': 'Storage Pool 1'},
788 fakes.FakeResultNegativeResponse(),
789 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
790 ['self.driver.api_executor.delete_share',
791 {'vol_id': 'fakeId'},
792 fakes.FakeAuthPassFailResponse(),
793 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
794 ['self.driver.api_executor.delete_share',
795 {'vol_id': 'fakeId'},
796 fakes.FakeResultNegativeResponse(),
797 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
798 ['self.driver.api_executor.delete_snapshot_api',
799 {'snapshot_id': 'fakeSnapshotId'},
800 fakes.FakeAuthPassFailResponse(),
801 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
802 ['self.driver.api_executor.delete_snapshot_api',
803 {'snapshot_id': 'fakeSnapshotId'},
804 fakes.FakeResultNegativeResponse(),
805 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
806 ['self.driver.api_executor.clone_snapshot',
807 {'snapshot_id': 'fakeSnapshotId',
808 'new_sharename': 'fakeNewShareName',
809 'clone_size': 'fakeCloneSize'},
810 fakes.FakeResultNegativeResponse(),
811 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
812 ['self.driver.api_executor.clone_snapshot',
813 {'snapshot_id': 'fakeSnapshotId',
814 'new_sharename': 'fakeNewShareName',
815 'clone_size': 'fakeCloneSize'},
816 fakes.FakeAuthPassFailResponse(),
817 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
818 ['self.driver.api_executor.edit_share',
819 {'share_dict': {"sharename": 'fakeVolId',
820 "old_sharename": 'fakeVolId',
821 "new_size": 100,
822 "deduplication": False,
823 "compression": True,
824 "thin_provision": False,
825 "ssd_cache": False,
826 "share_proto": "NFS"}},
827 fakes.FakeEsResCodeNegativeResponse(),
828 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
829 ['self.driver.api_executor.edit_share',
830 {'share_dict': {"sharename": 'fakeVolId',
831 "old_sharename": 'fakeVolId',
832 "new_size": 100,
833 "deduplication": False,
834 "compression": True,
835 "thin_provision": False,
836 "ssd_cache": False,
837 "share_proto": "NFS"}},
838 fakes.FakeAuthPassFailResponse(),
839 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
840 ['self.driver.api_executor.add_host',
841 {'hostname': 'fakeHostName',
842 'ipv4': 'fakeIpV4'},
843 fakes.FakeResultNegativeResponse(),
844 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
845 ['self.driver.api_executor.add_host',
846 {'hostname': 'fakeHostName',
847 'ipv4': 'fakeIpV4'},
848 fakes.FakeAuthPassFailResponse(),
849 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
850 ['self.driver.api_executor.edit_host',
851 {'hostname': 'fakeHostName',
852 'ipv4_list': 'fakeIpV4List'},
853 fakes.FakeResultNegativeResponse(),
854 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
855 ['self.driver.api_executor.edit_host',
856 {'hostname': 'fakeHostName',
857 'ipv4_list': 'fakeIpV4List'},
858 fakes.FakeAuthPassFailResponse(),
859 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
860 ['self.driver.api_executor.delete_host',
861 {'hostname': 'fakeHostName'},
862 fakes.FakeResultNegativeResponse(),
863 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
864 ['self.driver.api_executor.delete_host',
865 {'hostname': 'fakeHostName'},
866 fakes.FakeAuthPassFailResponse(),
867 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
868 ['self.driver.api_executor.get_host_list',
869 {},
870 fakes.FakeResultNegativeResponse(),
871 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
872 ['self.driver.api_executor.get_host_list',
873 {},
874 fakes.FakeAuthPassFailResponse(),
875 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
876 ['self.driver.api_executor.set_nfs_access',
877 {'sharename': 'fakeShareName',
878 'access': 'fakeAccess',
879 'host_name': 'fakeHostName'},
880 fakes.FakeAuthPassFailResponse(),
881 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
882 ['self.driver.api_executor.set_nfs_access',
883 {'sharename': 'fakeShareName',
884 'access': 'fakeAccess',
885 'host_name': 'fakeHostName'},
886 fakes.FakeResultNegativeResponse(),
887 fakes.FakeGetBasicInfoResponseEs_1_1_3()],
888 ['self.driver.api_executor.get_snapshot_info',
889 {'snapshot_name': 'fakeSnapshoName',
890 'lun_index': 'fakeLunIndex'},
891 fakes.FakeAuthPassFailResponse(),
892 fakes.FakeGetBasicInfoResponseTs_4_3_0()],
893 ['self.driver.api_executor.get_snapshot_info',
894 {'snapshot_name': 'fakeSnapshoName',
895 'lun_index': 'fakeLunIndex'},
896 fakes.FakeResultNegativeResponse(),
897 fakes.FakeGetBasicInfoResponseTs_4_3_0()])
898 def test_get_snapshot_info_ts_with_fail_response(
899 self, api, dict_parm,
900 fake_fail_response, fake_basic_info):
901 """Test get snapshot info api with fail response."""
902 mock_http_connection = http_client.HTTPConnection
903 mock_http_connection.return_value.getresponse.side_effect = [
904 fakes.FakeLoginResponse(),
905 fake_basic_info,
906 fakes.FakeLoginResponse(),
907 fake_fail_response,
908 fake_fail_response,
909 fake_fail_response,
910 fake_fail_response,
911 fake_fail_response,
912 fake_fail_response,
913 fake_fail_response,
914 fake_fail_response,
915 fake_fail_response,
916 fake_fail_response]
918 self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
919 'qnapadmin', 'Storage Pool 1')
920 self.mock_object(time, 'sleep')
921 self.assertRaises(
922 exception.ShareBackendException,
923 eval(api),
924 **dict_parm)