Coverage for manila/tests/share/drivers/quobyte/test_quobyte.py: 98%
293 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2015 Quobyte, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from unittest import mock
18from oslo_config import cfg
19from oslo_utils import units
21from manila import context
22from manila import exception
23from manila.share import configuration as config
24from manila.share import driver
25from manila.share.drivers.quobyte import jsonrpc
26from manila.share.drivers.quobyte import quobyte
27from manila import test
28from manila.tests import fake_share
30CONF = cfg.CONF
33def fake_rpc_handler(name, *args, **kwargs):
34 if name == 'resolveVolumeName':
35 return None
36 elif name == 'createVolume':
37 return {'volume_uuid': 'voluuid'}
38 elif name == 'exportVolume':
39 return {'nfs_server_ip': 'fake_location',
40 'nfs_export_path': '/fake_share'}
41 elif name == 'getConfiguration':
42 return {
43 "tenant_configuration": [{
44 "domain_name": "fake_domain_name",
45 "volume_access": [
46 {"volume_uuid": "fake_id_1",
47 "restrict_to_network": "10.0.0.1",
48 "read_only": False},
49 {"volume_uuid": "fake_id_1",
50 "restrict_to_network": "10.0.0.2",
51 "read_only": False},
52 {"volume_uuid": "fake_id_2",
53 "restrict_to_network": "10.0.0.3",
54 "read_only": False}
55 ]},
56 {"domain_name": "fake_domain_name_2",
57 "volume_access": [
58 {"volume_uuid": "fake_id_3",
59 "restrict_to_network": "10.0.0.4",
60 "read_only": False},
61 {"volume_uuid": "fake_id_3",
62 "restrict_to_network": "10.0.0.5",
63 "read_only": True},
64 {"volume_uuid": "fake_id_4",
65 "restrict_to_network": "10.0.0.6",
66 "read_only": False}
67 ]}
68 ]
69 }
70 else:
71 return "Unknown fake rpc handler call"
74def create_fake_access(access_adr,
75 access_id='fake_access_id',
76 access_type='ip',
77 access_level='rw'):
78 return {
79 'access_id': access_id,
80 'access_type': access_type,
81 'access_to': access_adr,
82 'access_level': access_level
83 }
86class QuobyteShareDriverTestCase(test.TestCase):
87 """Tests QuobyteShareDriver."""
89 def setUp(self):
90 super(QuobyteShareDriverTestCase, self).setUp()
92 self._context = context.get_admin_context()
94 CONF.set_default('driver_handles_share_servers', False)
96 self.fake_conf = config.Configuration(None)
97 self._driver = quobyte.QuobyteShareDriver(configuration=self.fake_conf)
98 self._driver.rpc = mock.Mock()
99 self.share = fake_share.fake_share(
100 share_proto='NFS',
101 export_location='fake_location:/quobyte/fake_share')
102 self.access = fake_share.fake_access()
104 @mock.patch('manila.share.drivers.quobyte.jsonrpc.JsonRpc', mock.Mock())
105 def test_do_setup_success(self):
106 self._driver.rpc.call = mock.Mock(return_value=None)
108 self._driver.do_setup(self._context)
110 self._driver.rpc.call.assert_called_with('getInformation', {})
112 @mock.patch('manila.share.drivers.quobyte.jsonrpc.JsonRpc.__init__',
113 mock.Mock(return_value=None))
114 @mock.patch.object(jsonrpc.JsonRpc, 'call',
115 side_effect=exception.QBRpcException(
116 result='fake_result',
117 qbcode=666))
118 def test_do_setup_failure(self, mock_call):
119 self.assertRaises(exception.QBException,
120 self._driver.do_setup, self._context)
122 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share")
123 def test_create_share_new_volume(self, qb_resize_mock):
124 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
126 result = self._driver.create_share(self._context, self.share)
128 self.assertEqual(self.share['export_location'], result)
129 self._driver.rpc.call.assert_has_calls([
130 mock.call('createVolume', dict(
131 name=self.share['name'],
132 tenant_domain=self.share['project_id'],
133 root_user_id=self.fake_conf.quobyte_default_volume_user,
134 root_group_id=self.fake_conf.quobyte_default_volume_group,
135 configuration_name=self.fake_conf.quobyte_volume_configuration
136 )),
137 mock.call('exportVolume',
138 dict(protocol='NFS', volume_uuid='voluuid'))])
139 qb_resize_mock.assert_called_once_with(self.share, self.share['size'])
141 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share")
142 def test_create_share_existing_volume(self, qb_resize_mock):
143 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
145 result = self._driver.create_share(self._context, self.share)
147 self.assertEqual(self.share['export_location'], result)
148 resolv_params = {'tenant_domain': 'fake_project_uuid',
149 'volume_name': 'fakename'}
150 sett_params = {'tenant': {'tenant_id': 'fake_project_uuid'}}
151 create_params = dict(
152 name='fakename',
153 tenant_domain='fake_project_uuid',
154 root_user_id='root',
155 root_group_id='root',
156 configuration_name='BASE')
157 self._driver.rpc.call.assert_has_calls([
158 mock.call('resolveVolumeName', resolv_params,
159 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND]),
160 mock.call('setTenant', sett_params,
161 expected_errors=[jsonrpc.ERROR_GARBAGE_ARGS]),
162 mock.call('createVolume', create_params),
163 mock.call('exportVolume', dict(protocol='NFS',
164 volume_uuid='voluuid'))])
165 qb_resize_mock.assert_called_once_with(self.share, self.share['size'])
167 def test_create_share_wrong_protocol(self):
168 share = {'share_proto': 'WRONG_PROTOCOL'}
170 self.assertRaises(exception.QBException,
171 self._driver.create_share,
172 context=None,
173 share=share)
175 def test_delete_share_existing_volume(self):
176 def rpc_handler(name, *args):
177 if name == 'resolveVolumeName':
178 return {'volume_uuid': 'voluuid'}
179 elif name == 'exportVolume': 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true
180 return {}
182 self._driver.configuration.quobyte_delete_shares = True
183 self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
185 self._driver.delete_share(self._context, self.share)
187 resolv_params = {'volume_name': 'fakename',
188 'tenant_domain': 'fake_project_uuid'}
189 self._driver.rpc.call.assert_has_calls([
190 mock.call('resolveVolumeName', resolv_params,
191 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND]),
192 mock.call('deleteVolume', {'volume_uuid': 'voluuid'})])
194 def test_delete_share_existing_volume_disabled(self):
195 def rpc_handler(name, *args):
196 if name == 'resolveVolumeName':
197 return {'volume_uuid': 'voluuid'}
198 elif name == 'exportVolume': 198 ↛ exitline 198 didn't return from function 'rpc_handler' because the condition on line 198 was always true
199 return {}
201 CONF.set_default('quobyte_delete_shares', False)
202 self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
204 self._driver.delete_share(self._context, self.share)
206 self._driver.rpc.call.assert_called_with(
207 'exportVolume', {'volume_uuid': 'voluuid', 'remove_export': True})
209 @mock.patch.object(quobyte.LOG, 'warning')
210 def test_delete_share_nonexisting_volume(self, mock_warning):
211 def rpc_handler(name, *args):
212 if name == 'resolveVolumeName': 212 ↛ exitline 212 didn't return from function 'rpc_handler' because the condition on line 212 was always true
213 return None
215 self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
217 self._driver.delete_share(self._context, self.share)
219 mock_warning.assert_called_with(
220 'No volume found for share %(project_id)s/%(name)s',
221 {'project_id': 'fake_project_uuid', 'name': 'fakename'})
223 def test_allow_access(self):
224 def rpc_handler(name, *args):
225 if name == 'resolveVolumeName':
226 return {'volume_uuid': 'voluuid'}
227 elif name == 'exportVolume': 227 ↛ exitline 227 didn't return from function 'rpc_handler' because the condition on line 227 was always true
228 return {'nfs_server_ip': '10.10.1.1',
229 'nfs_export_path': '/voluuid'}
231 self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
233 self._driver._allow_access(self._context, self.share, self.access)
235 exp_params = {'volume_uuid': 'voluuid',
236 'read_only': False,
237 'add_allow_ip': '10.0.0.1'}
238 self._driver.rpc.call.assert_called_with('exportVolume', exp_params)
240 def test_allow_ro_access(self):
241 def rpc_handler(name, *args):
242 if name == 'resolveVolumeName':
243 return {'volume_uuid': 'voluuid'}
244 elif name == 'exportVolume': 244 ↛ exitline 244 didn't return from function 'rpc_handler' because the condition on line 244 was always true
245 return {'nfs_server_ip': '10.10.1.1',
246 'nfs_export_path': '/voluuid'}
248 self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
249 ro_access = fake_share.fake_access(access_level='ro')
251 self._driver._allow_access(self._context, self.share, ro_access)
253 exp_params = {'volume_uuid': 'voluuid',
254 'read_only': True,
255 'add_allow_ip': '10.0.0.1'}
256 self._driver.rpc.call.assert_called_with('exportVolume', exp_params)
258 def test_allow_access_nonip(self):
259 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
261 self.access = fake_share.fake_access(**{"access_type":
262 "non_existant_access_type"})
264 self.assertRaises(exception.InvalidShareAccess,
265 self._driver._allow_access,
266 self._context, self.share, self.access)
268 def test_deny_access(self):
269 def rpc_handler(name, *args):
270 if name == 'resolveVolumeName':
271 return {'volume_uuid': 'voluuid'}
272 elif name == 'exportVolume': 272 ↛ exitline 272 didn't return from function 'rpc_handler' because the condition on line 272 was always true
273 return {'nfs_server_ip': '10.10.1.1',
274 'nfs_export_path': '/voluuid'}
276 self._driver.rpc.call = mock.Mock(wraps=rpc_handler)
278 self._driver._deny_access(self._context, self.share, self.access)
280 self._driver.rpc.call.assert_called_with(
281 'exportVolume',
282 {'volume_uuid': 'voluuid', 'remove_allow_ip': '10.0.0.1'})
284 @mock.patch.object(quobyte.LOG, 'debug')
285 def test_deny_access_nonip(self, mock_debug):
286 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
287 self.access = fake_share.fake_access(
288 access_type="non_existant_access_type")
290 self._driver._deny_access(self._context, self.share, self.access)
292 mock_debug.assert_called_with(
293 'Quobyte driver only supports ip access control. '
294 'Ignoring deny access call for %s , %s',
295 'fakename', 'fake_project_uuid')
297 def test_resolve_volume_name(self):
298 self._driver.rpc.call = mock.Mock(
299 return_value={'volume_uuid': 'fake_uuid'})
301 self._driver._resolve_volume_name('fake_vol_name', 'fake_domain_name')
303 exp_params = {'volume_name': 'fake_vol_name',
304 'tenant_domain': 'fake_domain_name'}
305 self._driver.rpc.call.assert_called_with(
306 'resolveVolumeName', exp_params,
307 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND])
309 def test_resolve_volume_name_NOENT(self):
310 self._driver.rpc.call = mock.Mock(
311 return_value=None)
313 self.assertIsNone(
314 self._driver._resolve_volume_name('fake_vol_name',
315 'fake_domain_name'))
316 self._driver.rpc.call.assert_called_once_with(
317 'resolveVolumeName',
318 dict(volume_name='fake_vol_name',
319 tenant_domain='fake_domain_name'),
320 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND]
321 )
323 def test_resolve_volume_name_other_error(self):
324 self._driver.rpc.call = mock.Mock(
325 side_effect=exception.QBRpcException(
326 result='fubar',
327 qbcode=666))
329 self.assertRaises(exception.QBRpcException,
330 self._driver._resolve_volume_name,
331 volume_name='fake_vol_name',
332 tenant_domain='fake_domain_name')
334 @mock.patch.object(driver.ShareDriver, '_update_share_stats')
335 def test_update_share_stats(self, mock_uss):
336 self._driver._get_capacities = mock.Mock(return_value=[42, 23])
338 self._driver._update_share_stats()
340 mock_uss.assert_called_once_with(
341 dict(storage_protocol='NFS',
342 vendor_name='Quobyte',
343 share_backend_name=self._driver.backend_name,
344 driver_version=self._driver.DRIVER_VERSION,
345 total_capacity_gb=42,
346 free_capacity_gb=23,
347 reserved_percentage=0,
348 reserved_snapshot_percentage=0,
349 reserved_share_extend_percentage=0))
351 def test_get_capacities_gb(self):
352 capval = 42115548133
353 useval = 19695128917
354 replfact = 3
355 self._driver._get_qb_replication_factor = mock.Mock(
356 return_value=replfact)
357 self._driver.rpc.call = mock.Mock(
358 return_value={'total_physical_capacity': str(capval),
359 'total_physical_usage': str(useval)})
361 self.assertEqual((39.223160718, 6.960214182),
362 self._driver._get_capacities())
364 def test_get_capacities_gb_full(self):
365 capval = 1024 * 1024 * 1024 * 3
366 useval = 1024 * 1024 * 1024 * 3 + 1
367 replfact = 1
368 self._driver._get_qb_replication_factor = mock.Mock(
369 return_value=replfact)
370 self._driver.rpc.call = mock.Mock(
371 return_value={'total_physical_capacity': str(capval),
372 'total_physical_usage': str(useval)})
374 self.assertEqual((3.0, 0), self._driver._get_capacities())
376 def test_get_replication(self):
377 fakerepl = 42
378 self._driver.configuration.quobyte_volume_configuration = 'fakeVolConf'
379 self._driver.rpc.call = mock.Mock(
380 return_value={'configuration':
381 {'volume_metadata_configuration':
382 {'replication_factor':
383 str(fakerepl)}}})
385 self.assertEqual(fakerepl, self._driver._get_qb_replication_factor())
387 @mock.patch.object(quobyte.QuobyteShareDriver,
388 "_resolve_volume_name",
389 return_value="fake_uuid")
390 def test_ensure_share(self, mock_qb_resolve_volname):
391 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
393 result = self._driver.ensure_share(self._context, self.share, None)
395 self.assertEqual(self.share["export_location"], result)
396 (mock_qb_resolve_volname.
397 assert_called_once_with(self.share['name'],
398 self.share['project_id']))
399 self._driver.rpc.call.assert_has_calls([
400 mock.call('exportVolume', dict(
401 volume_uuid="fake_uuid",
402 protocol='NFS'
403 ))])
405 @mock.patch.object(quobyte.QuobyteShareDriver,
406 "_resolve_volume_name",
407 return_value=None)
408 def test_ensure_deleted_share(self, mock_qb_resolve_volname):
409 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
411 self.assertRaises(exception.ShareResourceNotFound,
412 self._driver.ensure_share,
413 self._context, self.share, None)
414 (mock_qb_resolve_volname.
415 assert_called_once_with(self.share['name'],
416 self.share['project_id']))
418 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share")
419 def test_extend_share(self, mock_qsd_resize_share):
420 self._driver.extend_share(ext_share=self.share,
421 ext_size=2,
422 share_server=None)
423 mock_qsd_resize_share.assert_called_once_with(share=self.share,
424 new_size=2)
426 @mock.patch.object(quobyte.QuobyteShareDriver, "_resolve_volume_name",
427 return_value="fake_volume_uuid")
428 def test_resize_share(self, mock_qb_resolv):
429 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
430 manila_size = 7
431 newsize_bytes = manila_size * units.Gi
433 self._driver._resize_share(share=self.share, new_size=manila_size)
435 exp_params = {
436 "quotas": [{
437 "consumer": [{
438 "type": "VOLUME",
439 "identifier": "fake_volume_uuid",
440 "tenant_id": self.share["project_id"]
441 }],
442 "limits": [{
443 "type": "LOGICAL_DISK_SPACE",
444 "value": newsize_bytes,
445 }],
446 }]}
447 self._driver.rpc.call.assert_has_calls([
448 mock.call('setQuota', exp_params)])
449 mock_qb_resolv.assert_called_once_with(self.share['name'],
450 self.share['project_id'])
452 @mock.patch.object(quobyte.QuobyteShareDriver,
453 "_resolve_volume_name",
454 return_value="fake_id_3")
455 def test_fetch_existing_access(self, mock_qb_resolve_volname):
456 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler)
457 old_access_1 = create_fake_access(access_id="old_1",
458 access_adr="10.0.0.4")
459 old_access_2 = create_fake_access(access_id="old_2",
460 access_adr="10.0.0.5")
462 exist_list = self._driver._fetch_existing_access(context=self._context,
463 share=self.share)
465 # assert expected result here
466 self.assertEqual([old_access_1['access_to'],
467 old_access_2['access_to']],
468 [e.get('access_to') for e in exist_list])
469 (mock_qb_resolve_volname.
470 assert_called_once_with(self.share['name'],
471 self.share['project_id']))
473 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share")
474 def test_shrink_share(self, mock_qsd_resize_share):
475 self._driver.shrink_share(shrink_share=self.share,
476 shrink_size=3,
477 share_server=None)
478 mock_qsd_resize_share.assert_called_once_with(share=self.share,
479 new_size=3)
481 def test_subtract_access_lists(self):
482 access_1 = create_fake_access(access_id="new_1",
483 access_adr="10.0.0.5",
484 access_type="rw",)
485 access_2 = create_fake_access(access_id="old_1",
486 access_adr="10.0.0.1",
487 access_type="rw")
488 access_3 = create_fake_access(access_id="old_2",
489 access_adr="10.0.0.3",
490 access_type="ro")
491 access_4 = create_fake_access(access_id="new_2",
492 access_adr="10.0.0.6",
493 access_type="rw")
494 access_5 = create_fake_access(access_id="old_3",
495 access_adr="10.0.0.4",
496 access_type="rw")
497 min_list = [access_1, access_2, access_3, access_4]
498 sub_list = [access_5, access_3, access_2]
500 self.assertEqual([access_1, access_4],
501 self._driver._subtract_access_lists(min_list,
502 sub_list))
504 def test_subtract_access_lists_level(self):
505 access_1 = create_fake_access(access_id="new_1",
506 access_adr="10.0.0.5",
507 access_level="rw")
508 access_2 = create_fake_access(access_id="old_1",
509 access_adr="10.0.0.1",
510 access_level="rw")
511 access_3 = create_fake_access(access_id="old_2",
512 access_adr="10.0.0.3",
513 access_level="rw")
514 access_4 = create_fake_access(access_id="new_2",
515 access_adr="10.0.0.6",
516 access_level="rw")
517 access_5 = create_fake_access(access_id="old_2_ro",
518 access_adr="10.0.0.3",
519 access_level="ro")
520 min_list = [access_1, access_2, access_3, access_4]
521 sub_list = [access_5, access_2]
523 self.assertEqual([access_1, access_3, access_4],
524 self._driver._subtract_access_lists(min_list,
525 sub_list))
527 def test_subtract_access_lists_type(self):
528 access_1 = create_fake_access(access_id="new_1",
529 access_adr="10.0.0.5",
530 access_type="ip")
531 access_2 = create_fake_access(access_id="old_1",
532 access_adr="10.0.0.1",
533 access_type="ip")
534 access_3 = create_fake_access(access_id="old_2",
535 access_adr="10.0.0.3",
536 access_type="ip")
537 access_4 = create_fake_access(access_id="new_2",
538 access_adr="10.0.0.6",
539 access_type="ip")
540 access_5 = create_fake_access(access_id="old_2_ro",
541 access_adr="10.0.0.3",
542 access_type="other")
543 min_list = [access_1, access_2, access_3, access_4]
544 sub_list = [access_5, access_2]
546 self.assertEqual([access_1, access_3, access_4],
547 self._driver._subtract_access_lists(min_list,
548 sub_list))
550 @mock.patch.object(quobyte.QuobyteShareDriver, "_allow_access")
551 @mock.patch.object(quobyte.QuobyteShareDriver, "_deny_access")
552 def test_update_access_add_delete(self, qb_deny_mock, qb_allow_mock):
553 access_1 = create_fake_access(access_id="new_1",
554 access_adr="10.0.0.5",
555 access_level="rw")
556 access_2 = create_fake_access(access_id="old_1",
557 access_adr="10.0.0.1",
558 access_level="rw")
559 access_3 = create_fake_access(access_id="old_2",
560 access_adr="10.0.0.3",
561 access_level="rw")
563 self._driver.update_access(self._context,
564 self.share,
565 access_rules=None,
566 add_rules=[access_1],
567 delete_rules=[access_2, access_3],
568 update_rules=[])
570 qb_allow_mock.assert_called_once_with(self._context,
571 self.share, access_1)
572 deny_calls = [mock.call(self._context, self.share, access_2),
573 mock.call(self._context, self.share, access_3)]
574 qb_deny_mock.assert_has_calls(deny_calls)
576 @mock.patch.object(quobyte.LOG, "warning")
577 def test_update_access_no_rules(self, qb_log_mock):
578 self._driver.update_access(context=None, share=None, access_rules=[],
579 add_rules=[], delete_rules=[],
580 update_rules=[])
582 qb_log_mock.assert_has_calls([mock.ANY])
584 @mock.patch.object(quobyte.QuobyteShareDriver, "_subtract_access_lists")
585 @mock.patch.object(quobyte.QuobyteShareDriver, "_fetch_existing_access")
586 @mock.patch.object(quobyte.QuobyteShareDriver, "_allow_access")
587 def test_update_access_recovery_additionals(self,
588 qb_allow_mock,
589 qb_exist_mock,
590 qb_subtr_mock):
591 new_access_1 = create_fake_access(access_id="new_1",
592 access_adr="10.0.0.2")
593 old_access = create_fake_access(access_id="fake_access_id",
594 access_adr="10.0.0.1")
595 new_access_2 = create_fake_access(access_id="new_2",
596 access_adr="10.0.0.3")
597 add_access_rules = [new_access_1,
598 old_access,
599 new_access_2]
600 qb_exist_mock.return_value = [old_access]
601 qb_subtr_mock.side_effect = [[new_access_1, new_access_2], []]
603 self._driver.update_access(self._context, self.share,
604 access_rules=add_access_rules, add_rules=[],
605 delete_rules=[], update_rules=[])
607 assert_calls = [mock.call(self._context, self.share, new_access_1),
608 mock.call(self._context, self.share, new_access_2)]
609 qb_allow_mock.assert_has_calls(assert_calls, any_order=True)
610 qb_exist_mock.assert_called_once_with(self._context, self.share)
612 @mock.patch.object(quobyte.QuobyteShareDriver, "_subtract_access_lists")
613 @mock.patch.object(quobyte.QuobyteShareDriver, "_fetch_existing_access")
614 @mock.patch.object(quobyte.QuobyteShareDriver, "_deny_access")
615 def test_update_access_recovery_superfluous(self,
616 qb_deny_mock,
617 qb_exist_mock,
618 qb_subtr_mock):
620 old_access_1 = create_fake_access(access_id="old_1",
621 access_adr="10.0.0.1")
622 missing_access_1 = create_fake_access(access_id="mis_1",
623 access_adr="10.0.0.2")
624 old_access_2 = create_fake_access(access_id="old_2",
625 access_adr="10.0.0.3")
626 qb_exist_mock.side_effect = [[old_access_1, old_access_2]]
627 qb_subtr_mock.side_effect = [[], [missing_access_1]]
628 old_access_rules = [old_access_1, old_access_2]
630 self._driver.update_access(self._context, self.share,
631 access_rules=old_access_rules, add_rules=[],
632 delete_rules=[], update_rules=[])
634 qb_deny_mock.assert_called_once_with(self._context,
635 self.share,
636 (missing_access_1))
637 qb_exist_mock.assert_called_once_with(self._context, self.share)
639 @mock.patch.object(quobyte.QuobyteShareDriver, "_subtract_access_lists")
640 @mock.patch.object(quobyte.QuobyteShareDriver, "_fetch_existing_access")
641 @mock.patch.object(quobyte.QuobyteShareDriver, "_deny_access")
642 @mock.patch.object(quobyte.QuobyteShareDriver, "_allow_access")
643 def test_update_access_recovery_add_superfluous(self,
644 qb_allow_mock,
645 qb_deny_mock,
646 qb_exist_mock,
647 qb_subtr_mock):
648 new_access_1 = create_fake_access(access_id="new_1",
649 access_adr="10.0.0.5")
650 old_access_1 = create_fake_access(access_id="old_1",
651 access_adr="10.0.0.1")
652 old_access_2 = create_fake_access(access_id="old_2",
653 access_adr="10.0.0.3")
654 old_access_3 = create_fake_access(access_id="old_3",
655 access_adr="10.0.0.4")
656 miss_access_1 = create_fake_access(access_id="old_3",
657 access_adr="10.0.0.4")
658 new_access_2 = create_fake_access(access_id="new_2",
659 access_adr="10.0.0.3",
660 access_level="ro")
661 new_access_rules = [new_access_1, old_access_1, old_access_2,
662 old_access_3, new_access_2]
663 qb_exist_mock.return_value = [old_access_1, old_access_2,
664 old_access_3, miss_access_1]
665 qb_subtr_mock.side_effect = [[new_access_1, new_access_2],
666 [miss_access_1, old_access_2]]
668 self._driver.update_access(self._context, self.share,
669 new_access_rules, add_rules=[],
670 delete_rules=[], update_rules=[])
672 a_calls = [mock.call(self._context, self.share, new_access_1),
673 mock.call(self._context, self.share, new_access_2)]
674 qb_allow_mock.assert_has_calls(a_calls)
675 b_calls = [mock.call(self._context, self.share, miss_access_1),
676 mock.call(self._context, self.share, old_access_2)]
677 qb_deny_mock.assert_has_calls(b_calls)
678 qb_exist_mock.assert_called_once_with(self._context, self.share)