Coverage for manila/tests/share/test_driver.py: 100%
504 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2012 NetApp
2# Copyright 2014 Mirantis Inc.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16"""Unit tests for the Share driver module."""
18import time
19from unittest import mock
22import ddt
24from manila.common import constants
25from manila import exception
26from manila import network
27from manila.share import configuration
28from manila.share import driver
29from manila import test
30from manila.tests import utils as test_utils
31from manila import utils
def fake_execute_with_raise(*cmd, **kwargs):
    """Command-execution fake that fails unconditionally.

    Any positional or keyword arguments are accepted and ignored.

    :raises: exception.ProcessExecutionError on every call.
    """
    raise exception.ProcessExecutionError()
def fake_sleep(duration):
    """No-op sleep replacement: ignore ``duration`` and return at once."""
    return None
class ShareDriverWithExecuteMixin(driver.ShareDriver, driver.ExecuteMixin):
    """ShareDriver combined with ExecuteMixin, for testing the mixin."""
46@ddt.ddt
47class ShareDriverTestCase(test.TestCase):
48 _SNAPSHOT_METHOD_NAMES = ["create_snapshot", "delete_snapshot"]
def setUp(self):
    """Install the execute/sleep fakes and default the driver mode to True."""
    super(ShareDriverTestCase, self).setUp()
    # Keep references to the patched modules on the test instance.
    self.utils, self.time = utils, time
    self.mock_object(self.utils, 'execute', fake_execute_with_raise)
    self.mock_object(self.time, 'sleep', fake_sleep)
    driver.CONF.set_default('driver_handles_share_servers', True)
def test__try_execute(self):
    """Assert that ``_try_execute`` raises ProcessExecutionError."""
    conf = configuration.Configuration(None)
    mixin_driver = ShareDriverWithExecuteMixin(True, configuration=conf)
    self.assertRaises(
        exception.ProcessExecutionError, mixin_driver._try_execute)
def test_verify_share_driver_mode_option_type(self):
    """Assert the driver mode is truthy when configured as string 'True'."""
    opts = {'DEFAULT': {'driver_handles_share_servers': 'True'}}
    with test_utils.create_temp_config_with_opts(opts):
        drv = driver.ShareDriver([True, False])
        mode = drv.driver_handles_share_servers
        self.assertTrue(mode)
def _instantiate_share_driver(self, network_config_group,
                              driver_handles_share_servers,
                              admin_network_config_group=None):
    """Build a ShareDriver from a mocked configuration and verify it.

    :param network_config_group: value stored as the mocked
        configuration's ``network_config_group``; may be None.
    :param driver_handles_share_servers: value returned by the mocked
        ``safe_get`` for every lookup.
    :param admin_network_config_group: optional admin network group
        name; only set on the mocked configuration when truthy.
    :returns: the instantiated ``driver.ShareDriver``.
    """
    self.mock_object(network, 'API')
    conf = mock.Mock()
    conf.append_config_values = mock.Mock()
    conf.config_group = 'fake_config_group'
    conf.network_config_group = network_config_group
    if admin_network_config_group:
        conf.admin_network_config_group = admin_network_config_group
    conf.safe_get = mock.Mock(return_value=driver_handles_share_servers)

    drv = driver.ShareDriver([True, False], configuration=conf)

    self.assertTrue(hasattr(drv, 'configuration'))
    conf.append_config_values.assert_called_once_with(driver.share_opts)

    if not driver_handles_share_servers:
        # No share server handling: no network API may have been built.
        self.assertFalse(hasattr(drv, 'network_api'))
        self.assertTrue(hasattr(drv, 'admin_network_api'))
        self.assertIsNone(drv.admin_network_api)
        self.assertFalse(network.API.called)
        return drv

    # The expected group is the dedicated network group when one was
    # given, otherwise the configuration's own group.
    if network_config_group:
        group_name = conf.network_config_group
    else:
        group_name = conf.config_group
    expected_calls = [mock.call(config_group_name=group_name)]
    if admin_network_config_group:
        expected_calls.append(mock.call(
            config_group_name=conf.admin_network_config_group,
            label='admin'))
    network.API.assert_has_calls(expected_calls)
    for attr_name in ('network_api', 'admin_network_api'):
        self.assertTrue(hasattr(drv, attr_name))
    self.assertIsNotNone(drv.network_api)
    self.assertIsNotNone(drv.admin_network_api)
    return drv
def test_instantiate_share_driver(self):
    """Instantiate with share server handling and no network group."""
    self._instantiate_share_driver(
        network_config_group=None, driver_handles_share_servers=True)
def test_instantiate_share_driver_another_config_group(self):
    """Instantiate with a dedicated network config group."""
    self._instantiate_share_driver(
        network_config_group="fake_network_config_group",
        driver_handles_share_servers=True)
def test_instantiate_share_driver_with_admin_network(self):
    """Instantiate with both a network and an admin network group."""
    self._instantiate_share_driver(
        network_config_group="fake_network_config_group",
        driver_handles_share_servers=True,
        admin_network_config_group="fake_admin_network_config_group")
def test_instantiate_share_driver_no_configuration(self):
    """Assert network.API gets group name None without a configuration."""
    self.mock_object(network, 'API')

    drv = driver.ShareDriver(True, configuration=None)

    # The driver keeps the None configuration as-is.
    self.assertIsNone(drv.configuration)
    network.API.assert_called_once_with(config_group_name=None)
def test_get_share_stats_refresh_false(self):
    """Assert the stored stats are returned when refresh is False."""
    drv = driver.ShareDriver(True, configuration=None)
    drv._stats = {'fake_key': 'fake_value'}

    stats = drv.get_share_stats(False)

    self.assertEqual(drv._stats, stats)
def test_get_share_stats_refresh_true(self):
    """Assert a refresh replaces stale stats with the expected keys."""
    conf = configuration.Configuration(None)
    required_keys = (
        'qos', 'driver_version', 'share_backend_name',
        'free_capacity_gb', 'total_capacity_gb',
        'driver_handles_share_servers',
        'reserved_percentage', 'reserved_snapshot_percentage',
        'reserved_share_extend_percentage',
        'vendor_name', 'storage_protocol',
        'snapshot_support', 'mount_snapshot_support',
        'mount_point_name_support',
    )
    drv = driver.ShareDriver(True, configuration=conf)
    stale_stats = {'fake_key': 'fake_value'}
    drv._stats = stale_stats

    stats = drv.get_share_stats(True)

    self.assertNotEqual(stale_stats, stats)
    for required_key in required_keys:
        self.assertIn(required_key, stats)
    self.assertEqual('Open Source', stats['vendor_name'])
def test_get_share_status(self):
    """Assert ``get_share_status`` raises NotImplementedError."""
    drv = self._instantiate_share_driver(None, False)
    call_args = (None, None)
    self.assertRaises(
        NotImplementedError, drv.get_share_status, *call_args)
@ddt.data(
    {'opt': True, 'allowed': True},
    {'opt': True, 'allowed': (True, False)},
    {'opt': True, 'allowed': [True, False]},
    {'opt': True, 'allowed': {True, False}},
    {'opt': False, 'allowed': False},
    {'opt': False, 'allowed': (True, False)},
    {'opt': False, 'allowed': [True, False]},
    {'opt': False, 'allowed': {True, False}},
)
@ddt.unpack
def test__verify_share_server_handling_valid_cases(self, opt, allowed):
    """Assert the driver accepts a configured mode it declares as allowed.

    :param opt: value returned by the mocked ``safe_get``.
    :param allowed: mode(s) passed to the ShareDriver constructor.
    """
    conf = configuration.Configuration(None)
    fake_safe_get = mock.Mock(return_value=opt)
    self.mock_object(conf, 'safe_get', fake_safe_get)

    drv = driver.ShareDriver(allowed, configuration=conf)

    self.assertTrue(conf.safe_get.called)
    self.assertEqual(opt, drv.driver_handles_share_servers)
