Coverage for manila/tests/share/drivers/macrosan/test_macrosan_nas.py: 99%
927 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2022 MacroSAN Technologies Co., Ltd.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""
17Share driver test for Macrosan Storage Array.
18"""
19import ddt
20import requests
22from oslo_config import cfg
23from unittest import mock
25from manila import context
26from manila import exception
27from manila.share import configuration
28from manila.share import driver
29from manila.share.drivers.macrosan import macrosan_constants as constants
30from manila.share.drivers.macrosan import macrosan_helper
31from manila.share.drivers.macrosan import macrosan_nas
32from manila.share.drivers.macrosan import rest_helper
33from manila import test
34from manila.tests import fake_share
36CONF = cfg.CONF
39class FakeResponse(object):
40 def __init__(self, status, result):
41 self.status_code = status
42 self.text = 'return message'
43 self.response = result
45 def json(self):
46 return self.response
48 def close(self):
49 pass
52@ddt.ddt
53class MacrosanShareDriverTestCase(test.TestCase):
55 def setUp(self):
56 self.mock_object(macrosan_nas.CONF, '_check_required_opts')
57 super(MacrosanShareDriverTestCase, self).setUp()
59 def _safe_get(opt):
60 return getattr(self.configuration, opt)
62 self._context = context.get_admin_context()
63 self.configuration = mock.Mock(spec=configuration.Configuration)
64 self.configuration.safe_get = mock.Mock(side_effect=_safe_get)
65 self.configuration.driver_handles_share_servers = False
66 self.configuration.share_backend_name = 'fake_share_backend_name'
67 self.configuration.macrosan_nas_http_protocol = 'https'
68 self.configuration.macrosan_nas_ip = 'fake_ip'
69 self.configuration.macrosan_nas_port = 'fake_port'
70 self.configuration.macrosan_nas_username = 'fake_username'
71 self.configuration.macrosan_nas_password = 'fake_password'
72 self.configuration.macrosan_nas_prefix = 'nas'
73 self.configuration.macrosan_share_pools = ['fake_pool']
74 self.configuration.macrosan_timeout = 60
75 self.configuration.macrosan_ssl_cert_verify = False
77 self.configuration.network_config_group = 'fake_network_config_group'
78 self.configuration.admin_network_config_group = (
79 'fake_admin_network_config_group')
80 self.configuration.config_group = 'fake_config_group'
81 self.configuration.reserved_share_percentage = 0
82 self.configuration.reserved_share_from_snapshot_percentage = 0
83 self.configuration.reserved_share_extend_percentage = 0
84 self.configuration.filter_function = None
85 self.configuration.goodness_function = None
86 self.driver = macrosan_nas.MacrosanNasDriver(
87 configuration=self.configuration)
88 self.result_success_storage_pools = {
89 'code': 0,
90 'message': 'success',
91 'data': [{
92 'name': 'fake_pool',
93 'size': '1000.0G',
94 'allocated': '100G',
95 'free': '900G',
96 'health': 'ONLINE',
97 'rwStatus': 'off'
98 }]
99 }
101 def test_do_setup(self):
102 mock_login = self.mock_object(rest_helper.RestHelper, 'login')
103 self.driver.do_setup(self._context)
104 mock_login.assert_called_once()
106 def test_do_setup_login_fail(self):
107 mock_login = self.mock_object(
108 rest_helper.RestHelper, 'login',
109 mock.Mock(
110 side_effect=exception.ShareBackendException(
111 msg='fake_exception')))
112 self.assertRaises(exception.ShareBackendException,
113 self.driver.do_setup,
114 self._context)
115 mock_login.assert_called_once()
117 @ddt.data({'nfs_status': constants.NFS_NON_CONFIG,
118 'cifs_status': constants.CIFS_NON_CONFIG},
119 {'nfs_status': constants.NFS_DISABLED,
120 'cifs_status': constants.CIFS_DISABLED},
121 {'nfs_status': constants.NFS_ENABLED,
122 'cifs_status': constants.CIFS_ENABLED},
123 {'nfs_status': constants.NFS_ENABLED,
124 'cifs_status': constants.CIFS_SHARE_MODE})
125 @ddt.unpack
126 def test_check_for_setup_error_non_config(self, nfs_status, cifs_status):
127 mock_gnss = self.mock_object(
128 rest_helper.RestHelper, '_get_nfs_service_status',
129 mock.Mock(return_value={
130 "serviceStatus": nfs_status,
131 "nfs3Status": constants.NFS_NON_SUPPORTED,
132 "nfs4Status": constants.NFS_NON_SUPPORTED
133 }))
135 mock_cns = self.mock_object(rest_helper.RestHelper,
136 '_config_nfs_service')
137 mock_sns = self.mock_object(rest_helper.RestHelper,
138 '_start_nfs_service')
139 if cifs_status == constants.CIFS_DISABLED:
140 mock_gcss = self.mock_object(
141 rest_helper.RestHelper, '_get_cifs_service_status',
142 mock.Mock(side_effect=[cifs_status,
143 constants.CIFS_SHARE_MODE]))
144 else:
145 mock_gcss = self.mock_object(
146 rest_helper.RestHelper, '_get_cifs_service_status',
147 mock.Mock(return_value=cifs_status))
148 mock_ccs = self.mock_object(rest_helper.RestHelper,
149 '_config_cifs_service')
150 mock_scs = self.mock_object(rest_helper.RestHelper,
151 '_start_cifs_service')
152 self.driver.check_for_setup_error()
153 if (nfs_status == constants.NFS_NON_CONFIG or
154 nfs_status == constants.NFS_DISABLED):
155 mock_cns.assert_called_once()
156 mock_sns.assert_called_once()
157 else:
158 mock_cns.assert_called_once()
159 mock_gnss.assert_called_once()
160 if cifs_status == constants.CIFS_NON_CONFIG:
161 mock_gcss.assert_called_once()
162 mock_ccs.assert_called_once()
163 mock_scs.assert_called_once()
164 elif cifs_status == constants.CIFS_DISABLED:
165 mock_gcss.assert_called()
166 mock_ccs.assert_called_once()
167 mock_scs.assert_called_once()
168 elif cifs_status == constants.CIFS_SHARE_MODE:
169 mock_gcss.assert_called_once()
170 mock_ccs.assert_called_once()
171 else:
172 mock_gcss.assert_called_once()
174 def test_check_for_setup_error_nfs_service_error(self):
175 mock_gnss = self.mock_object(
176 rest_helper.RestHelper, '_get_nfs_service_status',
177 mock.Mock(return_value={
178 "serviceStatus": constants.NFS_EXCEPTION,
179 "nfs3Status": constants.NFS_NON_SUPPORTED,
180 "nfs4Status": constants.NFS_NON_SUPPORTED
181 }))
182 self.assertRaises(exception.MacrosanBackendExeption,
183 self.driver.check_for_setup_error)
184 mock_gnss.assert_called_once()
186 def test_check_for_setup_error_cifs_service_error(self):
187 mock_gnss = self.mock_object(
188 rest_helper.RestHelper, '_get_nfs_service_status',
189 mock.Mock(return_value={
190 "serviceStatus": constants.NFS_ENABLED,
191 "nfs3Status": constants.NFS_SUPPORTED,
192 "nfs4Status": constants.NFS_SUPPORTED
193 }))
194 mock_gcss = self.mock_object(
195 rest_helper.RestHelper, '_get_cifs_service_status',
196 mock.Mock(return_value=constants.CIFS_EXCEPTION))
197 self.assertRaises(exception.MacrosanBackendExeption,
198 self.driver.check_for_setup_error)
199 mock_gnss.assert_called_once()
200 mock_gcss.assert_called_once()
202 @ddt.data('nfs', 'cifs')
203 def test_create_share(self, share_proto):
204 share = fake_share.fake_share(
205 share_proto=share_proto, host="fake_host@fake_backend#fake_pool")
206 mock_cf = self.mock_object(rest_helper.RestHelper,
207 '_create_filesystem')
208 mock_cfd = self.mock_object(rest_helper.RestHelper,
209 '_create_filesystem_dir')
210 mock_cns = self.mock_object(rest_helper.RestHelper,
211 '_create_nfs_share')
212 self.mock_object(macrosan_helper.MacrosanHelper,
213 '_ensure_user',
214 mock.Mock(return_value=True))
215 mock_ccs = self.mock_object(rest_helper.RestHelper,
216 '_create_cifs_share')
217 self.driver.helper.configuration.macrosan_nas_ip = "172.0.0.1"
219 location = self.driver.create_share(self._context, share)
220 if share_proto == 'nfs':
221 expect_location = r'172.0.0.1:/manila_fakeid/manila_fakeid'
222 print('test location:', location)
223 self.assertEqual(location, expect_location)
224 else:
225 expect_location = r'\\172.0.0.1\manila_fakeid'
226 self.assertEqual(location, expect_location)
227 mock_cf.assert_called_once_with(fs_name='manila_fakeid',
228 pool_name='fake_pool',
229 filesystem_quota='1GB')
230 mock_cf.assert_called()
231 share_path = self.driver.helper._generate_share_path('manila_fakeid')
232 mock_cfd.assert_called_once_with(share_path)
233 if share_proto == 'nfs':
234 mock_cns.assert_called_once_with(share_path=share_path)
235 else:
236 mock_ccs.assert_called_once()
238 def test_create_share_user_error(self):
239 share = fake_share.fake_share(
240 share_proto='cifs', host="fake_host@fake_backend#fake_pool")
241 mock_cf = self.mock_object(rest_helper.RestHelper,
242 '_create_filesystem')
243 mock_cfd = self.mock_object(rest_helper.RestHelper,
244 '_create_filesystem_dir')
245 self.mock_object(macrosan_helper.MacrosanHelper,
246 '_ensure_user',
247 mock.Mock(return_value=False))
248 mock_df = self.mock_object(rest_helper.RestHelper,
249 '_delete_filesystem')
250 self.assertRaises(exception.MacrosanBackendExeption,
251 self.driver.create_share,
252 self._context,
253 share)
254 mock_cf.assert_called_once()
255 share_path = self.driver.helper._generate_share_path('manila_fakeid')
256 mock_cfd.assert_called_once_with(share_path)
257 mock_df.assert_called_once_with('manila_fakeid')
259 @ddt.data('nfs', 'cifs')
260 def test_delete_share(self, share_proto):
261 share = fake_share.fake_share(
262 share_proto=share_proto, host="fake_host@fake_backend#fake_pool")
263 expect_share_path = self.driver.helper._generate_share_path(
264 'manila_fakeid')
266 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
267 mock.Mock(return_value={
268 "path": "fake_path",
269 "clients": ["client"],
270 "protocol": "fake_protocol"
271 }))
272 mock_dns = self.mock_object(
273 rest_helper.RestHelper, '_delete_nfs_share')
274 mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
275 mock.Mock(return_value={
276 "path": "fake_path",
277 "cifsname": "fake_cifsname",
278 "protocol": "fake_protocol",
279 "roList": ["fake_ro"],
280 "rwList": ["fake_rw"],
281 "allowList": ["fake_allow"],
282 "denyList": ["fake_deny"]
283 }))
284 mock_dcs = self.mock_object(rest_helper.RestHelper,
285 '_delete_cifs_share')
286 mock_df = self.mock_object(rest_helper.RestHelper,
287 '_delete_filesystem')
288 self.driver.delete_share(self._context, share)
290 if share_proto == "nfs":
291 mock_gns.assert_called_once_with(expect_share_path)
292 mock_dns.assert_called_once_with(expect_share_path)
293 else:
294 mock_gcs.assert_called_once_with(expect_share_path)
295 mock_dcs.assert_called_once_with('manila_fakeid',
296 expect_share_path)
297 mock_df.assert_called_once_with('manila_fakeid')
299 @ddt.data('nfs', 'cifs')
300 def test_delete_share_not_exist(self, share_proto):
301 share = fake_share.fake_share(share_proto=share_proto,
302 host="fake_host@fake_backend#fake_pool")
303 expect_share_path = self.driver.helper._generate_share_path(
304 'manila_fakeid')
305 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
306 mock.Mock(return_value=None))
307 mock_gf = self.mock_object(rest_helper.RestHelper, '_get_filesystem',
308 mock.Mock(return_value={
309 "name": "fake_name",
310 "poolName": "fake_pool",
311 "quotaStatus": "1GB"
312 }))
313 mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
314 mock.Mock(return_value=None))
315 mock_df = self.mock_object(rest_helper.RestHelper,
316 '_delete_filesystem')
317 self.driver.delete_share(self._context, share)
319 if share_proto == 'nfs':
320 mock_gns.assert_called_once_with(expect_share_path)
321 else:
322 mock_gcs.assert_called_once_with(expect_share_path)
324 mock_gf.assert_called_once_with('manila_fakeid')
325 mock_df.assert_called_once_with('manila_fakeid')
327 @ddt.data('nfs', 'cifs')
328 def test_extend_share(self, share_proto):
329 share = fake_share.fake_share(share_proto=share_proto,
330 host="fake_host@fake_backend#fake_pool")
331 expect_share_path = self.driver.helper._generate_share_path(
332 'manila_fakeid')
333 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
334 mock.Mock(return_value={
335 "path": "fake_path",
336 "clients": ["client"],
337 "protocol": "fake_protocol"
338 }))
339 mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
340 mock.Mock(return_value={
341 "path": "fake_path",
342 "cifsname": "fake_cifsname",
343 "protocol": "fake_protocol",
344 "roList": ["fake_ro"],
345 "rwList": ["fake_rw"],
346 "allowList": ["fake_allow"],
347 "denyList": ["fake_deny"]
348 }))
349 mock_uss = self.mock_object(rest_helper.RestHelper,
350 '_update_share_size')
351 self.driver.extend_share(share, 2)
353 if share_proto == 'nfs':
354 mock_gns.assert_called_once_with(expect_share_path)
355 else:
356 mock_gcs.assert_called_once_with(expect_share_path)
358 mock_uss.assert_called_once_with('manila_fakeid', '2GB')
360 def test_extend_share_not_exist(self):
361 share = fake_share.fake_share(share_proto='nfs',
362 size=1,
363 host="fake_host@fake_backend#fake_pool")
364 expect_share_path = self.driver.helper._generate_share_path(
365 'manila_fakeid')
366 mock_gns = self.mock_object(rest_helper.RestHelper,
367 '_get_nfs_share',
368 mock.Mock(return_value=None))
369 self.assertRaises(exception.ShareResourceNotFound,
370 self.driver.extend_share,
371 share,
372 2)
374 mock_gns.assert_called_once_with(expect_share_path)
376 @ddt.data('nfs', 'cifs')
377 def test_shrink_share(self, share_proto):
378 share = fake_share.fake_share(share_proto=share_proto,
379 size=5,
380 host="fake_host@fake_backend#fake_pool")
381 expect_share_path = self.driver.helper._generate_share_path(
382 'manila_fakeid')
383 mock_gns = self.mock_object(rest_helper.RestHelper,
384 '_get_nfs_share',
385 mock.Mock(return_value={
386 "path": "fake_path",
387 "clients": ["client"],
388 "protocol": "fake_protocol"
389 }))
390 mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
391 mock.Mock(return_value={
392 "path": "fake_path",
393 "cifsname": "fake_cifsname",
394 "protocol": "fake_protocol",
395 "roList": ["fake_ro"],
396 "rwList": ["fake_rw"],
397 "allowList": ["fake_allow"],
398 "denyList": ["fake_deny"]
399 }))
400 mock_gf = self.mock_object(rest_helper.RestHelper, '_get_filesystem',
401 mock.Mock(return_value={
402 "name": "fake_name",
403 "poolName": "fake_pool",
404 "quotaStatus": "5GB",
405 "usedCapacity": '1GB'
406 }))
407 mock_uss = self.mock_object(rest_helper.RestHelper,
408 '_update_share_size')
409 self.driver.shrink_share(share, 3)
410 if share_proto == 'nfs':
411 mock_gns.assert_called_once_with(expect_share_path)
412 else:
413 mock_gcs.assert_called_once_with(expect_share_path)
415 mock_gf.assert_called_once_with('manila_fakeid')
416 mock_uss.assert_called_once_with('manila_fakeid', '3GB')
418 @ddt.data('nfs', 'cifs')
419 def test_shrink_share_not_exist(self, share_proto):
420 share = fake_share.fake_share(share_proto=share_proto,
421 size=3,
422 host="fake_host@fake_backend#fake_pool")
423 expect_share_path = self.driver.helper._generate_share_path(
424 'manila_fakeid')
425 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
426 mock.Mock(return_value=None))
427 mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
428 mock.Mock(return_value=None))
430 self.assertRaises(exception.ShareResourceNotFound,
431 self.driver.shrink_share,
432 share,
433 1)
434 if share_proto == 'nfs':
435 mock_gns.assert_called_once_with(expect_share_path)
436 elif share_proto == 'cifs': 436 ↛ exitline 436 didn't return from function 'test_shrink_share_not_exist' because the condition on line 436 was always true
437 mock_gcs.assert_called_once_with(expect_share_path)
439 def test_shrink_share_size_fail(self):
440 share = fake_share.fake_share(share_proto='nfs',
441 size=3,
442 host="fake_host@fake_backend#fake_pool")
443 expect_share_path = self.driver.helper._generate_share_path(
444 'manila_fakeid')
445 mock_gns = self.mock_object(rest_helper.RestHelper,
446 '_get_nfs_share',
447 mock.Mock(return_value={
448 "path": "fake_path",
449 "clients": ["client"],
450 "protocol": "fake_protocol"
451 }))
452 mock_gf = self.mock_object(rest_helper.RestHelper, '_get_filesystem',
453 mock.Mock(return_value={
454 "name": "fake_name",
455 "poolName": "fake_pool",
456 "quotaStatus": "3GB",
457 "usedCapacity": '2GB'
458 }))
459 self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
460 self.driver.shrink_share,
461 share,
462 1)
463 mock_gf.assert_called_once_with('manila_fakeid')
464 mock_gns.assert_called_once_with(expect_share_path)
466 @ddt.data('nfs', 'cifs')
467 def test_ensure_share(self, share_proto):
468 share = fake_share.fake_share(share_proto=share_proto,
469 host="fake_host@fake_backend#fake_pool")
470 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
471 mock.Mock(return_value={
472 "path": "fake_path",
473 "clients": ["client"],
474 "protocol": "fake_protocol"
475 }))
476 mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
477 mock.Mock(return_value={
478 "path": "fake_path",
479 "cifsname": "fake_cifsname",
480 "protocol": "fake_protocol",
481 "roList": ["fake_ro"],
482 "rwList": ["fake_rw"],
483 "allowList": ["fake_allow"],
484 "denyList": ["fake_deny"],
485 }))
486 self.driver.helper.configuration.macrosan_nas_ip = "172.0.0.1"
487 locations = self.driver.ensure_share(self._context, share)
488 expect_share_path = self.driver.helper._generate_share_path(
489 'manila_fakeid')
490 if share_proto == 'nfs':
491 expect_locations = [r'172.0.0.1:/manila_fakeid/manila_fakeid']
492 self.assertEqual(locations, expect_locations)
493 mock_gns.assert_called_once_with(expect_share_path)
494 else:
495 expect_locations = [r'\\172.0.0.1\manila_fakeid']
496 self.assertEqual(locations, expect_locations)
497 mock_gcs.assert_called_once_with(expect_share_path)
499 def test_ensure_share_proto_fail(self):
500 share = fake_share.fake_share(host="fake_host@fake_backend#fake_pool")
501 self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
502 mock.Mock(return_value={
503 "path": "fake_path",
504 "clients": ["client"],
505 "protocol": "fake_protocol"
506 }))
507 self.assertRaises(exception.MacrosanBackendExeption,
508 self.driver.ensure_share,
509 self._context,
510 share)
512 def test_ensure_share_not_exist(self):
513 share = fake_share.fake_share(share_proto='nfs',
514 host="fake_host@fake_backend#fake_pool")
515 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
516 mock.Mock(return_value=None))
517 self.assertRaises(exception.ShareResourceNotFound,
518 self.driver.ensure_share,
519 self._context,
520 share)
521 expect_share_path = self.driver.helper._generate_share_path(
522 'manila_fakeid')
523 mock_gns.assert_called_once_with(expect_share_path)
525 @ddt.data('nfs', 'cifs')
526 def test_allow_access_success(self, share_proto):
527 share = fake_share.fake_share(share_proto=share_proto,
528 host="fake_host@fake_backend#fake_pool")
529 if share_proto == 'nfs':
530 access = {
531 'access_type': 'ip',
532 'access_to': '0.0.0.0/0',
533 'access_level': 'rw',
534 }
535 else:
536 access = {
537 'access_type': 'user',
538 'access_to': 'fake_user',
539 'access_level': 'rw',
540 }
541 mock_gns = self.mock_object(rest_helper.RestHelper,
542 '_get_nfs_share',
543 mock.Mock(return_value={
544 "path": "fake_path",
545 "clients": ["client"],
546 "protocol": "fake_protocol"
547 }))
548 mock_gafns = self.mock_object(rest_helper.RestHelper,
549 '_get_access_from_nfs_share',
550 mock.Mock(return_value=None))
551 mock_anar = self.mock_object(rest_helper.RestHelper,
552 '_allow_nfs_access_rest')
553 mock_gcs = self.mock_object(rest_helper.RestHelper,
554 '_get_cifs_share',
555 mock.Mock(return_value={
556 "path": "fake_path",
557 "cifsname": "fake_cifsname",
558 "protocol": "fake_protocol",
559 "roList": ["fake_ro"],
560 "rwList": ["fake_rw"],
561 "allowList": ["fake_allow"],
562 "denyList": ["fake_deny"],
563 }))
564 mock_gafcs = self.mock_object(rest_helper.RestHelper,
565 '_get_access_from_cifs_share',
566 mock.Mock(return_value=None))
567 mock_acar = self.mock_object(rest_helper.RestHelper,
568 '_allow_cifs_access_rest')
569 self.driver.helper._allow_access(share, access)
570 expect_share_path = self.driver.helper._generate_share_path(
571 'manila_fakeid')
572 if access['access_to'] == '0.0.0.0/0':
573 access['access_to'] = '*'
574 if share_proto == 'nfs':
575 mock_gns.assert_called_once_with(expect_share_path)
576 mock_gafns.assert_called_once_with(expect_share_path,
577 access['access_to'])
578 mock_anar.assert_called_once_with(expect_share_path,
579 access['access_to'],
580 access['access_level'])
581 else:
582 mock_gcs.assert_called_once_with(expect_share_path)
583 mock_gafcs.assert_called_once_with(expect_share_path,
584 access['access_to'])
585 mock_acar.assert_called_once_with(expect_share_path,
586 access['access_to'],
587 access['access_level'])
589 def test_allow_access_nfs_change(self):
590 share = fake_share.fake_share(share_proto='nfs',
591 host="fake_host@fake_backend#fake_pool")
592 access = {
593 'access_type': 'ip',
594 'access_to': '172.0.0.1',
595 'access_level': 'rw',
596 }
597 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
598 mock.Mock(return_value={
599 "path": "/manila_fakeid",
600 "clients": ["client"],
601 "protocol": "fake_protocol"
602 }))
603 mock_gafns = self.mock_object(rest_helper.RestHelper,
604 '_get_access_from_nfs_share',
605 mock.Mock(return_value={
606 "path": "/manila_fakeid",
607 "clientName": "fake_client_name",
608 "accessRight": "ro",
609 }))
610 mock_cnar = self.mock_object(rest_helper.RestHelper,
611 '_change_nfs_access_rest')
612 self.driver.helper._allow_access(share, access)
613 expect_share_path = self.driver.helper._generate_share_path(
614 'manila_fakeid')
615 mock_gns.assert_called_once_with(expect_share_path)
616 mock_gafns.assert_called_once_with(expect_share_path,
617 access['access_to'])
618 mock_cnar.assert_called_once_with(expect_share_path,
619 access['access_to'],
620 access['access_level'])
622 def test_allow_access_cifs_change(self):
623 share = fake_share.fake_share(share_proto='cifs',
624 host="fake_host@fake_backend#fake_pool")
625 access = {
626 'access_type': 'user',
627 'access_to': 'fake_user',
628 'access_level': 'rw',
629 }
630 mock_gcs = self.mock_object(rest_helper.RestHelper,
631 '_get_cifs_share',
632 mock.Mock(return_value={
633 "path": "fake_path",
634 "cifsname": "fake_cifsname",
635 "protocol": "fake_protocol",
636 "roList": ["fake_ro"],
637 "rwList": ["fake_rw"],
638 "allowList": ["fake_allow"],
639 "denyList": ["fake_deny"],
640 }))
641 mock_gafcs = self.mock_object(rest_helper.RestHelper,
642 '_get_access_from_cifs_share',
643 mock.Mock(return_value={
644 "path": "fake_path",
645 "ugName": "fake_user",
646 "ugType": "0",
647 "accessRight": "ro",
648 }))
649 mock_ccar = self.mock_object(rest_helper.RestHelper,
650 '_change_cifs_access_rest')
651 self.driver.helper._allow_access(share, access)
652 expect_share_path = self.driver.helper._generate_share_path(
653 'manila_fakeid')
654 mock_gcs.assert_called_once_with(expect_share_path)
655 mock_gafcs.assert_called_once_with(expect_share_path,
656 access['access_to'])
657 mock_ccar.assert_called_once_with(expect_share_path,
658 access['access_to'],
659 access['access_level'],
660 '0')
662 @ddt.data(
663 {
664 'access_type': 'user',
665 'access_to': 'user_name',
666 'access_level': 'rw',
667 },
668 {
669 'access_type': 'user',
670 'access_to': 'group_name',
671 'access_level': 'rw',
672 },
673 {
674 'access_type': 'user',
675 'access_to': '/domain_user',
676 'access_level': 'rw',
677 },
678 {
679 'access_type': 'user',
680 'access_to': '/domain_group',
681 'access_level': 'rw',
682 },
683 )
684 def test_allow_access_cifs(self, access):
685 share = fake_share.fake_share(share_proto='cifs',
686 host="fake_host@fake_backend#fake_pool")
687 mock_gcs = self.mock_object(rest_helper.RestHelper,
688 '_get_cifs_share',
689 mock.Mock(return_value={
690 "path": "fake_path",
691 "cifsname": "fake_cifsname",
692 "protocol": "fake_protocol",
693 "roList": ["fake_ro"],
694 "rwList": ["fake_rw"],
695 "allowList": ["fake_allow"],
696 "denyList": ["fake_deny"],
697 }))
698 mock_gafcs = self.mock_object(rest_helper.RestHelper,
699 '_get_access_from_cifs_share',
700 mock.Mock(return_value=None))
701 mock_acar = self.mock_object(rest_helper.RestHelper,
702 '_allow_cifs_access_rest')
704 self.driver.helper._allow_access(share, access)
705 expect_share_path = self.driver.helper._generate_share_path(
706 'manila_fakeid')
707 mock_gcs.assert_called_once_with(expect_share_path)
708 mock_gafcs.assert_called_once_with(expect_share_path,
709 access['access_to'])
710 mock_acar.assert_called_once_with(expect_share_path,
711 access['access_to'],
712 access['access_level'])
714 @ddt.data('nfs', 'cifs')
715 def test_allow_access_share_not_exist(self, share_proto):
716 share = fake_share.fake_share(share_proto=share_proto,
717 host="fake_host@fake_backend#fake_pool")
718 access = {}
719 if share_proto == 'nfs':
720 access = {
721 'access_type': 'ip',
722 'access_to': '172.0.0.1',
723 'access_level': 'rw',
724 }
725 else:
726 access = {
727 'access_type': 'user',
728 'access_to': 'fake_user',
729 'access_level': 'rw',
730 }
731 mock_gns = self.mock_object(rest_helper.RestHelper, '_get_nfs_share',
732 mock.Mock(return_value=None))
733 mock_gcs = self.mock_object(rest_helper.RestHelper, '_get_cifs_share',
734 mock.Mock(return_value=None))
735 self.assertRaises(exception.ShareResourceNotFound,
736 self.driver.helper._allow_access,
737 share,
738 access)
739 expect_share_path = self.driver.helper._generate_share_path(
740 'manila_fakeid')
741 if share_proto == 'nfs':
742 mock_gns.assert_called_once_with(expect_share_path)
743 else:
744 mock_gcs.assert_called_once_with(expect_share_path)
746 def test_allow_access_proto_fail(self):
747 share = fake_share.fake_share(host="fake_host@fake_backend#fake_pool")
748 access = {
749 'access_type': 'user',
750 'access_to': 'fake_user',
751 'access_level': 'rw',
752 }
753 self.assertRaises(exception.MacrosanBackendExeption,
754 self.driver.helper._allow_access,
755 share,
756 access)
758 def test_allow_access_nfs_user_fail(self):
759 share = fake_share.fake_share(share_proto='nfs',
760 host="fake_host@fake_backend#fake_pool")
761 access = {
762 'access_type': 'user',
763 'access_to': 'fake_user',
764 'access_level': 'rw',
765 }
766 self.assertRaises(exception.InvalidShareAccess,
767 self.driver.helper._allow_access,
768 share,
769 access)
771 def test_allow_access_cifs_ip_fail(self):
772 share = fake_share.fake_share(share_proto='cifs',
773 host="fake_host@fake_backend#fake_pool")
774 access = {
775 'access_type': 'ip',
776 'access_to': '172.0.0.1',
777 'access_level': 'rw',
778 }
779 self.assertRaises(exception.InvalidShareAccess,
780 self.driver.helper._allow_access,
781 share,
782 access)
784 def test_allow_access_nfs_level_fail(self):
785 share = fake_share.fake_share(share_proto='nfs',
786 host="fake_host@fake_backend#fake_pool")
787 access = {
788 'access_type': 'ip',
789 'access_to': '172.0.0.1',
790 'access_level': 'r',
791 }
792 self.assertRaises(exception.InvalidShareAccess,
793 self.driver.helper._allow_access,
794 share,
795 access)
797 @ddt.data('nfs', 'cifs')
798 def test_deny_access(self, share_proto):
799 share = fake_share.fake_share(share_proto=share_proto,
800 host="fake_host@fake_backend#fake_pool")
801 if share_proto == 'nfs':
802 access = {
803 'access_type': 'ip',
804 'access_to': '0.0.0.0/0',
805 'access_level': 'rw',
806 }
807 else:
808 access = {
809 'access_type': 'user',
810 'access_to': 'fake_user',
811 'access_level': 'rw',
812 }
813 mock_gafns = self.mock_object(rest_helper.RestHelper,
814 '_get_access_from_nfs_share',
815 mock.Mock(return_value={
816 "path": "fake_path",
817 "clientName": "fake_client_name",
818 "accessRight": "rw",
819 }))
820 mock_dnar = self.mock_object(rest_helper.RestHelper,
821 '_delete_nfs_access_rest')
822 mock_gafcs = self.mock_object(rest_helper.RestHelper,
823 '_get_access_from_cifs_share',
824 mock.Mock(return_value={
825 "path": "fake_path",
826 "ugName": "fake_user",
827 "ugType": "0",
828 "accessRight": "rw",
829 }))
830 mock_dcar = self.mock_object(rest_helper.RestHelper,
831 '_delete_cifs_access_rest')
832 expect_share_path = self.driver.helper._generate_share_path(
833 'manila_fakeid')
834 self.driver.helper._deny_access(share, access)
835 if access['access_to'] == '0.0.0.0/0':
836 access['access_to'] = '*'
837 if share_proto == 'nfs':
838 mock_gafns.assert_called_once_with(expect_share_path,
839 access['access_to'])
840 mock_dnar.assert_called_once_with(expect_share_path,
841 access['access_to'])
842 else:
843 mock_gafcs.assert_called_once_with(expect_share_path,
844 access['access_to'])
845 mock_dcar.assert_called_once_with(expect_share_path,
846 "fake_user", "0")
848 def test_deny_access_nfs_type_fail(self):
849 share = fake_share.fake_share(share_proto='nfs',
850 host="fake_host@fake_backend#fake_pool")
851 access = {
852 'access_type': 'fake_type',
853 'access_to': '172.0.0.1',
854 'access_level': 'rw',
855 }
856 result = self.driver.helper._deny_access(share, access)
857 self.assertIsNone(result)
859 def test_deny_access_nfs_share_not_exist(self):
860 share = fake_share.fake_share(share_proto='nfs',
861 host="fake_host@fake_backend#fake_pool")
862 access = {
863 'access_type': 'ip',
864 'access_to': '172.0.0.1',
865 'access_level': 'rw',
866 }
867 mock_gafns = self.mock_object(rest_helper.RestHelper,
868 '_get_access_from_nfs_share',
869 mock.Mock(return_value=None))
870 result = self.driver.helper._deny_access(share, access)
871 self.assertIsNone(result)
872 expect_share_path = self.driver.helper._generate_share_path(
873 'manila_fakeid')
874 mock_gafns.assert_called_once_with(expect_share_path,
875 access['access_to'])
877 def test_deny_access_cifs_type_fail(self):
878 share = fake_share.fake_share(share_proto='cifs',
879 host="fake_host@fake_backend#fake_pool")
880 access = {
881 'access_type': 'fake_type',
882 'access_to': 'fake_user',
883 'access_level': 'rw',
884 }
885 result = self.driver.helper._deny_access(share, access)
886 self.assertIsNone(result)
888 def test_deny_access_cifs_share_not_exist(self):
889 share = fake_share.fake_share(share_proto='cifs',
890 host="fake_host@fake_backend#fake_pool")
891 access = {
892 'access_type': 'user',
893 'access_to': 'fake_user',
894 'access_level': 'rw',
895 }
896 mock_gafcs = self.mock_object(rest_helper.RestHelper,
897 '_get_access_from_cifs_share',
898 mock.Mock(return_value=None))
899 result = self.driver.helper._deny_access(share, access)
900 self.assertIsNone(result)
901 expect_share_path = self.driver.helper._generate_share_path(
902 'manila_fakeid')
903 mock_gafcs.assert_called_once_with(expect_share_path,
904 access['access_to'])
906 def test_update_access_add_delete(self):
907 share = fake_share.fake_share(share_proto='nfs',
908 host="fake_host@fake_backend#fake_pool")
909 add_rules = [{'access_type': 'ip',
910 'access_to': '172.0.2.1',
911 'access_level': 'rw', }]
912 delete_rules = [{'access_type': 'ip',
913 'access_to': '172.0.2.2',
914 'access_level': 'rw', }]
915 self.mock_object(macrosan_helper.MacrosanHelper,
916 '_allow_access')
917 self.mock_object(macrosan_helper.MacrosanHelper,
918 '_deny_access')
919 self.driver.update_access(self._context, share,
920 None, add_rules, delete_rules, None)
922 @ddt.data('nfs', 'cifs')
923 def test_update_access_nfs(self, proto):
924 share = fake_share.fake_share(share_proto=proto,
925 host="fake_host@fake_backend#fake_pool")
926 if proto == 'nfs':
927 access_rules = [{'access_type': 'ip',
928 'access_to': '172.0.3.1',
929 'access_level': 'rw', },
930 {'access_type': 'ip',
931 'access_to': '172.0.3.2',
932 'access_level': 'rw', }]
933 else:
934 access_rules = [{'access_type': 'user',
935 'access_to': 'user_l',
936 'access_level': 'rw', },
937 {'access_type': 'user',
938 'access_to': 'user_a',
939 'access_level': 'rw', }]
940 mock_ca = self.mock_object(macrosan_helper.MacrosanHelper,
941 '_clear_access')
942 self.mock_object(macrosan_helper.MacrosanHelper,
943 '_allow_access')
944 self.driver.update_access(self._context, share,
945 access_rules, {}, {}, {})
946 mock_ca.assert_called_once_with(share, None)
948 def test_update_access_fail(self):
949 share = fake_share.fake_share(share_proto='nfs',
950 host="fake_host@fake_backend#fake_pool")
951 access_rules = [{'access_id': 'fakeid',
952 'access_type': 'ip',
953 'access_to': '172.0.3.1',
954 'access_level': 'rw', }]
955 mock_ca = self.mock_object(macrosan_helper.MacrosanHelper,
956 '_clear_access')
957 self.mock_object(macrosan_helper.MacrosanHelper,
958 '_allow_access',
959 mock.Mock(side_effect=exception.InvalidShareAccess(
960 reason='fake_exception')))
961 result = self.driver.update_access(self._context, share,
962 access_rules, None, None, None)
963 expect = {
964 'fakeid': {
965 'state': 'error',
966 }
967 }
968 self.assertEqual(result, expect)
969 mock_ca.assert_called_once_with(share, None)
971 def test_update_access_add_fail(self):
972 share = fake_share.fake_share(share_proto='nfs',
973 host="fake_host@fake_backend#fake_pool")
974 add_rules = [{'access_id': 'fakeid',
975 'access_type': 'ip',
976 'access_to': '172.0.2.1',
977 'access_level': 'rw', }]
978 delete_rules = []
979 self.mock_object(macrosan_helper.MacrosanHelper,
980 '_allow_access',
981 mock.Mock(side_effect=exception.InvalidShareAccess(
982 reason='fake_exception')))
983 self.mock_object(macrosan_helper.MacrosanHelper,
984 '_deny_access')
985 result = self.driver.update_access(self._context, share,
986 None, add_rules, delete_rules, None)
987 expect = {
988 'fakeid': {
989 'state': 'error'
990 }
991 }
992 self.assertEqual(result, expect)
994 @ddt.data('nfs', 'cifs')
995 def test__clear_access(self, share_proto):
996 share = fake_share.fake_share(share_proto=share_proto,
997 host="fake_host@fake_backend#fake_pool")
998 fake_nfs_share_backend = [
999 {
1000 'share_path': 'fake_path',
1001 'access_to': '172.0.0.1',
1002 'access_level': 'rw'
1003 },
1004 {
1005 'share_path': 'default_path',
1006 'access_to': '172.0.0.2',
1007 'access_level': 'rw'
1008 }]
1009 fake_cifs_share_backend = [
1010 {
1011 'share_path': 'fake_path',
1012 'access_to': 'user_name',
1013 'ugType': '0',
1014 'access_level': 'rw'
1015 },
1016 {
1017 'share_path': 'default_path',
1018 'access_to': 'manilanobody',
1019 'ugType': '0',
1020 'access_level': 'rw'
1021 }]
1022 mock_ganar = self.mock_object(
1023 rest_helper.RestHelper,
1024 '_get_all_nfs_access_rest',
1025 mock.Mock(return_value=fake_nfs_share_backend))
1026 mock_gacar = self.mock_object(
1027 rest_helper.RestHelper, '_get_all_cifs_access_rest',
1028 mock.Mock(return_value=fake_cifs_share_backend))
1029 self.mock_object(rest_helper.RestHelper,
1030 '_delete_nfs_access_rest')
1031 self.mock_object(rest_helper.RestHelper,
1032 '_delete_cifs_access_rest')
1033 self.driver.helper._clear_access(share)
1034 expect_share_path = self.driver.helper._generate_share_path(
1035 'manila_fakeid')
1036 if share_proto == 'nfs':
1037 mock_ganar.assert_called_once_with(expect_share_path)
1038 else:
1039 mock_gacar.assert_called_once_with(expect_share_path)
1041 @ddt.data('nfs', 'cifs')
1042 def test__clear_access_no_access_list(self, share_proto):
1043 share = fake_share.fake_share(share_proto=share_proto,
1044 host="fake_host@fake_backend#fake_pool")
1045 mock_ganar = self.mock_object(
1046 rest_helper.RestHelper,
1047 '_get_all_nfs_access_rest',
1048 mock.Mock(return_value=[]))
1049 mock_gacar = self.mock_object(
1050 rest_helper.RestHelper, '_get_all_cifs_access_rest',
1051 mock.Mock(return_value=[]))
1052 self.driver.helper._clear_access(share)
1053 expect_share_path = self.driver.helper._generate_share_path(
1054 'manila_fakeid')
1055 if share_proto == 'nfs':
1056 mock_ganar.assert_called_once_with(expect_share_path)
1057 else:
1058 mock_gacar.assert_called_once_with(expect_share_path)
1060 @ddt.data(constants.USER_NOT_EXIST,
1061 constants.USER_EXIST,
1062 constants.USER_FORMAT_ERROR)
1063 def test__ensure_user(self, query_result):
1064 mock_qu = self.mock_object(rest_helper.RestHelper,
1065 '_query_user',
1066 mock.Mock(return_value=query_result))
1067 mock_qg = self.mock_object(
1068 rest_helper.RestHelper,
1069 '_query_group',
1070 mock.Mock(return_value=constants.GROUP_NOT_EXIST))
1071 mock_alg = self.mock_object(rest_helper.RestHelper,
1072 '_add_localgroup')
1073 mock_alu = self.mock_object(rest_helper.RestHelper,
1074 '_add_localuser')
1075 result = self.driver.helper._ensure_user('fake_user',
1076 'fake_passwd',
1077 'fake_group')
1078 if query_result == constants.USER_NOT_EXIST:
1079 mock_qg.assert_called_once_with('fake_group')
1080 mock_alg.assert_called_once_with('fake_group')
1081 mock_alu.assert_called_once_with('fake_user',
1082 'fake_passwd',
1083 'fake_group')
1084 self.assertTrue(result)
1085 elif query_result == constants.USER_EXIST:
1086 self.assertTrue(result)
1087 else:
1088 self.assertFalse(result)
1089 mock_qu.assert_called_once_with('fake_user')
1091 def test__ensure_user_fail(self):
1092 mock_qu = self.mock_object(
1093 rest_helper.RestHelper,
1094 '_query_user',
1095 mock.Mock(return_value=constants.USER_NOT_EXIST))
1096 mock_qg = self.mock_object(
1097 rest_helper.RestHelper,
1098 '_query_group',
1099 mock.Mock(return_value=constants.GROUP_FORMAT_ERROR))
1100 self.assertRaises(exception.InvalidInput,
1101 self.driver.helper._ensure_user,
1102 'fake_user',
1103 'fake_passwd',
1104 'fake_group')
1105 mock_qu.assert_called_once_with('fake_user')
1106 mock_qg.assert_called_once_with('fake_group')
1108 def test__update_share_stats(self):
1109 self.driver.helper.pools = ['fake_pool']
1110 mock_gap = self.mock_object(rest_helper.RestHelper,
1111 '_get_all_pool',
1112 mock.Mock(return_value='fake_result'))
1113 mock_gpc = self.mock_object(macrosan_helper.MacrosanHelper,
1114 '_get_pool_capacity',
1115 mock.Mock(return_value={
1116 "totalcapacity": 10,
1117 "freecapacity": 9,
1118 "allocatedcapacity": 1,
1119 }))
1120 mock_uss = self.mock_object(driver.ShareDriver, '_update_share_stats')
1122 self.driver._update_share_stats()
1124 data = {}
1125 data['vendor_name'] = self.driver.VENDOR
1126 data['driver_version'] = self.driver.VERSION
1127 data['storage_protocol'] = self.driver.PROTOCOL
1128 data['share_backend_name'] = 'fake_share_backend_name'
1129 data['pools'] = [{
1130 'pool_name': 'fake_pool',
1131 'total_capacity_gb': 10,
1132 'free_capacity_gb': 9,
1133 'allocated_capacity_gb': 1,
1134 'reserved_percentage': 0,
1135 'reserved_snapshot_percentage': 0,
1136 'reserved_share_extend_percentage': 0,
1137 'dedupe': False,
1138 'compression': False,
1139 'qos': False,
1140 'thin_provisioning': False,
1141 'snapshot_support': False,
1142 'create_share_from_snapshot_support':
1143 False,
1144 }]
1145 mock_gap.assert_called_once()
1146 mock_gpc.assert_called_once_with('fake_pool', 'fake_result')
1147 mock_uss.assert_called_once_with(data)
1149 def test__update_share_stats_pool_not_exist(self):
1150 self.driver.helper.pools = ['fake_pool']
1151 self.mock_object(rest_helper.RestHelper, '_get_all_pool',
1152 mock.Mock(return_value='fake_result'))
1153 self.mock_object(macrosan_helper.MacrosanHelper,
1154 '_get_pool_capacity',
1155 mock.Mock(return_value={}))
1156 self.assertRaises(exception.InvalidInput,
1157 self.driver._update_share_stats
1158 )
1160 def test__get_pool_capacity(self):
1161 self.mock_object(macrosan_helper.MacrosanHelper,
1162 '_find_pool_info',
1163 mock.Mock(return_value={
1164 "name": "fake_pool",
1165 "totalcapacity": "100.0G",
1166 "allocatedcapacity": "22G",
1167 "freecapacity": "78G",
1168 "health": "ONLINE",
1169 "rw": "off",
1170 }))
1171 res = self.driver.helper._get_pool_capacity("fake_pool",
1172 "fake_result")
1173 self.assertEqual(100, res['totalcapacity'])
1174 self.assertEqual(78, res['freecapacity'])
1175 self.assertEqual(22, res['allocatedcapacity'])
1177 def test__generate_share_name(self):
1178 share = fake_share.fake_share(host="fake_host@fake_backend#fake_pool")
1179 result = self.driver.helper._generate_share_name(share)
1180 self.assertEqual("manila_fakeid", result)
1182 def test__format_name(self):
1183 a = 'fake-1234567890-1234567890-1234567890'
1184 expect = 'fake_1234567890_1234567890_1234'
1185 result = self.driver.helper._format_name(a)
1186 self.assertEqual(expect, result)
1188 def test__generate_share_path(self):
1189 share_name = 'manila_fakeid'
1190 result = self.driver.helper._generate_share_path(share_name)
1192 self.assertEqual(r'/manila_fakeid/manila_fakeid', result)
1194 @ddt.data('nfs', 'cifs')
1195 def test__get_location_path(self, share_proto):
1196 self.driver.helper.configuration.macrosan_nas_ip = "172.0.0.1"
1197 result = self.driver.helper._get_location_path('fake_path',
1198 'fake_name',
1199 share_proto)
1200 if share_proto == 'nfs':
1201 expect = r'172.0.0.1:fake_path'
1202 elif share_proto == 'cifs': 1202 ↛ 1204line 1202 didn't jump to line 1204 because the condition on line 1202 was always true
1203 expect = r'\\172.0.0.1\fake_name'
1204 self.assertEqual(expect, result)
1206 def test__get_share_instance_pnp_pool_error(self):
1207 share = fake_share.fake_share(
1208 share_proto="nfs", host="fake_host@fake_backend")
1209 self.assertRaises(exception.InvalidHost,
1210 self.driver.helper._get_share_instance_pnp,
1211 share)
1213 def test__get_share_instance_pnp_proto_error(self):
1214 share = fake_share.fake_share(
1215 share_proto="CephFS", host="fake_host@fake_backend#fake_pool")
1216 self.assertRaises(exception.MacrosanBackendExeption,
1217 self.driver.helper._get_share_instance_pnp,
1218 share)
1220 @ddt.data('2000000000', '2000000KB', '2000MB', '20GB', '2TB')
1221 def test__unit_convert_toGB(self, capacity):
1222 convert = {'2000000000': '%.0f' % (float(2000000000) / 1024 ** 3),
1223 '2000000KB': '%.0f' % (float(2000000) / 1024 ** 2),
1224 '2000MB': '%.0f' % (float(2000) / 1024),
1225 '20GB': '%.0f' % float(20),
1226 '2TB': '%.0f' % (float(2) * 1024)}
1227 expect = float(convert[capacity])
1228 result = self.driver.helper._unit_convert_toGB(capacity)
1229 self.assertEqual(expect, result)
1231 @ddt.data('nfs', 'cifs')
1232 def test__get_share(self, proto):
1233 proto = proto.upper()
1234 mock_gns = self.mock_object(rest_helper.RestHelper,
1235 '_get_nfs_share',
1236 mock.Mock(return_value={
1237 "path": "/manila_fakeid",
1238 "clients": ["client"],
1239 "protocol": "NFS"
1240 }))
1241 mock_gcs = self.mock_object(rest_helper.RestHelper,
1242 '_get_cifs_share',
1243 mock.Mock(return_value={
1244 "path": "fake_path",
1245 "cifsname": "fake_cifsname",
1246 "protocol": "CIFS",
1247 "roList": ["fake_ro"],
1248 "rwList": ["fake_rw"],
1249 "allowList": ["fake_allow"],
1250 "denyList": ["fake_deny"],
1251 }))
1252 expect_nfs = {
1253 "path": "/manila_fakeid",
1254 "clients": ["client"],
1255 "protocol": "NFS"}
1256 expect_cifs = {
1257 "path": "fake_path",
1258 "cifsname": "fake_cifsname",
1259 "protocol": "CIFS",
1260 "roList": ["fake_ro"],
1261 "rwList": ["fake_rw"],
1262 "allowList": ["fake_allow"],
1263 "denyList": ["fake_deny"]}
1264 result = self.driver.helper._get_share('fake_path', proto)
1265 if proto == 'NFS':
1266 mock_gns.assert_called_once_with('fake_path')
1267 self.assertEqual(expect_nfs, result)
1268 elif proto == 'CIFS': 1268 ↛ exitline 1268 didn't return from function 'test__get_share' because the condition on line 1268 was always true
1269 mock_gcs.assert_called_once_with('fake_path')
1270 self.assertEqual(expect_cifs, result)
1272 def test__find_pool_info(self):
1273 pool_info = self.driver.helper._find_pool_info(
1274 'fake_pool',
1275 self.result_success_storage_pools)
1276 self.assertIsNotNone(pool_info)
1278 def test__find_pool_info_fail(self):
1279 pool_info = self.driver.helper._find_pool_info(
1280 'error_pool',
1281 self.result_success_storage_pools)
1282 expect = {}
1283 self.assertEqual(expect, pool_info)
1286@ddt.ddt
1287class RestHelperTestCase(test.TestCase):
1289 def setUp(self):
1290 self.mock_object(CONF, '_check_required_opts')
1291 super(RestHelperTestCase, self).setUp()
1293 def _safe_get(opt):
1294 return getattr(self.configuration, opt)
1296 self.configuration = mock.Mock(spec=configuration.Configuration)
1297 self.configuration.safe_get = mock.Mock(side_effect=_safe_get)
1298 self.configuration.macrosan_nas_http_protocol = 'https'
1299 self.configuration.macrosan_nas_ip = 'fake_ip'
1300 self.configuration.macrosan_nas_port = 'fake_port'
1301 self.configuration.macrosan_nas_prefix = 'nas'
1302 self.configuration.macrosan_nas_username = 'fake_username'
1303 self.configuration.macrosan_nas_password = 'fake_password'
1304 self.configuration.macrosan_timeout = 60
1305 self.configuration.macrosan_ssl_cert_verify = False
1306 self.resthelper = rest_helper.RestHelper(
1307 configuration=self.configuration)
1308 self.post = 'POST'
1309 self.get = 'GET'
1310 self.delete = 'DELETE'
1311 self.put = 'PUT'
1312 self.fake_message = 'fake_message'
1313 self.result_success = {
1314 'code': 0,
1315 'message': 'success',
1316 'data': 'fake_data'
1317 }
1318 self.result_success_return_0 = {
1319 'code': 0,
1320 'message': 'success',
1321 'data': '0'
1322 }
1323 self.result_success_return_1 = {
1324 'code': 0,
1325 'message': 'success',
1326 'data': '1'
1327 }
1328 self.result_failed = {
1329 'code': 1,
1330 'message': 'failed',
1331 'data': 'fake_data'
1332 }
1333 self.result_failed_not_exist = {
1334 'code': constants.CODE_SOURCE_NOT_EXIST,
1335 'message': 'failed',
1336 'data': '',
1337 }
1338 self.result_success_storage_pools = {
1339 'code': 0,
1340 'message': 'success',
1341 'data': [{
1342 'name': 'fake_pool',
1343 'size': '1000.0G',
1344 'allocated': '100G',
1345 'free': '900G',
1346 'health': 'ONLINE',
1347 'rwStatus': 'off'
1348 }]
1349 }
1351 @ddt.data(
1352 {'url': 'fake_url', 'data': {'fake_data': 'fake_value'},
1353 'method': 'POST'},
1354 {'url': 'fake_url', 'data': None,
1355 'method': 'GET'},
1356 {'url': 'fake_url', 'data': {'fake_data': 'fake_value'},
1357 'method': 'DELETE'},
1358 {'url': 'fake_url', 'data': {'fake_data': 'fake_value'},
1359 'method': 'PUT'},
1360 )
1361 @ddt.unpack
1362 def test_call(self, url, data, method):
1363 self.resthelper._token = 'fake_token'
1364 request_method = method.lower()
1365 fake_response = FakeResponse(200, self.result_success)
1366 mock_request = self.mock_object(requests, request_method,
1367 mock.Mock(return_value=fake_response))
1368 self.resthelper.call(url, data, method)
1369 expected_url = ('https://%(ip)s:%(port)s/%(rest)s/%(url)s'
1370 % {'ip': 'fake_ip',
1371 'port': 'fake_port',
1372 'rest': 'nas',
1373 'url': 'fake_url'})
1374 header = {'Authorization': 'fake_token'}
1375 mock_request.assert_called_once_with(
1376 expected_url, data=data, headers=header,
1377 timeout=self.configuration.macrosan_timeout,
1378 verify=False)
1380 def test_call_method_fail(self):
1381 self.resthelper._token = 'fake_token'
1382 self.assertRaises(exception.ShareBackendException,
1383 self.resthelper.call,
1384 'fake_url',
1385 'fake_data',
1386 'error_method')
1388 def test_call_token_fail(self):
1389 self.resthelper._token = 'fake_token'
1390 fake_result_fail = {
1391 'code': 302,
1392 'message': 'fake_message',
1393 'data': 'fake_data'
1394 }
1395 self.mock_object(self.resthelper, 'do_request',
1396 mock.Mock(return_value=fake_result_fail))
1397 self.assertRaises(exception.MacrosanBackendExeption,
1398 self.resthelper.call,
1399 'fake_url',
1400 'fake_data',
1401 self.post)
1403 def test_call_token_none(self):
1404 self.resthelper._token = None
1405 self.mock_object(self.resthelper, 'do_request',
1406 mock.Mock(return_value=self.result_success))
1407 mock_l = self.mock_object(self.resthelper, 'login',
1408 mock.Mock(return_value='fake_token'))
1409 self.resthelper.call('fake_url', 'fake_data', self.post)
1410 mock_l.assert_called_once()
1412 def test_call_token_expired(self):
1413 self.resthelper._token = 'fake_token'
1414 fake_result = {
1415 'code': 301,
1416 'message': 'token expired',
1417 'data': 'fake_data'
1418 }
1419 self.mock_object(
1420 self.resthelper, 'do_request',
1421 mock.Mock(side_effect=[fake_result, self.result_success]))
1422 mock_l = self.mock_object(self.resthelper, 'login',
1423 mock.Mock(return_value='fake_token'))
1424 self.resthelper.call('fake_url', 'fake_data', self.post)
1425 mock_l.assert_called_once()
1427 def test_call_fail(self):
1428 self.resthelper._token = 'fake_token'
1429 fake_response = FakeResponse(302, self.result_success)
1430 self.mock_object(requests, 'post',
1431 mock.Mock(return_value=fake_response))
1432 self.assertRaises(exception.NetworkException,
1433 self.resthelper.call,
1434 'fake_url',
1435 'fake_data',
1436 self.post)
1438 def test_login(self):
1439 fake_result = {
1440 'code': 0,
1441 'message': 'Login success',
1442 'data': 'fake_token'
1443 }
1444 mock_rd = self.mock_object(self.resthelper, 'do_request',
1445 mock.Mock(return_value=fake_result))
1446 self.resthelper.login()
1447 login_data = {'userName': self.configuration.macrosan_nas_username,
1448 'userPasswd': self.configuration.macrosan_nas_password}
1449 mock_rd.assert_called_once_with('rest/token', login_data,
1450 self.post)
1451 self.assertEqual('fake_token', self.resthelper._token)
1453 def test_login_fail(self):
1454 mock_rd = self.mock_object(self.resthelper, 'do_request',
1455 mock.Mock(return_value=self.result_failed))
1457 self.assertRaises(exception.ShareBackendException,
1458 self.resthelper.login)
1459 login_data = {'userName': self.configuration.macrosan_nas_username,
1460 'userPasswd': self.configuration.macrosan_nas_password}
1461 mock_rd.assert_called_once_with('rest/token', login_data,
1462 self.post)
1464 def test__assert_result_code(self):
1465 self.resthelper._assert_result_code(self.result_success,
1466 self.fake_message)
1468 def test__assert_result_code_fail(self):
1469 self.assertRaises(exception.ShareBackendException,
1470 self.resthelper._assert_result_code,
1471 self.result_failed,
1472 self.fake_message)
1474 def test__assert_result_data(self):
1475 self.resthelper._assert_result_data(self.result_success,
1476 self.fake_message)
1478 def test__assert_result_data_fail(self):
1479 fake_result = {
1480 'code': 0,
1481 'message': 'fake_message'
1482 }
1483 self.assertRaises(exception.ShareBackendException,
1484 self.resthelper._assert_result_data,
1485 fake_result,
1486 self.fake_message)
1488 def test__create_nfs_share(self):
1489 mock_call = self.mock_object(self.resthelper,
1490 'call')
1491 self.mock_object(self.resthelper,
1492 '_assert_result_code')
1493 self.resthelper._create_nfs_share('fake_path')
1494 url = 'rest/nfsShare'
1495 data = {
1496 'path': 'fake_path',
1497 'authority': 'ro',
1498 'accessClient': '192.0.2.0',
1499 }
1500 mock_call.assert_called_once_with(url, data, self.post)
1502 def test__get_nfs_share(self):
1503 fake_result = {
1504 'code': 0,
1505 'message': 'success',
1506 'data': {
1507 "path": "fake_path",
1508 "clients": ["client"],
1509 "protocol": "fake_protocol"
1510 }
1511 }
1512 mock_call = self.mock_object(self.resthelper,
1513 'call',
1514 mock.Mock(return_value=fake_result))
1515 self.mock_object(self.resthelper,
1516 '_assert_result_code')
1517 result = self.resthelper._get_nfs_share('fake_path')
1518 expect = {
1519 "path": "fake_path",
1520 "clients": ["client"],
1521 "protocol": "fake_protocol"
1522 }
1523 self.assertEqual(expect, result)
1524 url = 'rest/nfsShare?path=fake_path'
1525 mock_call.assert_called_once_with(url, None, self.get)
1527 def test__delete_nfs_share(self):
1528 mock_call = self.mock_object(self.resthelper,
1529 'call')
1530 self.mock_object(self.resthelper,
1531 '_assert_result_code')
1532 self.resthelper._delete_nfs_share('fake_path')
1533 url = 'rest/nfsShare?path=fake_path'
1534 mock_call.assert_called_once_with(url, None, self.delete)
1536 def test__create_cifs_share(self):
1537 mock_call = self.mock_object(self.resthelper,
1538 'call')
1539 self.mock_object(self.resthelper,
1540 '_assert_result_code')
1541 self.resthelper._create_cifs_share('fake_name',
1542 'fake_path',
1543 ['fake_user'],
1544 ['0'])
1545 url = 'rest/cifsShare'
1546 data = {
1547 'path': 'fake_path',
1548 'cifsName': 'fake_name',
1549 'cifsDescription': '',
1550 'RoList': [],
1551 'RoListType': [],
1552 'RwList': ['fake_user'],
1553 'RwListType': ['0'],
1554 'allowList': [],
1555 'denyList': [],
1556 }
1557 mock_call.assert_called_once_with(url, data, self.post)
1559 def test__get_cifs_share(self):
1560 fake_result = {
1561 'code': 0,
1562 'message': 'success',
1563 'data': {
1564 "path": "fake_path",
1565 "cifsname": "fake_cifsname",
1566 "protocol": "fake_protocol",
1567 "roList": ["fake_ro"],
1568 "rwList": ["fake_rw"],
1569 "allowList": ["fake_allow"],
1570 "denyList": ["fake_deny"]
1571 }
1572 }
1573 mock_call = self.mock_object(self.resthelper,
1574 'call',
1575 mock.Mock(return_value=fake_result))
1576 self.mock_object(self.resthelper,
1577 '_assert_result_code')
1578 result = self.resthelper._get_cifs_share('fake_path')
1579 expect = {
1580 "path": "fake_path",
1581 "cifsname": "fake_cifsname",
1582 "protocol": "fake_protocol",
1583 "roList": ["fake_ro"],
1584 "rwList": ["fake_rw"],
1585 "allowList": ["fake_allow"],
1586 "denyList": ["fake_deny"]
1587 }
1588 self.assertEqual(expect, result)
1589 url = 'rest/cifsShare?path=fake_path'
1590 mock_call.assert_called_once_with(url, None, self.get)
1592 def test__delete_cifs_share(self):
1593 mock_call = self.mock_object(self.resthelper,
1594 'call')
1595 self.mock_object(self.resthelper,
1596 '_assert_result_code')
1597 self.resthelper._delete_cifs_share('fake_name', 'fake_path')
1598 url = 'rest/cifsShare?path=fake_path&cifsName=fake_name'
1599 mock_call.assert_called_once_with(url, None, self.delete)
1601 def test__update_share_size(self):
1602 mock_call = self.mock_object(self.resthelper,
1603 'call')
1604 self.mock_object(self.resthelper,
1605 '_assert_result_code')
1606 self.resthelper._update_share_size('fake_filesystem', '2GB')
1607 url = 'rest/filesystem/fake_filesystem'
1608 data = {
1609 'capacity': '2GB',
1610 }
1611 mock_call.assert_called_once_with(url, data, self.put)
1613 def test___create_filesystem(self):
1614 mock_call = self.mock_object(self.resthelper,
1615 'call')
1616 self.mock_object(self.resthelper,
1617 '_assert_result_code')
1618 self.resthelper._create_filesystem('fake_filesystem',
1619 'fake_pool',
1620 '1GB')
1621 url = 'rest/filesystem'
1622 data = {
1623 'fsName': 'fake_filesystem',
1624 'poolName': 'fake_pool',
1625 'createType': '0',
1626 'fileSystemQuota': '1GB',
1627 'fileSystemReserve': '1GB',
1628 'wormStatus': 0,
1629 'defaultTimeStatus': 0,
1630 'defaultTimeNum': 0,
1631 'defaultTimeUnit': 'year',
1632 'isAutoLock': 0,
1633 'isAutoDelete': 0,
1634 'lockTime': 0
1635 }
1636 mock_call.assert_called_once_with(url, data, self.post)
1638 def test__delete_filesystem(self):
1639 mock_call = self.mock_object(self.resthelper,
1640 'call')
1641 self.mock_object(self.resthelper,
1642 '_assert_result_code')
1643 self.resthelper._delete_filesystem('fake_filesystem')
1644 url = 'rest/filesystem/fake_filesystem'
1645 mock_call.assert_called_once_with(url, None, self.delete)
1647 def test__get_filesystem(self):
1648 fake_result = {
1649 'code': 0,
1650 'message': 'success',
1651 'data': {
1652 'name': 'fake_filesystem',
1653 'poolName': 'fake_pool',
1654 }
1655 }
1656 mock_call = self.mock_object(self.resthelper,
1657 'call',
1658 mock.Mock(return_value=fake_result))
1659 self.mock_object(self.resthelper,
1660 '_assert_result_code')
1661 result = self.resthelper._get_filesystem('fake_filesystem')
1662 expect = {
1663 'name': 'fake_filesystem',
1664 'poolName': 'fake_pool',
1665 }
1666 self.assertEqual(expect, result)
1667 url = 'rest/filesystem/fake_filesystem'
1668 mock_call.assert_called_once_with(url, None, self.get)
1670 def test__create_filesystem_dir(self):
1671 mock_call = self.mock_object(self.resthelper,
1672 'call')
1673 self.mock_object(self.resthelper,
1674 '_assert_result_code')
1675 self.resthelper._create_filesystem_dir('/fake_path/fake_dir')
1676 url = 'rest/fileDir'
1677 data = {
1678 'path': '/fake_path',
1679 'dirName': 'fake_dir',
1680 }
1681 mock_call.assert_called_once_with(url, data, self.post)
1683 def test__delete_filesystem_dir(self):
1684 mock_call = self.mock_object(self.resthelper,
1685 'call')
1686 self.mock_object(self.resthelper,
1687 '_assert_result_code')
1688 self.resthelper._delete_filesystem_dir('/fake_path/fake_dir')
1689 url = 'rest/fileDir?path=/fake_path&dirName=fake_dir'
1690 mock_call.assert_called_once_with(url, None, self.delete)
1692 @ddt.data('nfs', 'cifs')
1693 def test__allow_access_rest(self, share_proto):
1694 share_proto = share_proto.upper()
1695 mock_anar = self.mock_object(self.resthelper,
1696 '_allow_nfs_access_rest')
1697 mock_acar = self.mock_object(self.resthelper,
1698 '_allow_cifs_access_rest')
1699 self.resthelper._allow_access_rest('fake_path', 'fake_access',
1700 'rw', share_proto)
1701 if share_proto == 'NFS':
1702 mock_anar.assert_called_once_with('fake_path',
1703 'fake_access',
1704 'rw')
1705 elif share_proto == 'CIFS': 1705 ↛ exitline 1705 didn't return from function 'test__allow_access_rest' because the condition on line 1705 was always true
1706 mock_acar.assert_called_once_with('fake_path',
1707 'fake_access',
1708 'rw')
1710 def test__allow_access_rest_proto_error(self):
1711 self.assertRaises(exception.InvalidInput,
1712 self.resthelper._allow_access_rest,
1713 'fake_path',
1714 'fake_access',
1715 'rw',
1716 'error_proto')
1718 def test__allow_nfs_access_rest(self):
1719 mock_call = self.mock_object(
1720 self.resthelper,
1721 'call',
1722 mock.Mock(return_value=self.result_success))
1723 self.mock_object(self.resthelper,
1724 '_assert_result_code')
1725 self.resthelper._allow_nfs_access_rest('fake_path', '172.0.0.1', 'rw')
1726 url = 'rest/nfsShareClient'
1727 data = {
1728 'path': 'fake_path',
1729 'client': '172.0.0.1',
1730 'authority': 'rw',
1731 }
1732 mock_call.assert_called_once_with(url, data, self.post)
1734 @ddt.data(
1735 {'access_to': 'fake_user',
1736 'group': False},
1737 {'access_to': 'fake_group',
1738 'group': True},
1739 {'access_to': '/fake_user',
1740 'group': False},
1741 {'access_to': '/fake_group',
1742 'group': True}
1743 )
1744 @ddt.unpack
1745 def test__allow_cifs_access_rest(self, access_to, group):
1746 ug_type_list = {
1747 'localUser': '0',
1748 'localGroup': '1',
1749 'adUser': '2',
1750 'adGroup': '3',
1751 }
1752 if not group:
1753 mock_call = self.mock_object(
1754 self.resthelper,
1755 'call',
1756 mock.Mock(return_value=self.result_success))
1757 else:
1758 mock_call = self.mock_object(
1759 self.resthelper,
1760 'call',
1761 mock.Mock(side_effect=[self.result_failed_not_exist,
1762 self.result_success]))
1763 self.mock_object(self.resthelper,
1764 '_assert_result_code')
1765 self.resthelper._allow_cifs_access_rest('fake_path',
1766 access_to,
1767 'rw')
1768 url = 'rest/cifsShareClient'
1769 actual_type = ug_type_list["localUser"]
1770 if '/' not in access_to:
1771 if not group:
1772 actual_type = ug_type_list["localUser"]
1773 access_to = access_to
1774 else:
1775 if not group:
1776 actual_type = ug_type_list["adUser"]
1777 access_to = access_to[access_to.index('/') + 1:]
1778 data = {
1779 'path': 'fake_path',
1780 'right': 'rw',
1781 'ugName': access_to,
1782 'ugType': actual_type,
1783 }
1784 if not group:
1785 mock_call.assert_called_once_with(url, data, self.post)
1786 else:
1787 mock_call.assert_called()
1789 def test__allow_cifs_access_rest_fail(self):
1790 mock_call = self.mock_object(
1791 self.resthelper,
1792 'call',
1793 mock.Mock(side_effect=[self.result_failed_not_exist,
1794 self.result_failed_not_exist]))
1795 self.assertRaises(exception.InvalidShare,
1796 self.resthelper._allow_cifs_access_rest,
1797 'fake_path',
1798 'fake_user',
1799 'rw')
1800 mock_call.assert_called()
1802 def test__get_access_from_nfs_share(self):
1803 fake_result = {
1804 'code': 0,
1805 'message': 'success',
1806 'data': {
1807 "path": "fake_path",
1808 "clientName": "fake_client",
1809 "accessRight": "rw",
1810 }
1811 }
1812 mock_call = self.mock_object(self.resthelper,
1813 'call',
1814 mock.Mock(return_value=fake_result))
1815 self.mock_object(self.resthelper,
1816 '_assert_result_code')
1817 result = self.resthelper._get_access_from_nfs_share('fake_path',
1818 'fake_client')
1819 expect = {
1820 "path": "fake_path",
1821 "clientName": "fake_client",
1822 "accessRight": "rw",
1823 }
1824 self.assertEqual(expect, result)
1825 url = 'rest/nfsShareClient?path=fake_path&client=fake_client'
1826 mock_call.assert_called_once_with(url, None, self.get)
1828 @ddt.data({'access_to': 'fake_user',
1829 'ug_type': '0',
1830 'code': 0,
1831 'group': False},
1832 {'access_to': 'fake_user',
1833 'ug_type': None,
1834 'code': 0,
1835 'group': False},
1836 {'access_to': 'fake_group',
1837 'ug_type': None,
1838 'code': 0,
1839 'group': True},
1840 {'access_to': 'fake_user',
1841 'ug_type': None,
1842 'code': 4,
1843 'group': False},
1844 {'access_to': '/fake_user',
1845 'ug_type': None,
1846 'code': 0,
1847 'group': False},
1848 {'access_to': '/fake_group',
1849 'ug_type': None,
1850 'code': 0,
1851 'group': True},
1852 {'access_to': '/fake_user',
1853 'ug_type': None,
1854 'code': 4,
1855 'group': False})
1856 @ddt.unpack
1857 def test__get_access_from_cifs_share(self,
1858 access_to, ug_type, code, group):
1859 fake_result_failed = {
1860 'code': code,
1861 'message': 'failed',
1862 'data': {}
1863 }
1864 fake_result = {
1865 'code': code,
1866 'message': 'success',
1867 'data': {
1868 'path': 'fake_path',
1869 'ugName': 'fake_user',
1870 'ugType': '0',
1871 'accessRight': 'rw'
1872 }
1873 }
1874 fake_result_group = {
1875 'code': code,
1876 'message': 'success',
1877 'data': {
1878 'path': 'fake_path',
1879 'ugName': 'fake_group',
1880 'ugType': '1',
1881 'accessRight': 'rw'
1882 }
1883 }
1884 if code == 4:
1885 fake_result = fake_result_failed
1886 ug_type_list = {
1887 'localUser': '0',
1888 'localGroup': '1',
1889 'adUser': '2',
1890 'adGroup': '3',
1891 }
1892 expect = {
1893 'path': 'fake_path',
1894 'ugName': 'fake_user',
1895 'ugType': '0',
1896 'accessRight': 'rw'
1897 }
1898 expect_group = {
1899 'path': 'fake_path',
1900 'ugName': 'fake_group',
1901 'ugType': '1',
1902 'accessRight': 'rw'
1903 }
1904 if '/' in access_to:
1905 expect['ugType'] = '2'
1906 expect_group['ugType'] = '3'
1907 fake_result['data']['ugType'] = '2'
1908 fake_result_group['data']['ugType'] = '3'
1909 if ug_type is not None:
1910 mock_call = self.mock_object(self.resthelper,
1911 'call',
1912 mock.Mock(return_value=fake_result))
1913 else:
1914 if not group:
1915 mock_call = self.mock_object(
1916 self.resthelper,
1917 'call',
1918 mock.Mock(return_value=fake_result))
1919 else:
1920 mock_call = self.mock_object(
1921 self.resthelper,
1922 'call',
1923 mock.Mock(side_effect=[fake_result_failed,
1924 fake_result_group]))
1926 self.mock_object(self.resthelper,
1927 '_assert_result_code')
1928 result = self.resthelper._get_access_from_cifs_share('fake_path',
1929 access_to,
1930 ug_type)
1931 if ug_type:
1932 self.assertEqual(expect, result)
1933 url = f'rest/cifsShareClient?path=fake_path&' \
1934 f'ugName={access_to}&ugType={ug_type}'
1935 mock_call.assert_called_once_with(url, None, self.get)
1936 else:
1937 if '/' not in access_to:
1938 if not group:
1939 actual_type = ug_type_list["localUser"]
1940 actual_access = access_to
1941 else:
1942 if not group:
1943 actual_type = ug_type_list["adUser"]
1944 actual_access = access_to[access_to.index('/') + 1:]
1945 if code == 4:
1946 self.assertIsNone(result)
1947 else:
1948 if not group:
1949 self.assertEqual(expect, result)
1950 url = f'rest/cifsShareClient?path=fake_path&' \
1951 f'ugName={actual_access}&' \
1952 f'ugType={actual_type}'
1953 mock_call.assert_called_once_with(url, None, self.get)
1954 else:
1955 self.assertEqual(expect_group, result)
1956 mock_call.assert_called()
1958 def test__get_all_nfs_access_rest(self):
1959 fake_result = {
1960 'code': 0,
1961 'message': 'success',
1962 'data': [
1963 {
1964 'path': 'fake_path',
1965 'clientName': '172.0.0.1',
1966 'accessRight': 'rw'
1967 },
1968 {
1969 'path': 'default_path',
1970 'clientName': '172.0.0.2',
1971 'accessRight': 'rw'
1972 }]
1973 }
1974 mock_call = self.mock_object(self.resthelper,
1975 'call',
1976 mock.Mock(return_value=fake_result))
1977 self.mock_object(self.resthelper,
1978 '_assert_result_code')
1979 result = self.resthelper._get_all_nfs_access_rest(
1980 '/manila_fakeid/manila_fakeid')
1981 expect = [
1982 {
1983 'share_path': 'fake_path',
1984 'access_to': '172.0.0.1',
1985 'access_level': 'rw'
1986 },
1987 {
1988 'share_path': 'default_path',
1989 'access_to': '172.0.0.2',
1990 'access_level': 'rw'
1991 }]
1992 self.assertEqual(expect, result)
1993 url = 'rest/allNfsShareClient?path=/manila_fakeid/manila_fakeid'
1994 mock_call.assert_called_once_with(url, None, self.get)
1996 def test__get_all_cifs_access_rest(self):
1997 fake_result = {
1998 'code': 0,
1999 'message': 'success',
2000 'data': [
2001 {
2002 'path': 'fake_path',
2003 'ugName': 'user_name',
2004 'ugType': '0',
2005 'accessRight': 'rw'
2006 },
2007 {
2008 'path': 'default_path',
2009 'ugName': 'manilanobody',
2010 'ugType': '0',
2011 'accessRight': 'rw'
2012 }]
2013 }
2014 mock_call = self.mock_object(self.resthelper,
2015 'call',
2016 mock.Mock(return_value=fake_result))
2017 self.mock_object(self.resthelper,
2018 '_assert_result_code')
2019 result = self.resthelper._get_all_cifs_access_rest(
2020 '/manila_fakeid/manila_fakeid')
2021 expect = [
2022 {
2023 'share_path': 'fake_path',
2024 'access_to': 'user_name',
2025 'ugType': '0',
2026 'access_level': 'rw'
2027 },
2028 {
2029 'share_path': 'default_path',
2030 'access_to': 'manilanobody',
2031 'ugType': '0',
2032 'access_level': 'rw'
2033 }]
2034 self.assertEqual(expect, result)
2035 url = 'rest/allCifsShareClient?path=/manila_fakeid/manila_fakeid'
2036 mock_call.assert_called_once_with(url, None, self.get)
2038 def test__change_nfs_access_rest(self):
2039 mock_call = self.mock_object(self.resthelper,
2040 'call')
2041 self.mock_object(self.resthelper,
2042 '_assert_result_code')
2043 self.resthelper._change_nfs_access_rest(
2044 '/manila_fakeid/manila_fakeid', '172.0.0.1', 'rw')
2045 url = 'rest/nfsShareClient'
2046 data = {
2047 'path': '/manila_fakeid/manila_fakeid',
2048 'oldNfsClientName': '172.0.0.1',
2049 'clientName': '',
2050 'accessRight': 'rw',
2051 'allSquash': '',
2052 'rootSquash': '',
2053 'secure': '',
2054 'anonuid': '',
2055 'anongid': '',
2056 }
2057 mock_call.assert_called_once_with(url, data, self.put)
2059 def test__change_cifs_access_rest(self):
2060 mock_call = self.mock_object(self.resthelper,
2061 'call')
2062 self.mock_object(self.resthelper,
2063 '_assert_result_code')
2064 self.resthelper._change_cifs_access_rest(
2065 '/manila_fakeid/manila_fakeid', '/fake_user', 'rw', '0')
2066 url = 'rest/cifsShareClient'
2067 data = {
2068 'path': '/manila_fakeid/manila_fakeid',
2069 'right': 'rw',
2070 'ugName': 'fake_user',
2071 'ugType': '0',
2072 }
2073 mock_call.assert_called_once_with(url, data, self.put)
2075 def test__delete_nfs_access_rest(self):
2076 mock_call = self.mock_object(self.resthelper,
2077 'call')
2078 self.mock_object(self.resthelper,
2079 '_assert_result_code')
2080 self.resthelper._delete_nfs_access_rest(
2081 '/manila_fakeid/manila_fakeid', '*')
2082 url = 'rest/nfsShareClient?path=/manila_fakeid/manila_fakeid&client=*'
2083 mock_call.assert_called_once_with(url, None, self.delete)
2085 def test__delete_cifs_access_rest(self):
2086 mock_call = self.mock_object(self.resthelper,
2087 'call')
2088 self.mock_object(self.resthelper,
2089 '_assert_result_code')
2090 self.resthelper._delete_cifs_access_rest(
2091 '/manila_fakeid/manila_fakeid', 'fake_user', '0')
2092 url = 'rest/cifsShareClient?path=/manila_fakeid/manila_fakeid' \
2093 '&ugName=fake_user&ugType=0'
2094 mock_call.assert_called_once_with(url, None, self.delete)
2096 def test__get_nfs_service_status(self):
2097 fake_result = {
2098 'code': 0,
2099 'message': 'success',
2100 'data': {
2101 'serviceStatus': constants.NFS_ENABLED,
2102 'nfs3Status': constants.NFS_SUPPORTED,
2103 'nfs4Status': constants.NFS_SUPPORTED
2104 }
2105 }
2106 mock_call = self.mock_object(self.resthelper,
2107 'call',
2108 mock.Mock(return_value=fake_result))
2109 self.mock_object(self.resthelper,
2110 '_assert_result_code')
2111 result = self.resthelper._get_nfs_service_status()
2112 expect = {
2113 'serviceStatus': constants.NFS_ENABLED,
2114 'nfs3Status': constants.NFS_SUPPORTED,
2115 'nfs4Status': constants.NFS_SUPPORTED
2116 }
2117 self.assertEqual(expect, result)
2118 url = 'rest/nfsService'
2119 mock_call.assert_called_once_with(url, None, self.get)
2121 def test__start_nfs_service(self):
2122 mock_call = self.mock_object(self.resthelper,
2123 'call')
2124 self.mock_object(self.resthelper,
2125 '_assert_result_code')
2126 self.resthelper._start_nfs_service()
2127 url = 'rest/nfsService'
2128 data = {
2129 "openStatus": "1",
2130 }
2131 mock_call.assert_called_once_with(url, data, self.put)
2133 def test__config_nfs_service(self):
2134 mock_call = self.mock_object(self.resthelper,
2135 'call')
2136 self.mock_object(self.resthelper,
2137 '_assert_result_code')
2138 self.resthelper._config_nfs_service()
2139 url = 'rest/nfsConfig'
2140 data = {
2141 'configNfs3': "yes",
2142 'configNfs4': "yes",
2143 }
2144 mock_call.assert_called_once_with(url, data, self.put)
2146 def test__get_cifs_service_status(self):
2147 mock_call = self.mock_object(
2148 self.resthelper,
2149 'call',
2150 mock.Mock(return_value=self.result_success_return_1))
2151 self.mock_object(self.resthelper,
2152 '_assert_result_code')
2153 result = self.resthelper._get_cifs_service_status()
2154 self.assertEqual('1', result)
2155 url = 'rest/cifsService'
2156 mock_call.assert_called_once_with(url, None, self.get)
2158 def test__start_cifs_service(self):
2159 mock_call = self.mock_object(self.resthelper,
2160 'call')
2161 self.mock_object(self.resthelper,
2162 '_assert_result_code')
2163 self.resthelper._start_cifs_service()
2164 url = 'rest/cifsService'
2165 data = {
2166 'openStatus': '1',
2167 }
2168 mock_call.assert_called_once_with(url, data, self.put)
2170 def test__config_cifs_service(self):
2171 mock_call = self.mock_object(self.resthelper,
2172 'call')
2173 self.mock_object(self.resthelper,
2174 '_assert_result_code')
2175 self.resthelper._config_cifs_service()
2176 url = 'rest/cifsConfig'
2177 data = {
2178 'workName': 'manila',
2179 'description': '',
2180 'access_way': 'user',
2181 'isCache': 'no',
2182 'adsName': '',
2183 'adsIP': '',
2184 'adsUSER': '',
2185 'adsPASSWD': '',
2186 'allowList': [],
2187 'denyList': [],
2188 }
2189 mock_call.assert_called_once_with(url, data, self.put)
2191 def test__get_all_pool(self):
2192 mock_call = self.mock_object(
2193 self.resthelper,
2194 'call',
2195 mock.Mock(return_value=self.result_success_storage_pools))
2196 self.mock_object(self.resthelper,
2197 '_assert_result_code')
2198 result = self.resthelper._get_all_pool()
2199 self.assertEqual(self.result_success_storage_pools, result)
2200 url = 'rest/storagepool'
2201 mock_call.assert_called_once_with(url, None, self.get)
2203 def test__query_user(self):
2204 mock_call = self.mock_object(
2205 self.resthelper,
2206 'call',
2207 mock.Mock(return_value=self.result_success_return_0))
2208 self.mock_object(self.resthelper,
2209 '_assert_result_code')
2210 result = self.resthelper._query_user('fake_user')
2211 self.assertEqual('0', result)
2212 url = 'rest/user/fake_user'
2213 mock_call.assert_called_once_with(url, None, self.get)
2215 def test__add_localuser(self):
2216 mock_call = self.mock_object(self.resthelper,
2217 'call')
2218 self.mock_object(self.resthelper,
2219 '_assert_result_code')
2220 self.resthelper._add_localuser('fake_user',
2221 'fake_passwd', 'fake_group')
2222 url = 'rest/localUser'
2223 data = {
2224 'userName': 'fake_user',
2225 'mgGroup': 'fake_group',
2226 'userPasswd': 'fake_passwd',
2227 'unusedGroup': []}
2228 mock_call.assert_called_once_with(url, data, self.post)
2230 def test__query_group(self):
2231 mock_call = self.mock_object(
2232 self.resthelper,
2233 'call',
2234 mock.Mock(return_value=self.result_success_return_0))
2235 self.mock_object(self.resthelper,
2236 '_assert_result_code')
2237 result = self.resthelper._query_group('fake_group')
2238 self.assertEqual('0', result)
2239 url = 'rest/group/fake_group'
2240 mock_call.assert_called_once_with(url, None, self.get)
2242 def test__add_localgroup(self):
2243 mock_call = self.mock_object(self.resthelper,
2244 'call')
2245 self.mock_object(self.resthelper,
2246 '_assert_result_code')
2247 self.resthelper._add_localgroup('fake_group')
2248 url = 'rest/localGroup'
2249 data = {'groupName': 'fake_group'}
2250 mock_call.assert_called_once_with(url, data, self.post)