Coverage for manila/tests/share/drivers/dell_emc/plugins/unity/test_connection.py: 100%
583 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016 EMC Corporation.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import copy
17import ddt
19from manila import exception
20from manila.share.drivers.dell_emc.common.enas import utils as enas_utils
21from manila import test
22from manila.tests.share.drivers.dell_emc.plugins.unity import fake_exceptions
23from manila.tests.share.drivers.dell_emc.plugins.unity import res_mock
24from manila.tests.share.drivers.dell_emc.plugins.unity import utils
25from oslo_utils import units
26from unittest import mock
@ddt.ddt
class TestConnection(test.TestCase):
    """Unit tests for the Unity plugin connection object.

    Every test receives a ``connection`` argument injected by one of the
    ``res_mock`` decorators; tests decorated with
    ``res_mock.mock_manila_input`` additionally receive ``mocked_input``,
    a mapping of named Manila-side fixtures (shares, snapshots, access
    rules, network info, ...).

    NOTE(review): several tests share an identical body and differ only
    in name (for example the ``test_delete_share__*`` family).  The
    scenario is presumably selected by ``res_mock`` from the test name --
    confirm against ``res_mock`` before renaming any test.

    Per-test notes are written as comments rather than docstrings so that
    ``unittest``'s ``shortDescription()`` keeps reporting test names.
    """

    client = None

    @classmethod
    def setUpClass(cls):
        # unittest does not invoke base-class class fixtures implicitly, so
        # chain up explicitly before adding our own class-level state.
        super().setUpClass()
        cls.emc_share_driver = res_mock.FakeEMCShareDriver()

    def _assert_locations_equal(self, expected, actual):
        # Export locations carry no ordering guarantee, so both sides are
        # ordered by their 'path' value before being compared.
        def by_path(location):
            return location['path']

        self.assertEqual(sorted(expected, key=by_path),
                         sorted(actual, key=by_path))

    @res_mock.patch_connection_init
    def test_connect(self, connection):
        # connect() must not raise for a DHSS=True driver.
        connection.connect(res_mock.FakeEMCShareDriver(dhss=True), None)

    @res_mock.patch_connection_init
    def test_connect_with_ipv6(self, connection):
        # Same as test_connect, but with the IPv6 fake driver.
        connection.connect(res_mock.FakeEMCShareDriverIPv6(
            dhss=True), None)

    @res_mock.patch_connection
    def test_connect__invalid_pool_configuration(self, connection):
        # A "resource not found" error from the pool lookup must surface
        # as a bad-configuration error.
        get_pool = connection.client.system.get_pool
        get_pool.side_effect = fake_exceptions.UnityResourceNotFoundError()

        self.assertRaises(exception.BadConfigurationException,
                          connection._config_pool,
                          'faked_pool_name')

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_nfs_share(self, connection, mocked_input):
        # An NFS share is exported as "<ip>:/<name>", one entry per IP.
        share = mocked_input['nfs_share']
        share_server = mocked_input['share_server']

        location = connection.create_share(None, share, share_server)

        exp_location = [
            {'path': 'fake_ip_addr_1:/cb532599-8dc6-4c3e-bb21-74ea54be566c'},
            {'path': 'fake_ip_addr_2:/cb532599-8dc6-4c3e-bb21-74ea54be566c'},
        ]
        self._assert_locations_equal(exp_location, location)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_cifs_share(self, connection, mocked_input):
        # A CIFS share is exported as "\\<ip>\<name>", one entry per IP.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']

        location = connection.create_share(None, share, share_server)

        exp_location = [
            {'path': r'\\fake_ip_addr_1\716100cc-e0b4-416b-ac27-d38dd019330d'},
            {'path': r'\\fake_ip_addr_2\716100cc-e0b4-416b-ac27-d38dd019330d'},
        ]
        self._assert_locations_equal(exp_location, location)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_share_with_invalid_proto(self, connection, mocked_input):
        # The 'invalid_share' fixture must be rejected with InvalidShare.
        share = mocked_input['invalid_share']
        share_server = mocked_input['share_server']

        self.assertRaises(exception.InvalidShare,
                          connection.create_share,
                          None,
                          share,
                          share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_share_without_share_server(self, connection,
                                               mocked_input):
        # Passing no share server at all is invalid input.
        share = mocked_input['cifs_share']

        self.assertRaises(exception.InvalidInput,
                          connection.create_share,
                          None,
                          share,
                          None)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_share__no_server_name_in_backend_details(self, connection,
                                                             mocked_input):
        # A share server whose backend details carry no server name (and
        # whose identifier is empty) is invalid input.
        share = mocked_input['cifs_share']
        share_server = {
            'backend_details': {'share_server_name': None},
            'id': 'test',
            'identifier': '',
        }

        self.assertRaises(exception.InvalidInput,
                          connection.create_share,
                          None,
                          share,
                          share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_share_with_invalid_share_server(self, connection,
                                                    mocked_input):
        # create_share must raise EMCUnityError in this scenario.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']

        self.assertRaises(exception.EMCUnityError,
                          connection.create_share,
                          None,
                          share,
                          share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_delete_share(self, connection, mocked_input):
        # delete_share must not raise.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']

        connection.delete_share(None, share, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_delete_share__with_invalid_share(self, connection, mocked_input):
        # delete_share must not raise even without a share server.
        share = mocked_input['cifs_share']

        connection.delete_share(None, share, None)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_delete_share__create_from_snap(self, connection,
                                            mocked_input):
        # delete_share must not raise in this scenario.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']

        connection.delete_share(None, share, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_delete_share__create_from_snap_but_not_isolated(self,
                                                             connection,
                                                             mocked_input):
        # delete_share must not raise in this scenario.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']

        connection.delete_share(None, share, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_delete_share__but_not_isolated(self, connection,
                                            mocked_input):
        # delete_share must not raise in this scenario.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']

        connection.delete_share(None, share, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_shrink_cifs_share(self, connection, mocked_input):
        # shrink_share must not raise for the CIFS shrink fixture.
        share = mocked_input['shrink_cifs_share']
        new_size = 4 * units.Gi

        connection.shrink_share(share, new_size)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_shrink_nfs_share(self, connection, mocked_input):
        # shrink_share must not raise for the NFS shrink fixture.
        share = mocked_input['shrink_nfs_share']
        new_size = 4 * units.Gi

        connection.shrink_share(share, new_size)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_extend_cifs_share(self, connection, mocked_input):
        # extend_share must not raise for a CIFS share.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']
        new_size = 50 * units.Gi

        connection.extend_share(share, new_size, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_extend_nfs_share(self, connection, mocked_input):
        # extend_share must not raise for an NFS share.
        share = mocked_input['nfs_share']
        share_server = mocked_input['share_server']
        new_size = 50 * units.Gi

        connection.extend_share(share, new_size, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_extend_share__create_from_snap(self, connection, mocked_input):
        # extend_share must raise ShareExtendingError in this scenario.
        share = mocked_input['cifs_share']
        share_server = mocked_input['share_server']
        new_size = 50 * units.Gi

        self.assertRaises(exception.ShareExtendingError,
                          connection.extend_share,
                          share,
                          new_size,
                          share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_shrink_share_create_from_snap(self, connection, mocked_input):
        # shrink_share must raise ShareShrinkingError in this scenario.
        share = mocked_input['shrink_cifs_share']
        share_server = mocked_input['share_server']
        new_size = 4 * units.Gi

        self.assertRaises(exception.ShareShrinkingError,
                          connection.shrink_share,
                          share,
                          new_size,
                          share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_snapshot_from_filesystem(self, connection, mocked_input):
        # The created snapshot's name is reported as provider_location.
        snapshot = mocked_input['snapshot']
        share_server = mocked_input['share_server']

        result = connection.create_snapshot(None, snapshot, share_server)
        self.assertEqual('ab411797-b1cf-4035-bf14-8771a7bf1805',
                         result['provider_location'])

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_snapshot_from_snapshot(self, connection, mocked_input):
        # create_snapshot must not raise in this scenario.
        snapshot = mocked_input['snapshot']
        share_server = mocked_input['share_server']

        connection.create_snapshot(None, snapshot, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_delete_snapshot(self, connection, mocked_input):
        # delete_snapshot must not raise.
        snapshot = mocked_input['snapshot']
        share_server = mocked_input['share_server']

        connection.delete_snapshot(None, snapshot, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_ensure_share_exists(self, connection, mocked_input):
        # ensure_share must not raise in this scenario.
        share = mocked_input['cifs_share']

        connection.ensure_share(None, share, None)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_ensure_share_not_exists(self, connection, mocked_input):
        # ensure_share must raise ShareNotFound in this scenario.
        share = mocked_input['cifs_share']

        self.assertRaises(exception.ShareNotFound,
                          connection.ensure_share,
                          None,
                          share,
                          None)

    @res_mock.patch_connection
    def test_update_share_stats(self, connection):
        # Work on a copy: update_share_stats fills the dict in place and
        # res_mock.STATS is shared by other tests.
        stat_dict = copy.deepcopy(res_mock.STATS)
        connection.update_share_stats(stat_dict)
        self.assertEqual(5, len(stat_dict))
        pool = stat_dict['pools'][0]
        self.assertEqual('pool_1', pool['pool_name'])
        self.assertEqual(
            enas_utils.bytes_to_gb(500000.0), pool['total_capacity_gb'])
        self.assertEqual(False, pool['qos'])
        self.assertEqual(
            enas_utils.bytes_to_gb(30000.0), pool['provisioned_capacity_gb'])
        self.assertEqual(20, pool['max_over_subscription_ratio'])
        self.assertEqual(
            enas_utils.bytes_to_gb(10000.0), pool['allocated_capacity_gb'])
        self.assertEqual(0, pool['reserved_percentage'])
        self.assertEqual(0, pool['reserved_snapshot_percentage'])
        self.assertEqual(0, pool['reserved_share_extend_percentage'])
        self.assertTrue(pool['thin_provisioning'])
        self.assertEqual(
            enas_utils.bytes_to_gb(490000.0), pool['free_capacity_gb'])

    @res_mock.patch_connection
    def test_update_share_stats__nonexistent_pools(self, connection):
        # update_share_stats must raise EMCUnityError in this scenario.
        stat_dict = copy.deepcopy(res_mock.STATS)

        self.assertRaises(exception.EMCUnityError,
                          connection.update_share_stats,
                          stat_dict)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_get_pool(self, connection, mocked_input):
        # get_pool must not raise for a CIFS share.
        share = mocked_input['cifs_share']

        connection.get_pool(share)

    @utils.patch_find_ports_by_mtu
    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_setup_server(self, connection, mocked_input, find_ports):
        # On a flat network the server name is returned and the NAS
        # server is created with tenant=None.
        find_ports.return_value = {'SPA': {'spa_eth1'}}
        network_info = mocked_input['network_info__flat']
        server_info = connection.setup_server(network_info)
        self.assertEqual(
            {'share_server_name':
                '78fd845f-8e7d-487f-bfde-051d83e78103'},
            server_info)
        self.assertIsNone(connection.client.system.create_nas_server.
                          call_args[1]['tenant'])

    @utils.patch_find_ports_by_mtu
    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_setup_server__vlan_network(self, connection, mocked_input,
                                        find_ports):
        # On a VLAN network the NAS server is created with a tenant.
        find_ports.return_value = {'SPA': {'spa_eth1'}}
        network_info = mocked_input['network_info__vlan']

        connection.setup_server(network_info)
        self.assertEqual('tenant_1',
                         connection.client.system.create_nas_server
                         .call_args[1]['tenant'].id)

    @utils.patch_find_ports_by_mtu
    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_setup_server__vxlan_network(self, connection, mocked_input,
                                         find_ports):
        # The VXLAN network fixture must be rejected.
        find_ports.return_value = {'SPA': {'spa_eth1'}}
        network_info = mocked_input['network_info__vxlan']

        self.assertRaises(exception.NetworkBadConfigurationException,
                          connection.setup_server,
                          network_info)

    @utils.patch_find_ports_by_mtu
    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_setup_server__active_directory(self, connection, mocked_input,
                                            find_ports):
        # setup_server must not raise for the active-directory fixture.
        find_ports.return_value = {'SPA': {'spa_eth1'}}
        network_info = mocked_input['network_info__active_directory']

        connection.setup_server(network_info)

    @utils.patch_find_ports_by_mtu
    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_setup_server__kerberos(self, connection, mocked_input,
                                    find_ports):
        # setup_server must not raise for the kerberos fixture.
        find_ports.return_value = {'SPA': {'spa_eth1'}}
        network_info = mocked_input['network_info__kerberos']

        connection.setup_server(network_info)

    @utils.patch_find_ports_by_mtu
    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_setup_server__throw_exception(self, connection, mocked_input,
                                           find_ports):
        # A UnityException raised during setup must reach the caller.
        find_ports.return_value = {'SPA': {'spa_eth1'}}
        network_info = mocked_input['network_info__flat']

        self.assertRaises(fake_exceptions.UnityException,
                          connection.setup_server,
                          network_info)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_teardown_server(self, connection, mocked_input):
        # teardown_server must not raise.
        server_detail = mocked_input['server_detail']
        security_services = mocked_input['security_services']

        connection.teardown_server(server_detail, security_services)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_teardown_server__no_server_detail(self, connection, mocked_input):
        # teardown_server must tolerate server_detail=None.
        security_services = mocked_input['security_services']

        connection.teardown_server(None, security_services)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_teardown_server__no_share_server_name(self, connection,
                                                   mocked_input):
        # teardown_server must tolerate a missing share server name.
        server_detail = {'share_server_name': None}
        security_services = mocked_input['security_services']

        connection.teardown_server(server_detail, security_services)

    # NOTE(review): each data dict is indexed below as one positional
    # argument, so @ddt.unpack does not appear to take effect through
    # patch_connection -- confirm before removing or reordering it.
    @ddt.data({'configured_pools': None,
               'matched_pools': {'pool_1', 'pool_2', 'nas_server_pool'}},
              {'configured_pools': ['*'],
               'matched_pools': {'pool_1', 'pool_2', 'nas_server_pool'}},
              {'configured_pools': ['pool_*'],
               'matched_pools': {'pool_1', 'pool_2'}},
              {'configured_pools': ['*pool'],
               'matched_pools': {'nas_server_pool'}},
              {'configured_pools': ['nas_server_pool'],
               'matched_pools': {'nas_server_pool'}},
              {'configured_pools': ['nas_*', 'pool_*'],
               'matched_pools': {'pool_1', 'pool_2', 'nas_server_pool'}})
    @res_mock.patch_connection
    @ddt.unpack
    def test__get_managed_pools(self, connection, mocked_input):
        # Each case pairs a pool configuration (None, exact names or
        # wildcard patterns) with the set of pool names it must select.
        configured_pools = mocked_input['configured_pools']
        matched_pool = mocked_input['matched_pools']

        pools = connection._get_managed_pools(configured_pools)

        self.assertEqual(matched_pool, pools)

    @res_mock.patch_connection
    def test__get_managed_pools__invalid_pool_configuration(self, connection):
        # 'fake_pool' must be reported as a bad configuration.
        configured_pools = 'fake_pool'

        self.assertRaises(exception.BadConfigurationException,
                          connection._get_managed_pools,
                          configured_pools)

    @res_mock.patch_connection
    def test_validate_port_configuration(self, connection):
        # The 'sp*' pattern yields a mapping from SP name to port ids.
        sp_ports_map = connection.validate_port_configuration(['sp*'])

        self.assertEqual({'spa_eth1', 'spa_eth2', 'spa_la_4'},
                         sp_ports_map['SPA'])
        self.assertEqual({'spb_eth1'}, sp_ports_map['SPB'])

    @res_mock.patch_connection
    def test_validate_port_configuration_exception(self, connection):
        # The 'xxxx*' pattern must be reported as a bad configuration.
        self.assertRaises(exception.BadConfigurationException,
                          connection.validate_port_configuration,
                          ['xxxx*'])

    @res_mock.patch_connection
    def test__get_pool_name_from_host__no_pool_name(self, connection):
        # A host string without a pool part is an invalid host.
        host = 'openstack@Unity'

        self.assertRaises(exception.InvalidHost,
                          connection._get_pool_name_from_host,
                          host)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_cifs_share_from_snapshot(self, connection, mocked_input):
        # create_share_from_snapshot must not raise for a CIFS share.
        share = mocked_input['cifs_share']
        snapshot = mocked_input['snapshot']
        share_server = mocked_input['share_server']

        connection.create_share_from_snapshot(None, share, snapshot,
                                              share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_nfs_share_from_snapshot(self, connection, mocked_input):
        # create_share_from_snapshot must not raise for an NFS share.
        share = mocked_input['nfs_share']
        snapshot = mocked_input['snapshot']
        share_server = mocked_input['share_server']

        connection.create_share_from_snapshot(None, share, snapshot,
                                              share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_create_share_from_snapshot_no_server_name(self,
                                                       connection,
                                                       mocked_input):
        # The share server fixture without a server name must make
        # create_share_from_snapshot raise EMCUnityError.
        share = mocked_input['nfs_share']
        snapshot = mocked_input['snapshot']
        share_server = mocked_input['share_server__no_share_server_name']

        self.assertRaises(exception.EMCUnityError,
                          connection.create_share_from_snapshot,
                          None,
                          share,
                          snapshot,
                          share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_clear_share_access_cifs(self, connection, mocked_input):
        # A UnityException raised while clearing access must reach the
        # caller (CIFS share).
        share = mocked_input['cifs_share']

        self.assertRaises(fake_exceptions.UnityException,
                          connection.clear_access,
                          share)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_clear_share_access_nfs(self, connection, mocked_input):
        # A UnityException raised while clearing access must reach the
        # caller (NFS share).
        share = mocked_input['nfs_share']

        self.assertRaises(fake_exceptions.UnityException,
                          connection.clear_access,
                          share)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_allow_rw_cifs_share_access(self, connection, mocked_input):
        # allow_access must not raise for a CIFS read-write rule.
        share = mocked_input['cifs_share']
        rw_access = mocked_input['cifs_rw_access']
        share_server = mocked_input['share_server']

        connection.allow_access(None, share, rw_access, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_update_access_allow_rw(self, connection, mocked_input):
        # update_access must not raise when given one rule to add.
        share = mocked_input['cifs_share']
        rw_access = mocked_input['cifs_rw_access']
        share_server = mocked_input['share_server']

        connection.update_access(None, share, None, [rw_access], None,
                                 share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_update_access_recovery(self, connection, mocked_input):
        # update_access must not raise when given only the full rule
        # list (no add/delete rules).
        share = mocked_input['cifs_share']
        rw_access = mocked_input['cifs_rw_access']
        share_server = mocked_input['share_server']

        connection.update_access(None, share, [rw_access], None, None,
                                 share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_allow_ro_cifs_share_access(self, connection, mocked_input):
        # allow_access must not raise for a CIFS read-only rule.
        share = mocked_input['cifs_share']
        ro_access = mocked_input['cifs_ro_access']
        share_server = mocked_input['share_server']

        connection.allow_access(None, share, ro_access, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_allow_rw_nfs_share_access(self, connection, mocked_input):
        # allow_access must not raise for an NFS read-write rule.
        share = mocked_input['nfs_share']
        rw_access = mocked_input['nfs_rw_access']
        share_server = mocked_input['share_server']

        connection.allow_access(None, share, rw_access, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_allow_rw_nfs_share_access_cidr(self, connection, mocked_input):
        # allow_access must not raise for the CIDR read-write fixture.
        share = mocked_input['nfs_share']
        rw_access = mocked_input['nfs_rw_access_cidr']
        share_server = mocked_input['share_server']

        connection.allow_access(None, share, rw_access, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_allow_ro_nfs_share_access(self, connection, mocked_input):
        # allow_access must not raise for an NFS read-only rule.
        share = mocked_input['nfs_share']
        ro_access = mocked_input['nfs_ro_access']
        share_server = mocked_input['share_server']

        connection.allow_access(None, share, ro_access, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_deny_cifs_share_access(self, connection, mocked_input):
        # deny_access must not raise for a CIFS read-write rule.
        share = mocked_input['cifs_share']
        rw_access = mocked_input['cifs_rw_access']
        share_server = mocked_input['share_server']

        connection.deny_access(None, share, rw_access, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_deny_nfs_share_access(self, connection, mocked_input):
        # deny_access must not raise for an NFS read-write rule.
        share = mocked_input['nfs_share']
        rw_access = mocked_input['nfs_rw_access']
        share_server = mocked_input['share_server']

        connection.deny_access(None, share, rw_access, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_update_access_deny_nfs(self, connection, mocked_input):
        # update_access must not raise when given one rule to delete.
        share = mocked_input['nfs_share']
        rw_access = mocked_input['nfs_rw_access']

        connection.update_access(None, share, None, None, [rw_access], None)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test__validate_cifs_share_access_type(self, connection, mocked_input):
        # The 'invalid_access' fixture must be rejected for a CIFS share.
        share = mocked_input['cifs_share']
        invalid_access = mocked_input['invalid_access']

        self.assertRaises(exception.InvalidShareAccess,
                          connection._validate_share_access_type,
                          share,
                          invalid_access)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test__validate_nfs_share_access_type(self, connection, mocked_input):
        # The 'invalid_access' fixture must be rejected for an NFS share.
        share = mocked_input['nfs_share']
        invalid_access = mocked_input['invalid_access']

        self.assertRaises(exception.InvalidShareAccess,
                          connection._validate_share_access_type,
                          share,
                          invalid_access)

    @res_mock.patch_connection
    def test_get_network_allocations_number(self, connection):
        # Exactly one network allocation is requested.
        self.assertEqual(1, connection.get_network_allocations_number())

    @res_mock.patch_connection
    def test_get_proto_enum(self, connection):
        # Protocol names are accepted in upper and lower case.
        self.assertIn('FSSupportedProtocolEnum.CIFS',
                      str(connection._get_proto_enum('CIFS')))
        self.assertIn('FSSupportedProtocolEnum.NFS',
                      str(connection._get_proto_enum('nfs')))

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_allow_access_error_access_level(self, connection, mocked_input):
        # allow_access must raise InvalidShareAccessLevel for the
        # 'invalid_access' fixture.
        share = mocked_input['nfs_share']
        invalid_access = mocked_input['invalid_access']

        self.assertRaises(exception.InvalidShareAccessLevel,
                          connection.allow_access,
                          None, share, invalid_access)

    @res_mock.patch_connection
    def test__create_network_interface_ipv6(self, connection):
        # For IPv6 the netmask is None and the CIDR prefix length is
        # passed separately as 'prefix_length'.
        connection.client.create_interface = mock.Mock(return_value=None)
        nas_server = mock.Mock()
        network = {'ip_address': '2001:db8:0:1:f816:3eff:fe76:35c4',
                   'cidr': '2001:db8:0:1:f816:3eff:fe76:35c4/64',
                   'gateway': '2001:db8:0:1::1',
                   'segmentation_id': '201'}
        port_id = mock.Mock()
        connection._create_network_interface(nas_server, network, port_id)

        expected = {'ip_addr': '2001:db8:0:1:f816:3eff:fe76:35c4',
                    'netmask': None,
                    'gateway': '2001:db8:0:1::1',
                    'port_id': port_id,
                    'vlan_id': '201',
                    'prefix_length': '64'}
        connection.client.create_interface.assert_called_once_with(nas_server,
                                                                   **expected)

    @res_mock.patch_connection
    def test__create_network_interface_ipv4(self, connection):
        # For IPv4 the /24 CIDR becomes a dotted netmask and no
        # 'prefix_length' keyword is passed.
        connection.client.create_interface = mock.Mock(return_value=None)
        nas_server = mock.Mock()
        network = {'ip_address': '192.168.1.10',
                   'cidr': '192.168.1.10/24',
                   'gateway': '192.168.1.1',
                   'segmentation_id': '201'}
        port_id = mock.Mock()
        connection._create_network_interface(nas_server, network, port_id)

        expected = {'ip_addr': '192.168.1.10',
                    'netmask': '255.255.255.0',
                    'gateway': '192.168.1.1',
                    'port_id': port_id,
                    'vlan_id': '201'}
        connection.client.create_interface.assert_called_once_with(nas_server,
                                                                   **expected)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_revert_to_snapshot(self, connection, mocked_input):
        # revert_to_snapshot must not raise.
        context = mock.Mock()
        snapshot = mocked_input['snapshot']
        share_access_rules = [mocked_input['nfs_rw_access'], ]
        snapshot_access_rules = [mocked_input['nfs_rw_access'], ]

        connection.revert_to_snapshot(context, snapshot, share_access_rules,
                                      snapshot_access_rules)

    @res_mock.patch_connection_init
    def test_dhss_false_connect_without_nas_server(self, connection):
        # connect() must raise a bad-configuration error for the
        # DHSS=False fake driver.
        self.assertRaises(exception.BadConfigurationException,
                          connection.connect,
                          res_mock.FakeEMCShareDriver(dhss=False), None)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_dhss_false_create_nfs_share(self, connection, mocked_input):
        # NFS share creation with DHSS disabled and a preset NAS server.
        connection.driver_handles_share_servers = False
        connection.unity_share_server = 'test-dhss-false-427f-b4de-0ad83el5j8'
        share = mocked_input['dhss_false_nfs_share']
        share_server = mocked_input['share_server']

        location = connection.create_share(None, share, share_server)

        exp_location = [
            {'path': 'fake_ip_addr_1:/cb532599-8dc6-4c3e-bb21-74ea54be566c'},
            {'path': 'fake_ip_addr_2:/cb532599-8dc6-4c3e-bb21-74ea54be566c'},
        ]
        self._assert_locations_equal(exp_location, location)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_dhss_false_create_cifs_share(self, connection, mocked_input):
        # CIFS share creation with DHSS disabled and a preset NAS server.
        connection.driver_handles_share_servers = False
        connection.unity_share_server = 'test-dhss-false-427f-b4de-0ad83el5j8'
        share = mocked_input['dhss_false_cifs_share']
        share_server = mocked_input['share_server']

        location = connection.create_share(None, share, share_server)

        exp_location = [
            {'path': r'\\fake_ip_addr_1\716100cc-e0b4-416b-ac27-d38dd019330d'},
            {'path': r'\\fake_ip_addr_2\716100cc-e0b4-416b-ac27-d38dd019330d'},
        ]
        self._assert_locations_equal(exp_location, location)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_get_share_server_id(self, connection, mocked_input):
        # _get_server_name returns the expected name for the fixture.
        share_server = mocked_input['share_server']
        result = connection._get_server_name(share_server)
        expected = 'c2e48947-98ed-4eae-999b-fa0b83731dfd'
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_snapshot(self, connection, mocked_input):
        # The size given in driver_options is reported back.
        snapshot = mocked_input['snapshot']
        driver_options = {'size': 8}
        result = connection.manage_existing_snapshot(snapshot,
                                                     driver_options, None)
        expected = {'provider_location': '23047-ef2344-4563cvw-r4323cwed',
                    'size': 8}
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_snapshot_wrong_size_type(self, connection, mocked_input):
        # A non-numeric size must be rejected.
        snapshot = mocked_input['snapshot']
        driver_options = {'size': 'str_size'}
        self.assertRaises(exception.ManageInvalidShareSnapshot,
                          connection.manage_existing_snapshot,
                          snapshot, driver_options, None)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_snapshot_with_server(self, connection, mocked_input):
        # With no size in driver_options the reported size is 1.
        share_server = mocked_input['share_server']
        snapshot = mocked_input['snapshot']
        driver_options = {}
        result = connection.manage_existing_snapshot_with_server(
            snapshot, driver_options, share_server)
        expected = {'provider_location': '23047-ef2344-4563cvw-r4323cwed',
                    'size': 1}
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_get_share_server_network_info(self, connection, mocked_input):
        # The network info is the list of the server's IP addresses.
        share_server = mocked_input['share_server']
        identifier = 'test_manage_nas_server'
        result = connection.get_share_server_network_info(None, share_server,
                                                          identifier, None)
        expected = ['fake_ip_addr_1', 'fake_ip_addr_2']
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_server(self, connection, mocked_input):
        # manage_server returns the identifier paired with None.
        share_server = mocked_input['share_server']
        identifier = 'test_manage_nas_server'
        result = connection.manage_server(None, share_server, identifier, None)
        expected = (identifier, None)
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_nfs_share(self, connection, mocked_input):
        # Managing an NFS share reports its export path and given size.
        share = mocked_input['managed_nfs_share']
        driver_options = {'size': 3}
        result = connection.manage_existing(share, driver_options)
        path = '172.168.201.201:/ad1caddf-097e-462c-8ac6-5592ed6fe22f'
        expected = {'export_locations': {'path': path}, 'size': 3}
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_nfs_share_with_server(self, connection, mocked_input):
        # Same as test_manage_nfs_share, via the with-server entry point.
        share = mocked_input['managed_nfs_share']
        share_server = mocked_input['share_server']
        driver_options = {'size': 8}
        result = connection.manage_existing_with_server(share, driver_options,
                                                        share_server)
        path = '172.168.201.201:/ad1caddf-097e-462c-8ac6-5592ed6fe22f'
        expected = {'export_locations': {'path': path}, 'size': 8}
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_cifs_share(self, connection, mocked_input):
        # Managing a CIFS share reports its export path and given size.
        share = mocked_input['managed_cifs_share']
        driver_options = {'size': 3}
        result = connection.manage_existing(share, driver_options)
        path = '\\\\10.0.0.1\\bd23121f-hg4e-432c-12cd2c5-bb93dfghe212'
        expected = {'export_locations': {'path': path}, 'size': 3}
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_cifs_share_with_server(self, connection, mocked_input):
        # Same as test_manage_cifs_share, via the with-server entry point.
        connection.client.create_interface = mock.Mock(return_value=None)
        share = mocked_input['managed_cifs_share']
        share_server = mocked_input['share_server']
        driver_options = {'size': 3}
        result = connection.manage_existing_with_server(share, driver_options,
                                                        share_server)
        path = '\\\\10.0.0.1\\bd23121f-hg4e-432c-12cd2c5-bb93dfghe212'
        expected = {'export_locations': {'path': path}, 'size': 3}
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_with_wrong_size_data_type(self, connection, mocked_input):
        # A non-numeric size must be rejected.
        connection.client.create_interface = mock.Mock(return_value=None)
        share = mocked_input['managed_nfs_share']
        share_server = mocked_input['share_server']
        driver_options = {'size': 'str_size'}
        self.assertRaises(exception.ManageInvalidShare,
                          connection.manage_existing_with_server,
                          share, driver_options, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_without_size(self, connection, mocked_input):
        # A size of 0 in driver_options results in a reported size of 1.
        connection.client.create_interface = mock.Mock(return_value=None)
        share = mocked_input['managed_nfs_share']
        share_server = mocked_input['share_server']
        driver_options = {'size': 0}
        result = connection.manage_existing_with_server(share, driver_options,
                                                        share_server)
        path = '172.168.201.201:/ad1caddf-097e-462c-8ac6-5592ed6fe22f'
        expected = {'export_locations': {'path': path}, 'size': 1}
        self.assertEqual(expected, result)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_manage_without_export_locations(self, connection, mocked_input):
        # The plain 'nfs_share' fixture must be rejected for management.
        connection.client.create_interface = mock.Mock(return_value=None)
        share = mocked_input['nfs_share']
        share_server = mocked_input['share_server']
        driver_options = {'size': 3}
        self.assertRaises(exception.ManageInvalidShare,
                          connection.manage_existing_with_server,
                          share, driver_options, share_server)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_get_default_filter_function_disable_report(self, connection,
                                                        mocked_input):
        # Without report_default_filter_function being set by the test,
        # no filter function is reported.
        actual = connection.get_default_filter_function()
        self.assertIsNone(actual)

    @res_mock.mock_manila_input
    @res_mock.patch_connection
    def test_get_default_filter_function_enable_report(self, connection,
                                                       mocked_input):
        # With reporting enabled the minimum-size filter is returned.
        expected = "share.size >= 3"
        connection.report_default_filter_function = True
        actual = connection.get_default_filter_function()
        self.assertEqual(expected, actual)