@ddt.data(
    {'opt': False, 'allowed': True},
    {'opt': True, 'allowed': False},
    {'opt': None, 'allowed': True},
    {'opt': 'True', 'allowed': True},
    {'opt': 'False', 'allowed': False},
    {'opt': [], 'allowed': True},
    {'opt': True, 'allowed': []},
    {'opt': True, 'allowed': ['True']},
    {'opt': False, 'allowed': ['False']},
)
@ddt.unpack
def test__verify_share_server_handling_invalid_cases(self, opt, allowed):
    """Assert construction fails when the mode does not match ``allowed``.

    :param opt: value returned by the mocked ``safe_get``.
    :param allowed: mode(s) passed to the ShareDriver constructor.
    """
    conf = configuration.Configuration(None)
    fake_safe_get = mock.Mock(return_value=opt)
    self.mock_object(conf, 'safe_get', fake_safe_get)

    self.assertRaises(
        exception.ManilaException,
        driver.ShareDriver,
        allowed,
        configuration=conf)
    self.assertTrue(conf.safe_get.called)
def test_setup_server_handling_disabled(self):
    """Call ``setup_server`` with share server handling turned off."""
    drv = self._instantiate_share_driver(None, False)
    # Passing means the call returned without raising; nothing to assert.
    drv.setup_server('Nothing is expected to happen.')
def test_setup_server_handling_enabled(self):
    """Assert ``setup_server`` raises NotImplementedError when enabled."""
    drv = self._instantiate_share_driver(None, True)
    self.assertRaises(
        NotImplementedError, drv.setup_server, 'fake_network_info')
def test_teardown_server_handling_disabled(self):
    """Call ``teardown_server`` with share server handling turned off."""
    drv = self._instantiate_share_driver(None, False)
    # Passing means the call returned without raising; nothing to assert.
    drv.teardown_server('Nothing is expected to happen.')
def test_teardown_server_handling_enabled(self):
    """Assert ``teardown_server`` raises NotImplementedError when enabled."""
    drv = self._instantiate_share_driver(None, True)
    self.assertRaises(
        NotImplementedError,
        drv.teardown_server, 'fake_share_server_details')
def _assert_is_callable(self, obj, attr):
    """Assert that attribute ``attr`` of ``obj`` exists and is callable."""
    member = getattr(obj, attr)
    self.assertTrue(callable(member))
@ddt.data('manage_existing', 'unmanage')
def test_drivers_methods_needed_by_manage_functionality(self, method):
    """Assert the driver exposes ``method`` as a callable."""
    drv = self._instantiate_share_driver(None, False)

    self._assert_is_callable(drv, method)
@ddt.data('manage_existing_snapshot', 'unmanage_snapshot')
def test_drivers_methods_needed_by_manage_snapshot_functionality(
        self, method):
    """Assert the driver exposes snapshot ``method`` as a callable."""
    drv = self._instantiate_share_driver(None, False)

    self._assert_is_callable(drv, method)
@ddt.data('revert_to_snapshot', 'revert_to_replicated_snapshot')
def test_drivers_methods_needed_by_share_revert_to_snapshot_functionality(
        self, method):
    """Assert the driver exposes revert ``method`` as a callable."""
    drv = self._instantiate_share_driver(None, False)

    self._assert_is_callable(drv, method)
@ddt.data(True, False)
def test_get_share_server_pools(self, value):
    """Assert ``get_share_server_pools`` returns an empty list.

    :param value: driver mode, used for both the config default and the
        constructor argument.
    """
    driver.CONF.set_default('driver_handles_share_servers', value)
    drv = driver.ShareDriver(value)

    pools = drv.get_share_server_pools('fake_server')

    self.assertEqual([], pools)
def test_check_for_setup_error(self):
    """Call ``check_for_setup_error``; passing means it did not raise."""
    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    drv.configuration = configuration.Configuration(None)

    drv.check_for_setup_error()
def test_snapshot_support_exists(self):
    """Assert snapshot support is on when both snapshot methods exist."""
    driver.CONF.set_default('driver_handles_share_servers', True)

    def fake_method(*args, **kwargs):
        return None

    # Child class that defines both snapshot methods itself.
    overrides = dict.fromkeys(
        ("create_snapshot", "delete_snapshot"), fake_method)
    child_cls = type("NotRedefined", (driver.ShareDriver, ), overrides)
    child = child_cls(True)
    self.mock_object(child, "configuration")

    child._update_share_stats()

    self.assertTrue(child._stats["snapshot_support"])
    self.assertTrue(child.configuration.safe_get.called)
@ddt.data(
    ([], [], False),
    (_SNAPSHOT_METHOD_NAMES, [], True),
    (_SNAPSHOT_METHOD_NAMES, _SNAPSHOT_METHOD_NAMES, True),
    (_SNAPSHOT_METHOD_NAMES[:1], _SNAPSHOT_METHOD_NAMES[1:], True),
    ([], _SNAPSHOT_METHOD_NAMES, True),
)
@ddt.unpack
def test_check_redefined_driver_methods(self, common_drv_meth_names,
                                        child_drv_meth_names,
                                        expected_result):
    """Check redefinition detection across a two-level driver hierarchy.

    This covers drivers that inherit from other drivers or common
    classes: the snapshot methods may be defined on the intermediate
    class, on the leaf class, or split between the two.

    :param common_drv_meth_names: methods defined on the intermediate
        class.
    :param child_drv_meth_names: methods defined on the leaf class.
    :param expected_result: expected return value of
        ``_has_redefined_driver_methods``.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)

    def stub_methods(names):
        # One independent no-op callable per method name.
        return {name: (lambda *args, **kwargs: None) for name in names}

    intermediate_cls = type(
        "NotRedefinedCommon", (driver.ShareDriver, ),
        stub_methods(common_drv_meth_names))
    leaf_cls = type(
        "NotRedefined", (intermediate_cls, ),
        stub_methods(child_drv_meth_names))
    leaf = leaf_cls(True)

    outcome = leaf._has_redefined_driver_methods(
        self._SNAPSHOT_METHOD_NAMES)

    self.assertEqual(expected_result, outcome)
@ddt.data(
    (),
    ("create_snapshot",),
    ("delete_snapshot",),
    ("create_snapshot", "delete_snapshotFOO"),
)
def test_snapshot_support_absent(self, methods):
    """Assert snapshot support is off unless both methods are defined.

    There is no ``ddt.unpack`` on this test, so every data item arrives
    whole as ``methods``.

    :param methods: tuple of method names to define on the child driver
        class.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)
    fake_method = lambda *args, **kwargs: None  # noqa: E731
    # The single-name cases must be real 1-tuples (trailing comma): a
    # bare parenthesized string would be iterated character by character
    # and define one-letter attributes instead of the intended method.
    child_methods = {method: fake_method for method in methods}
    child_class_instance = type(
        "NotRedefined", (driver.ShareDriver, ), child_methods)(True)
    self.mock_object(child_class_instance, "configuration")

    child_class_instance._update_share_stats()

    self.assertFalse(child_class_instance._stats["snapshot_support"])
    self.assertTrue(child_class_instance.configuration.safe_get.called)
@ddt.data(True, False)
def test_snapshot_support_not_exists_and_set_explicitly(
        self, snapshots_are_supported):
    """Assert an explicit ``snapshot_support`` stat is kept as given.

    The child class defines no snapshot methods of its own.

    :param snapshots_are_supported: value passed in the stats dict.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)
    child_cls = type("NotRedefined", (driver.ShareDriver, ), {})
    child = child_cls(True)
    self.mock_object(child, "configuration")
    explicit_stats = {"snapshot_support": snapshots_are_supported}

    child._update_share_stats(explicit_stats)

    self.assertEqual(
        snapshots_are_supported, child._stats["snapshot_support"])
    self.assertTrue(child.configuration.safe_get.called)
@ddt.data(True, False)
def test_snapshot_support_exists_and_set_explicitly(
        self, snapshots_are_supported):
    """Assert an explicit ``snapshot_support`` stat is kept as given.

    The child class defines both snapshot methods itself.

    :param snapshots_are_supported: value passed in the stats dict.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)

    def fake_method(*args, **kwargs):
        return None

    overrides = dict.fromkeys(
        ("create_snapshot", "delete_snapshot"), fake_method)
    child_cls = type("NotRedefined", (driver.ShareDriver, ), overrides)
    child = child_cls(True)
    self.mock_object(child, "configuration")
    explicit_stats = {"snapshot_support": snapshots_are_supported}

    child._update_share_stats(explicit_stats)

    self.assertEqual(
        snapshots_are_supported, child._stats["snapshot_support"])
    self.assertTrue(child.configuration.safe_get.called)
def test_create_share_from_snapshot_support_exists(self):
    """Assert create-from-snapshot support is on with all three methods."""
    driver.CONF.set_default('driver_handles_share_servers', True)

    def fake_method(*args, **kwargs):
        return None

    method_names = (
        "create_share_from_snapshot",
        "create_snapshot",
        "delete_snapshot",
    )
    overrides = dict.fromkeys(method_names, fake_method)
    child_cls = type("NotRedefined", (driver.ShareDriver, ), overrides)
    child = child_cls(True)
    self.mock_object(child, "configuration")

    child._update_share_stats()

    self.assertTrue(child._stats["create_share_from_snapshot_support"])
    self.assertTrue(child.configuration.safe_get.called)
@ddt.data(
    (),
    ("create_snapshot",),
    ("create_share_from_snapshotFOO",),
)
def test_create_share_from_snapshot_support_absent(self, methods):
    """Assert create-from-snapshot support is off for these method sets.

    There is no ``ddt.unpack`` on this test, so every data item arrives
    whole as ``methods``.

    :param methods: tuple of method names to define on the child driver
        class.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)
    fake_method = lambda *args, **kwargs: None  # noqa: E731
    # The single-name cases must be real 1-tuples (trailing comma): a
    # bare parenthesized string would be iterated character by character
    # and define one-letter attributes instead of the intended method.
    child_methods = {method: fake_method for method in methods}
    child_class_instance = type(
        "NotRedefined", (driver.ShareDriver, ), child_methods)(True)
    self.mock_object(child_class_instance, "configuration")

    child_class_instance._update_share_stats()

    self.assertFalse(
        child_class_instance._stats["create_share_from_snapshot_support"])
    self.assertTrue(child_class_instance.configuration.safe_get.called)
@ddt.data(True, False)
def test_create_share_from_snapshot_not_exists_and_set_explicitly(
        self, creating_shares_from_snapshot_is_supported):
    """Assert an explicit create-from-snapshot stat is kept as given.

    The child class defines no methods of its own.

    :param creating_shares_from_snapshot_is_supported: value passed in
        the stats dict.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)
    child_cls = type("NotRedefined", (driver.ShareDriver, ), {})
    child = child_cls(True)
    self.mock_object(child, "configuration")
    explicit_stats = {
        "create_share_from_snapshot_support":
            creating_shares_from_snapshot_is_supported,
    }

    child._update_share_stats(explicit_stats)

    self.assertEqual(
        creating_shares_from_snapshot_is_supported,
        child._stats["create_share_from_snapshot_support"])
    self.assertTrue(child.configuration.safe_get.called)
@ddt.data(True, False)
def test_create_share_from_snapshot_exists_and_set_explicitly(
        self, create_share_from_snapshot_supported):
    """Assert an explicit create-from-snapshot stat is kept as given.

    The child class defines ``create_share_from_snapshot`` itself.

    :param create_share_from_snapshot_supported: value passed in the
        stats dict.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)

    def fake_method(*args, **kwargs):
        return None

    overrides = {"create_share_from_snapshot": fake_method}
    child_cls = type("NotRedefined", (driver.ShareDriver, ), overrides)
    child = child_cls(True)
    self.mock_object(child, "configuration")
    explicit_stats = {
        "create_share_from_snapshot_support":
            create_share_from_snapshot_supported,
    }

    child._update_share_stats(explicit_stats)

    self.assertEqual(
        create_share_from_snapshot_supported,
        child._stats["create_share_from_snapshot_support"])
    self.assertTrue(child.configuration.safe_get.called)
def test_get_periodic_hook_data(self):
    """Assert the hook data equals the share instances passed in."""
    drv = self._instantiate_share_driver(None, False)
    share_instances = ["list", "of", "share", "instances"]

    hook_data = drv.get_periodic_hook_data("fake_context", share_instances)

    self.assertEqual(share_instances, hook_data)
def test_get_admin_network_allocations_number(self):
    """Assert the admin network allocations number is 0."""
    drv = self._instantiate_share_driver(None, True)

    allocations = drv.get_admin_network_allocations_number()

    self.assertEqual(0, allocations)
def test_allocate_admin_network_count_None(self):
    """Assert nothing is allocated when the looked-up count is 0.

    No ``count`` is passed, so the driver is expected to ask
    ``get_admin_network_allocations_number`` exactly once.
    """
    drv = self._instantiate_share_driver(None, True)
    context, server = 'fake_context', 'fake_share_server'
    count_lookup = self.mock_object(
        drv, 'get_admin_network_allocations_number',
        mock.Mock(return_value=0))
    # Any allocation attempt would raise and fail the test loudly.
    self.mock_object(
        drv.admin_network_api, 'allocate_network',
        mock.Mock(side_effect=Exception('ShouldNotBeRaised')))

    drv.allocate_admin_network(context, server)

    count_lookup.assert_called_once_with()
    self.assertFalse(drv.admin_network_api.allocate_network.called)
def test_allocate_admin_network_count_0(self):
    """Assert an explicit ``count=0`` skips both lookup and allocation."""
    drv = self._instantiate_share_driver(None, True)
    context, server = 'fake_context', 'fake_share_server'
    self.mock_object(
        drv, 'get_admin_network_allocations_number',
        mock.Mock(return_value=0))
    # Any allocation attempt would raise and fail the test loudly.
    self.mock_object(
        drv.admin_network_api, 'allocate_network',
        mock.Mock(side_effect=Exception('ShouldNotBeRaised')))

    drv.allocate_admin_network(context, server, count=0)

    self.assertFalse(drv.get_admin_network_allocations_number.called)
    self.assertFalse(drv.admin_network_api.allocate_network.called)
def test_allocate_admin_network_count_1_api_initialized(self):
    """Assert one allocation is requested when the looked-up count is 1."""
    drv = self._instantiate_share_driver(None, True)
    context, server = 'fake_context', 'fake_share_server'
    count_lookup = self.mock_object(
        drv, 'get_admin_network_allocations_number',
        mock.Mock(return_value=1))
    self.mock_object(
        drv.admin_network_api, 'allocate_network', mock.Mock())

    drv.allocate_admin_network(context, server)

    count_lookup.assert_called_once_with()
    allocate = drv.admin_network_api.allocate_network
    allocate.assert_called_once_with(context, server, count=1)
def test_allocate_admin_network_count_1_api_not_initialized(self):
    """Assert allocation fails when ``_admin_network_api`` is None."""
    drv = self._instantiate_share_driver(None, True, None)
    context, server = 'fake_context', 'fake_share_server'
    # Force the "admin network API missing" state on the driver.
    drv._admin_network_api = None
    count_lookup = self.mock_object(
        drv, 'get_admin_network_allocations_number',
        mock.Mock(return_value=1))

    self.assertRaises(
        exception.NetworkBadConfigurationException,
        drv.allocate_admin_network, context, server)
    count_lookup.assert_called_once_with()
def test_migration_start(self):
    """Assert ``migration_start`` raises NotImplementedError."""
    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    placeholder_args = [None] * 7

    self.assertRaises(
        NotImplementedError, drv.migration_start, *placeholder_args)
def test_migration_continue(self):
    """Assert ``migration_continue`` raises NotImplementedError."""
    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    placeholder_args = [None] * 7

    self.assertRaises(
        NotImplementedError, drv.migration_continue, *placeholder_args)
def test_migration_complete(self):
    """Assert ``migration_complete`` raises NotImplementedError."""
    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    placeholder_args = [None] * 7

    self.assertRaises(
        NotImplementedError, drv.migration_complete, *placeholder_args)
def test_migration_cancel(self):
    """Assert ``migration_cancel`` raises NotImplementedError."""
    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    placeholder_args = [None] * 7

    self.assertRaises(
        NotImplementedError, drv.migration_cancel, *placeholder_args)
def test_migration_get_progress(self):
    """Assert ``migration_get_progress`` raises NotImplementedError."""
    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    placeholder_args = [None] * 7

    self.assertRaises(
        NotImplementedError,
        drv.migration_get_progress, *placeholder_args)
def test_share_server_migration_start(self):
    """Assert ``share_server_migration_start`` is not implemented."""
    driver.CONF.set_default('driver_handles_share_servers', True)
    drv = driver.ShareDriver(True)
    placeholder_args = [None] * 5

    self.assertRaises(
        NotImplementedError,
        drv.share_server_migration_start, *placeholder_args)
def test_share_server_migration_continue(self):
    """Assert ``share_server_migration_continue`` is not implemented."""
    driver.CONF.set_default('driver_handles_share_servers', True)
    drv = driver.ShareDriver(True)
    placeholder_args = [None] * 5

    self.assertRaises(
        NotImplementedError,
        drv.share_server_migration_continue, *placeholder_args)
def test_share_server_migration_get_progress(self):
    """Assert ``share_server_migration_get_progress`` is not implemented."""
    driver.CONF.set_default('driver_handles_share_servers', True)
    drv = driver.ShareDriver(True)
    placeholder_args = [None] * 5

    self.assertRaises(
        NotImplementedError,
        drv.share_server_migration_get_progress, *placeholder_args)
def test_share_server_migration_cancel(self):
    """Assert ``share_server_migration_cancel`` is not implemented."""
    driver.CONF.set_default('driver_handles_share_servers', True)
    drv = driver.ShareDriver(True)
    placeholder_args = [None] * 5

    self.assertRaises(
        NotImplementedError,
        drv.share_server_migration_cancel, *placeholder_args)
def test_share_server_migration_check_compatibility(self):
    """Assert the compatibility report has every capability set to False."""
    driver.CONF.set_default('driver_handles_share_servers', True)
    drv = driver.ShareDriver(True)
    capability_keys = (
        'compatible',
        'writable',
        'nondisruptive',
        'preserve_snapshots',
        'migration_cancel',
        'migration_get_progress',
    )
    expected_compatibility = dict.fromkeys(capability_keys, False)

    reported = drv.share_server_migration_check_compatibility(
        *([None] * 6))

    self.assertEqual(expected_compatibility, reported)
def test_share_server_migration_complete(self):
    """Assert ``share_server_migration_complete`` is not implemented."""
    driver.CONF.set_default('driver_handles_share_servers', True)
    drv = driver.ShareDriver(True)
    placeholder_args = [None] * 6

    self.assertRaises(
        NotImplementedError,
        drv.share_server_migration_complete, *placeholder_args)
@ddt.data(True, False)
def test_connection_get_info(self, admin):
    """Assert the connection info built for a fake NFS share.

    :param admin: value of ``is_admin_only`` on the share's single
        export location; the expected result is the same either way.
    """
    export_location = {'path': '/fake/fake_id', 'is_admin_only': admin}
    fake_share = {
        'id': 'fake_id',
        'share_proto': 'nfs',
        'export_locations': [export_location],
    }
    expected = {
        'mount': 'mount -vt nfs %(options)s /fake/fake_id %(path)s',
        'unmount': 'umount -v %(path)s',
        'access_mapping': {'ip': ['nfs']},
    }

    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    drv.configuration = configuration.Configuration(None)

    connection_info = drv.connection_get_info(
        None, fake_share, "fake_server")

    self.assertEqual(expected, connection_info)
def test_migration_check_compatibility(self):
    """Assert the compatibility report has every capability set to False."""
    driver.CONF.set_default('driver_handles_share_servers', False)
    drv = driver.ShareDriver(False)
    drv.configuration = configuration.Configuration(None)
    capability_keys = (
        'compatible',
        'writable',
        'preserve_metadata',
        'nondisruptive',
        'preserve_snapshots',
    )
    expected = dict.fromkeys(capability_keys, False)

    reported = drv.migration_check_compatibility(*([None] * 5))

    self.assertEqual(expected, reported)
def test_update_access(self):
    """Assert ``update_access`` raises NotImplementedError."""
    drv = driver.ShareDriver(True, configuration=None)
    call_args = (
        'ctx',
        'fake_share',
        'fake_access_rules',
        'fake_add_rules',
        'fake_delete_rules',
        'fake_update_rules',
    )
    self.assertRaises(NotImplementedError, drv.update_access, *call_args)
def test_create_replica(self):
    """Assert ``create_replica`` raises NotImplementedError."""
    drv = self._instantiate_share_driver(None, True)
    call_args = ('fake_context', ['r1', 'r2'], 'fake_new_replica', [], [])
    self.assertRaises(NotImplementedError, drv.create_replica, *call_args)
def test_delete_replica(self):
    """Assert ``delete_replica`` raises NotImplementedError."""
    drv = self._instantiate_share_driver(None, True)
    call_args = ('fake_context', ['r1', 'r2'], 'fake_replica', [])
    self.assertRaises(NotImplementedError, drv.delete_replica, *call_args)
def test_promote_replica(self):
    """Assert ``promote_replica`` raises NotImplementedError."""
    drv = self._instantiate_share_driver(None, True)
    call_args = ('fake_context', [], 'fake_replica', [])
    self.assertRaises(NotImplementedError, drv.promote_replica, *call_args)
def test_update_replica_state(self):
    """Assert ``update_replica_state`` raises NotImplementedError."""
    drv = self._instantiate_share_driver(None, True)
    call_args = ('fake_context', ['r1', 'r2'], 'fake_replica', [], [])
    self.assertRaises(
        NotImplementedError, drv.update_replica_state, *call_args)
def test_create_replicated_snapshot(self):
    """Assert ``create_replicated_snapshot`` is not implemented."""
    drv = self._instantiate_share_driver(None, False)
    call_args = ('fake_context', ['r1', 'r2'], ['s1', 's2'])
    self.assertRaises(
        NotImplementedError, drv.create_replicated_snapshot, *call_args)
def test_delete_replicated_snapshot(self):
    """Assert ``delete_replicated_snapshot`` is not implemented."""
    drv = self._instantiate_share_driver(None, False)
    call_args = ('fake_context', ['r1', 'r2'], ['s1', 's2'])
    self.assertRaises(
        NotImplementedError, drv.delete_replicated_snapshot, *call_args)
def test_update_replicated_snapshot(self):
    """Assert ``update_replicated_snapshot`` is not implemented."""
    drv = self._instantiate_share_driver(None, False)
    call_args = ('fake_context', ['r1', 'r2'], 'r1', ['s1', 's2'], 's1')
    self.assertRaises(
        NotImplementedError, drv.update_replicated_snapshot, *call_args)
@ddt.data(True, False)
def test_share_group_snapshot_support_exists_and_equals_snapshot_support(
        self, snapshots_are_supported):
    """Assert ``snapshot_support`` follows ``_snapshots_are_supported``.

    :param snapshots_are_supported: value forced onto the driver's
        ``_snapshots_are_supported`` attribute before the stats update.
    """
    driver.CONF.set_default('driver_handles_share_servers', True)
    drv = driver.ShareDriver(True)
    drv._snapshots_are_supported = snapshots_are_supported
    self.mock_object(drv, "configuration")

    drv._update_share_stats()

    self.assertEqual(
        snapshots_are_supported, drv._stats["snapshot_support"])
    self.assertTrue(drv.configuration.safe_get.called)
def test_create_share_group_from_share_group_snapshot(self):
    """Assert each share is created from its matching snapshot member.

    ``create_share_from_snapshot`` is mocked to return plain export
    strings, which are expected back as ``export_locations``.
    """
    drv = self._instantiate_share_driver(None, False)
    shares = [
        {'id': 'fake_share_1',
         'source_share_group_snapshot_member_id': 'fake_member_1'},
        {'id': 'fake_share_2',
         'source_share_group_snapshot_member_id': 'fake_member_2'},
    ]
    share_group = {
        'source_share_group_snapshot_id': 'some_fake_uuid_abc',
        'shares': shares,
        'id': 'some_fake_uuid_def',
    }
    group_snapshot = {
        'share_group_snapshot_members': [
            {'id': 'fake_member_1'}, {'id': 'fake_member_2'}],
        'id': 'fake_share_group_snapshot_id',
    }
    create_mock = self.mock_object(
        drv, 'create_share_from_snapshot',
        mock.Mock(side_effect=['fake_export1', 'fake_export2']))
    expected_share_updates = [
        {'id': 'fake_share_1', 'export_locations': 'fake_export1'},
        {'id': 'fake_share_2', 'export_locations': 'fake_export2'},
    ]

    group_update, share_updates = (
        drv.create_share_group_from_share_group_snapshot(
            'fake_context', share_group, group_snapshot))

    create_mock.assert_has_calls([
        mock.call(
            'fake_context',
            {'id': 'fake_share_%d' % i,
             'source_share_group_snapshot_member_id': 'fake_member_%d' % i},
            {'id': 'fake_member_%d' % i})
        for i in (1, 2)
    ])
    self.assertIsNone(group_update)
    self.assertEqual(expected_share_updates, share_updates)
def test_create_share_group_from_share_group_snapshot_dhss(self):
    """Assert the share server is forwarded to every share creation."""
    drv = self._instantiate_share_driver(None, True)
    share_server = mock.Mock()
    shares = [
        {'id': 'fake_share_%d' % i,
         'source_share_group_snapshot_member_id': 'foo_member_%d' % i}
        for i in (1, 2)]
    share_group = {
        'source_share_group_snapshot_id': 'some_fake_uuid',
        'shares': shares,
        'id': 'eda52174-0442-476d-9694-a58327466c14',
    }
    group_snapshot = {
        'share_group_snapshot_members': [
            {'id': 'foo_member_1'}, {'id': 'foo_member_2'}],
        'id': 'fake_share_group_snapshot_id',
    }
    create_mock = self.mock_object(
        drv, 'create_share_from_snapshot',
        mock.Mock(side_effect=['fake_export1', 'fake_export2']))
    expected_share_updates = [
        {'id': 'fake_share_1', 'export_locations': 'fake_export1'},
        {'id': 'fake_share_2', 'export_locations': 'fake_export2'},
    ]

    group_update, share_updates = (
        drv.create_share_group_from_share_group_snapshot(
            'fake_context', share_group, group_snapshot,
            share_server=share_server))

    create_mock.assert_has_calls([
        mock.call(
            'fake_context',
            {'id': 'fake_share_1',
             'source_share_group_snapshot_member_id': 'foo_member_1'},
            {'id': 'foo_member_1'},
            share_server=share_server),
        mock.call(
            'fake_context',
            {'id': 'fake_share_2',
             'source_share_group_snapshot_member_id': 'foo_member_2'},
            {'id': 'foo_member_2'},
            share_server=share_server),
    ])
    self.assertIsNone(group_update)
    self.assertEqual(expected_share_updates, share_updates)
def test_create_share_group_from_share_group_snapshot_with_dict_raise(
        self):
    """Assert InvalidShareInstance for shares reported as STATUS_CREATING.

    ``create_share_from_snapshot`` is mocked to return dicts whose
    ``status`` is ``constants.STATUS_CREATING``.
    """
    drv = self._instantiate_share_driver(None, False)
    shares = [
        {'id': 'fake_share_1',
         'source_share_group_snapshot_member_id': 'fake_member_1'},
        {'id': 'fake_share_2',
         'source_share_group_snapshot_member_id': 'fake_member_2'},
    ]
    share_group = {
        'source_share_group_snapshot_id': 'some_fake_uuid_abc',
        'shares': shares,
        'id': 'some_fake_uuid_def',
    }
    group_snapshot = {
        'share_group_snapshot_members': [
            {'id': 'fake_member_1'}, {'id': 'fake_member_2'}],
        'id': 'fake_share_group_snapshot_id',
    }
    create_results = [
        {'export_locations': 'fake_export1',
         'status': constants.STATUS_CREATING},
        {'export_locations': 'fake_export2',
         'status': constants.STATUS_CREATING},
    ]
    self.mock_object(
        drv, 'create_share_from_snapshot',
        mock.Mock(side_effect=create_results))

    self.assertRaises(
        exception.InvalidShareInstance,
        drv.create_share_group_from_share_group_snapshot,
        'fake_context', share_group, group_snapshot)
def test_create_share_group_from_share_group_snapshot_with_dict(
        self):
    """Assert dict results are merged into the per-share updates.

    ``create_share_from_snapshot`` is mocked to return dicts carrying
    ``export_locations`` and ``status``; both are expected back next to
    each share's id.
    """
    drv = self._instantiate_share_driver(None, False)
    shares = [
        {'id': 'fake_share_1',
         'source_share_group_snapshot_member_id': 'fake_member_1'},
        {'id': 'fake_share_2',
         'source_share_group_snapshot_member_id': 'fake_member_2'},
    ]
    share_group = {
        'source_share_group_snapshot_id': 'some_fake_uuid_abc',
        'shares': shares,
        'id': 'some_fake_uuid_def',
    }
    group_snapshot = {
        'share_group_snapshot_members': [
            {'id': 'fake_member_1'}, {'id': 'fake_member_2'}],
        'id': 'fake_share_group_snapshot_id',
    }
    create_results = [
        {'export_locations': 'fake_export1',
         'status': constants.STATUS_CREATING_FROM_SNAPSHOT},
        {'export_locations': 'fake_export2',
         'status': constants.STATUS_AVAILABLE},
    ]
    create_mock = self.mock_object(
        drv, 'create_share_from_snapshot',
        mock.Mock(side_effect=create_results))
    expected_share_updates = [
        {'id': 'fake_share_1',
         'status': constants.STATUS_CREATING_FROM_SNAPSHOT,
         'export_locations': 'fake_export1'},
        {'id': 'fake_share_2',
         'status': constants.STATUS_AVAILABLE,
         'export_locations': 'fake_export2'},
    ]

    group_update, share_updates = (
        drv.create_share_group_from_share_group_snapshot(
            'fake_context', share_group, group_snapshot))

    create_mock.assert_has_calls([
        mock.call(
            'fake_context',
            {'id': 'fake_share_%d' % i,
             'source_share_group_snapshot_member_id': 'fake_member_%d' % i},
            {'id': 'fake_member_%d' % i})
        for i in (1, 2)
    ])
    self.assertIsNone(group_update)
    self.assertEqual(expected_share_updates, share_updates)
def test_update_share_server_security_service(self):
    """``update_share_server_security_service`` is not implemented.

    The driver built by ``_instantiate_share_driver`` is expected to
    raise ``NotImplementedError`` when asked to update a share server's
    security service.
    """
    share_driver = self._instantiate_share_driver(None, True)
    # The fixtures are dicts; the earlier ``{'id', 'value'}`` spelling
    # built sets because a comma was typed instead of a colon.
    self.assertRaises(NotImplementedError,
                      share_driver.update_share_server_security_service,
                      'fake_context',
                      {'id': 'share_server_id'},
                      {'fake': 'fake_net_info'},
                      [{"id": "fake_instance_id"}],
                      [{"id": "fake_rule_id"}],
                      {'id': 'fake_sec_service_id'},
                      current_security_service=None)
def test_check_update_share_server_security_service(self):
    """``check_update_share_server_security_service`` is not implemented.

    The driver built by ``_instantiate_share_driver`` is expected to
    raise ``NotImplementedError`` for the security service update check.
    """
    share_driver = self._instantiate_share_driver(None, True)
    # The fixtures are dicts; the earlier ``{'id', 'value'}`` spelling
    # built sets because a comma was typed instead of a colon.
    self.assertRaises(
        NotImplementedError,
        share_driver.check_update_share_server_security_service,
        'fake_context',
        {'id': 'share_server_id'},
        {'fake': 'fake_net_info'},
        [{"id": "fake_instance_id"}],
        [{"id": "fake_rule_id"}],
        {'id': 'fake_sec_service_id'},
        current_security_service=None)
def test_check_update_share_server_network_allocations(self):
    """``check_update_share_server_network_allocations`` is unimplemented.

    The driver built by ``_instantiate_share_driver`` is expected to
    raise ``NotImplementedError`` for the network allocation update
    check.
    """
    share_driver = self._instantiate_share_driver(None, True)
    # The share server and share instance fixtures are dicts; the earlier
    # ``{'id', 'value'}`` spelling built sets (comma instead of colon).
    self.assertRaises(
        NotImplementedError,
        share_driver.check_update_share_server_network_allocations,
        'fake_context',
        {'id': 'share_server_id'},
        {'admin_network_allocations': [], 'subnets': []},
        {"id": "fake_subnet_id"},
        [{"id": "fake_security_service_id"}],
        [{'id': 'fake_share_instance_id'}],
        [{"id": "fake_rule_id"}])
def test_update_share_server_network_allocations(self):
    """``update_share_server_network_allocations`` is not implemented.

    The driver built by ``_instantiate_share_driver`` is expected to
    raise ``NotImplementedError`` when asked to update a share server's
    network allocations.
    """
    share_driver = self._instantiate_share_driver(None, True)
    # The share server fixture is a dict; the earlier
    # ``{'id', 'share_server_id'}`` spelling built a set.
    self.assertRaises(
        NotImplementedError,
        share_driver.update_share_server_network_allocations,
        'fake_context',
        {'id': 'share_server_id'},
        {'admin_network_allocations': [], 'subnets': []},
        {
            'share_network_subnet_id': 'fake_share_network_subnet_id',
            'neutron_net_id': 'fake_neutron_net_id',
            'neutron_subnet_id': 'fake_neutron_subnet_id',
            'network_allocations': []
        },
        [{"id": "fake_security_service_id"}],
        [{"id": "fake_share_id"}],
        [{"id": "fake_snapshot_id"}])
def test_create_share_group_from_sg_snapshot_with_no_members(self):
    """A snapshot without members yields no group or share updates."""
    share_driver = self._instantiate_share_driver(None, False)
    empty_group = {}
    memberless_snapshot = {'share_group_snapshot_members': []}

    result = share_driver.create_share_group_from_share_group_snapshot(
        'fake_context', empty_group, memberless_snapshot)
    share_group_update, share_update = result

    self.assertIsNone(share_group_update)
    self.assertIsNone(share_update)
def test_create_share_group_snapshot(self):
    """Every group snapshot member is snapshotted via create_snapshot.

    ``create_snapshot`` is mocked to return a dict derived from the
    snapshot id. The test expects one call per member, with
    ``provider_location`` passed as ``None`` regardless of the member's
    own value. It expects no group-level update and one update per
    member, tagged with the member id.
    """
    member_one = {
        'id': '6813e06b-a8f5-4784-b17d-f3e91afa370e',
        'share_id': 'a3ebdba5-b4e1-46c8-a0ea-a9ac8daf5296',
        'share_group_snapshot_id': 'fake_share_group_snapshot_id',
        'share_instance_id': 'fake_share_instance_id_1',
        'provider_location': 'should_not_be_used_1',
        'share_name': 'share_fake_share_instance_id_1',
        'name': 'share-snapshot-6813e06b-a8f5-4784-b17d-f3e91afa370e',
        'share': {
            'id': '420f978b-dbf6-4b3c-92fe-f5b17a0bb5e2',
            'size': 3,
            'share_proto': 'fake_share_proto',
        },
    }
    member_two = {
        'id': '1e010dfe-545b-432d-ab95-4ef03cd82f89',
        'share_id': 'a3ebdba5-b4e1-46c8-a0ea-a9ac8daf5296',
        'share_group_snapshot_id': 'fake_share_group_snapshot_id',
        'share_instance_id': 'fake_share_instance_id_2',
        'provider_location': 'should_not_be_used_2',
        'share_name': 'share_fake_share_instance_id_2',
        'name': 'share-snapshot-1e010dfe-545b-432d-ab95-4ef03cd82f89',
        'share': {
            'id': '420f978b-dbf6-4b3c-92fe-f5b17a0bb5e2',
            'size': '2',
            'share_proto': 'fake_share_proto',
        },
    }
    members = [member_one, member_two]
    group_snapshot = {
        'status': 'available',
        'project_id': '13c0be6290934bd98596cfa004650049',
        'user_id': 'a0314a441ca842019b0952224aa39192',
        'description': None,
        'deleted': '0',
        'share_group_id': '4b04fdc3-00b9-4909-ba1a-06e9b3f88b67',
        'share_group_snapshot_members': members,
        'deleted_at': None,
        'id': 'f6aa3b59-57eb-421e-965c-4e182538e36a',
        'name': None
    }

    def fake_create_snapshot(*args, **kwargs):
        # args[1] is the snapshot dict handed to create_snapshot.
        return {'foo_k': 'foo_v', 'bar_k': 'bar_v_%s' % args[1]['id']}

    def expected_snapshot(member):
        # Snapshot dict the driver is expected to build for a member.
        share = member['share']
        return {
            'snapshot_id': member['share_group_snapshot_id'],
            'share_id': member['share_id'],
            'share_instance_id': share['id'],
            'id': member['id'],
            'share': share,
            'share_name': member['share_name'],
            'name': member['name'],
            'size': share['size'],
            'share_size': share['size'],
            'share_proto': share['share_proto'],
            'provider_location': None,
        }

    share_driver = self._instantiate_share_driver(None, False)
    share_driver._stats['snapshot_support'] = True
    mock_create_snap = self.mock_object(
        share_driver, 'create_snapshot',
        mock.Mock(side_effect=fake_create_snapshot))

    result = share_driver.create_share_group_snapshot(
        'fake_context', group_snapshot)
    share_group_snapshot_update, member_update_list = result

    expected_calls = [
        mock.call('fake_context', expected_snapshot(member),
                  share_server=None)
        for member in members
    ]
    mock_create_snap.assert_has_calls(expected_calls)
    self.assertIsNone(share_group_snapshot_update)

    expected_member_updates = [
        {'id': member['id'], 'foo_k': 'foo_v',
         'bar_k': 'bar_v_%s' % member['id']}
        for member in members
    ]
    self.assertEqual(expected_member_updates, member_update_list)
def test_create_share_group_snapshot_failed_snapshot(self):
    """A failing member snapshot re-raises after deleting earlier ones.

    ``create_snapshot`` is mocked to succeed for the first member and
    raise ``ManilaException`` for the second. The test expects the
    exception to propagate and ``delete_snapshot`` to have been called
    with the first member's snapshot.
    """
    member_one = {
        'id': '6813e06b-a8f5-4784-b17d-f3e91afa370e',
        'share_id': 'a3ebdba5-b4e1-46c8-a0ea-a9ac8daf5296',
        'share_group_snapshot_id': 'fake_share_group_snapshot_id',
        'share_instance_id': 'fake_share_instance_id_1',
        'provider_location': 'should_not_be_used_1',
        'share_name': 'share_fake_share_instance_id_1',
        'name': 'share-snapshot-6813e06b-a8f5-4784-b17d-f3e91afa370e',
        'share': {
            'id': '420f978b-dbf6-4b3c-92fe-f5b17a0bb5e2',
            'size': 3,
            'share_proto': 'fake_share_proto',
        },
    }
    member_two = {
        'id': '1e010dfe-545b-432d-ab95-4ef03cd82f89',
        'share_id': 'a3ebdba5-b4e1-46c8-a0ea-a9ac8daf5296',
        'share_group_snapshot_id': 'fake_share_group_snapshot_id',
        'share_instance_id': 'fake_share_instance_id_2',
        'provider_location': 'should_not_be_used_2',
        'share_name': 'share_fake_share_instance_id_2',
        'name': 'share-snapshot-1e010dfe-545b-432d-ab95-4ef03cd82f89',
        'share': {
            'id': '420f978b-dbf6-4b3c-92fe-f5b17a0bb5e2',
            'size': '2',
            'share_proto': 'fake_share_proto',
        },
    }
    members = [member_one, member_two]
    group_snapshot = {
        'status': 'available',
        'project_id': '13c0be6290934bd98596cfa004650049',
        'user_id': 'a0314a441ca842019b0952224aa39192',
        'description': None,
        'deleted': '0',
        'share_group_id': '4b04fdc3-00b9-4909-ba1a-06e9b3f88b67',
        'share_group_snapshot_members': members,
        'deleted_at': None,
        'id': 'f6aa3b59-57eb-421e-965c-4e182538e36a',
        'name': None
    }
    expected_exception = exception.ManilaException

    share_driver = self._instantiate_share_driver(None, False)
    share_driver._stats['snapshot_support'] = True
    # First member succeeds (returns None), second member raises.
    mock_create_snap = self.mock_object(
        share_driver, 'create_snapshot',
        mock.Mock(side_effect=[None, expected_exception]))
    mock_delete_snap = self.mock_object(share_driver, 'delete_snapshot')

    self.assertRaises(
        expected_exception,
        share_driver.create_share_group_snapshot,
        'fake_context', group_snapshot)

    # Snapshot dicts the driver is expected to build, one per member.
    expected_snapshots = [
        {
            'snapshot_id': member['share_group_snapshot_id'],
            'share_id': member['share_id'],
            'share_instance_id': member['share']['id'],
            'id': member['id'],
            'share': member['share'],
            'share_name': member['share_name'],
            'name': member['name'],
            'size': member['share']['size'],
            'share_size': member['share']['size'],
            'share_proto': member['share']['share_proto'],
            'provider_location': None,
        }
        for member in members
    ]
    mock_create_snap.assert_has_calls([
        mock.call('fake_context', snapshot, share_server=None)
        for snapshot in expected_snapshots
    ])
    # Only the first member's snapshot was created before the failure.
    mock_delete_snap.assert_called_with(
        'fake_context', expected_snapshots[0], share_server=None)
def test_create_share_group_snapshot_no_support(self):
    """Group snapshots are refused when snapshot_support is False.

    With ``_stats['snapshot_support']`` set to ``False`` the call is
    expected to raise ``ShareGroupSnapshotNotSupported``.
    """
    snapshot_member = {
        'status': 'available',
        'share_type_id': '1a9ed31e-ee70-483d-93ba-89690e028d7f',
        'user_id': 'a0314a441ca842019b0952224aa39192',
        'deleted': 'False',
        'share_proto': 'NFS',
        'project_id': '13c0be6290934bd98596cfa004650049',
        'share_group_snapshot_id':
            'f6aa3b59-57eb-421e-965c-4e182538e36a',
        'deleted_at': None,
        'id': '6813e06b-a8f5-4784-b17d-f3e91afa370e',
        'size': 1
    }
    group_snapshot = {
        'status': 'available',
        'project_id': '13c0be6290934bd98596cfa004650049',
        'user_id': 'a0314a441ca842019b0952224aa39192',
        'description': None,
        'deleted': '0',
        'share_group_id': '4b04fdc3-00b9-4909-ba1a-06e9b3f88b67',
        'share_group_snapshot_members': [snapshot_member],
        'deleted_at': None,
        'id': 'f6aa3b59-57eb-421e-965c-4e182538e36a',
        'name': None
    }
    share_driver = self._instantiate_share_driver(None, False)
    share_driver._stats['snapshot_support'] = False

    self.assertRaises(
        exception.ShareGroupSnapshotNotSupported,
        share_driver.create_share_group_snapshot,
        'fake_context', group_snapshot)
def test_create_share_group_snapshot_no_members(self):
    """A group snapshot without members produces no updates at all."""
    group_snapshot = {
        'status': 'available',
        'project_id': '13c0be6290934bd98596cfa004650049',
        'user_id': 'a0314a441ca842019b0952224aa39192',
        'description': None,
        'deleted': '0',
        'share_group_id': '4b04fdc3-00b9-4909-ba1a-06e9b3f88b67',
        'share_group_snapshot_members': [],
        'deleted_at': None,
        'id': 'f6aa3b59-57eb-421e-965c-4e182538e36a',
        'name': None
    }
    share_driver = self._instantiate_share_driver(None, False)
    share_driver._stats['snapshot_support'] = True

    result = share_driver.create_share_group_snapshot(
        'fake_context', group_snapshot)
    share_group_snapshot_update, member_update_list = result

    self.assertIsNone(share_group_snapshot_update)
    self.assertIsNone(member_update_list)
def test_delete_share_group_snapshot(self):
    """Every group snapshot member is deleted via delete_snapshot.

    ``delete_snapshot`` is mocked. The test expects one call per member,
    carrying that member's own ``provider_location``, and no group-level
    or member-level updates in the result.
    """
    # Each member uses its own suffix for provider_location and
    # share_name so that a mix-up between members would be detected.
    fake_snap_member_1 = {
        'id': '6813e06b-a8f5-4784-b17d-f3e91afa370e',
        'share_id': 'a3ebdba5-b4e1-46c8-a0ea-a9ac8daf5296',
        'share_group_snapshot_id': 'fake_share_group_snapshot_id',
        'share_instance_id': 'fake_share_instance_id_1',
        'provider_location': 'fake_provider_location_1',
        'share_name': 'share_fake_share_instance_id_1',
        'name': 'share-snapshot-6813e06b-a8f5-4784-b17d-f3e91afa370e',
        'share': {
            'id': '420f978b-dbf6-4b3c-92fe-f5b17a0bb5e2',
            'size': 3,
            'share_proto': 'fake_share_proto',
        },
    }
    fake_snap_member_2 = {
        'id': '1e010dfe-545b-432d-ab95-4ef03cd82f89',
        'share_id': 'a3ebdba5-b4e1-46c8-a0ea-a9ac8daf5296',
        'share_group_snapshot_id': 'fake_share_group_snapshot_id',
        'share_instance_id': 'fake_share_instance_id_2',
        'provider_location': 'fake_provider_location_2',
        'share_name': 'share_fake_share_instance_id_2',
        'name': 'share-snapshot-1e010dfe-545b-432d-ab95-4ef03cd82f89',
        'share': {
            'id': '420f978b-dbf6-4b3c-92fe-f5b17a0bb5e2',
            'size': '2',
            'share_proto': 'fake_share_proto',
        },
    }
    fake_snap_dict = {
        'status': 'available',
        'project_id': '13c0be6290934bd98596cfa004650049',
        'user_id': 'a0314a441ca842019b0952224aa39192',
        'description': None,
        'deleted': '0',
        'share_group_id': '4b04fdc3-00b9-4909-ba1a-06e9b3f88b67',
        'share_group_snapshot_members': [
            fake_snap_member_1, fake_snap_member_2],
        'deleted_at': None,
        'id': 'f6aa3b59-57eb-421e-965c-4e182538e36a',
        'name': None
    }

    share_driver = self._instantiate_share_driver(None, False)
    share_driver._stats['share_group_snapshot_support'] = True
    mock_delete_snap = self.mock_object(share_driver, 'delete_snapshot')

    share_group_snapshot_update, member_update_list = (
        share_driver.delete_share_group_snapshot(
            'fake_context', fake_snap_dict))

    mock_delete_snap.assert_has_calls([
        mock.call(
            'fake_context',
            {'snapshot_id': member['share_group_snapshot_id'],
             'share_id': member['share_id'],
             'share_instance_id': member['share']['id'],
             'id': member['id'],
             'share': member['share'],
             'size': member['share']['size'],
             'share_size': member['share']['size'],
             'share_name': member['share_name'],
             'name': member['name'],
             'share_proto': member['share']['share_proto'],
             'provider_location': member['provider_location']},
            share_server=None)
        for member in (fake_snap_member_1, fake_snap_member_2)
    ])
    self.assertIsNone(share_group_snapshot_update)
    self.assertIsNone(member_update_list)
def test_snapshot_update_access(self):
    """``snapshot_update_access`` is expected to be unimplemented."""
    share_driver = self._instantiate_share_driver(None, False)
    access_rules = ['r1', 'r2']
    self.assertRaises(
        NotImplementedError,
        share_driver.snapshot_update_access,
        'fake_context', 'fake_snapshot', access_rules, [], [])
@ddt.data(
    {'user_networks': {4}, 'conf': [4],
     'expected': {'ipv4': True, 'ipv6': False}},
    {'user_networks': {6}, 'conf': [4],
     'expected': {'ipv4': False, 'ipv6': False}},
    {'user_networks': {4, 6}, 'conf': [4],
     'expected': {'ipv4': True, 'ipv6': False}},
    {'user_networks': {4}, 'conf': [6],
     'expected': {'ipv4': False, 'ipv6': False}},
    {'user_networks': {6}, 'conf': [6],
     'expected': {'ipv4': False, 'ipv6': True}},
    {'user_networks': {4, 6}, 'conf': [6],
     'expected': {'ipv4': False, 'ipv6': True}},
    {'user_networks': {4}, 'conf': [4, 6],
     'expected': {'ipv4': True, 'ipv6': False}},
    {'user_networks': {6}, 'conf': [4, 6],
     'expected': {'ipv4': False, 'ipv6': True}},
    {'user_networks': {4, 6}, 'conf': [4, 6],
     'expected': {'ipv4': True, 'ipv6': True}},
)
@ddt.unpack
def test_add_ip_version_capability_if_dhss_true(
        self, user_networks, conf, expected):
    """IP version capabilities with a share-server-handling driver.

    ``get_configured_ip_versions`` is mocked to return ``conf`` and the
    network API's ``enabled_ip_versions`` property to return
    ``user_networks``. In every data row an IP version is expected to
    be reported as supported only when it appears in both.
    """
    share_driver = self._instantiate_share_driver(None, True)
    self.mock_object(share_driver, 'get_configured_ip_versions',
                     mock.Mock(return_value=conf))
    # A PropertyMock has to be installed on the type, not the instance.
    network_api_type = type(share_driver.network_api)
    network_api_type.enabled_ip_versions = mock.PropertyMock(
        return_value=user_networks)
    capabilities = {'share_backend_name': 'fake_backend'}

    result = share_driver.add_ip_version_capability(capabilities)

    checks = (('ipv4', 'ipv4_support'), ('ipv6', 'ipv6_support'))
    for version, capability in checks:
        self.assertIsNotNone(result[capability])
        self.assertEqual(expected[version], result[capability])
@ddt.data(
    {'conf': [4], 'expected': {'ipv4': True, 'ipv6': False}},
    {'conf': [6], 'expected': {'ipv4': False, 'ipv6': True}},
    {'conf': [4, 6], 'expected': {'ipv4': True, 'ipv6': True}},
)
@ddt.unpack
def test_add_ip_version_capability_if_dhss_false(self, conf, expected):
    """IP version capabilities without share server handling.

    ``get_configured_ip_versions`` is mocked to return ``conf``. In
    every data row the reported support flags are expected to mirror
    the configured versions.
    """
    share_driver = self._instantiate_share_driver(None, False)
    self.mock_object(share_driver, 'get_configured_ip_versions',
                     mock.Mock(return_value=conf))
    capabilities = {'share_backend_name': 'fake_backend'}

    result = share_driver.add_ip_version_capability(capabilities)

    checks = (('ipv4', 'ipv4_support'), ('ipv6', 'ipv6_support'))
    for version, capability in checks:
        self.assertIsNotNone(result[capability])
        self.assertEqual(expected[version], result[capability])