Coverage for manila/tests/share/drivers/quobyte/test_quobyte.py: 98%

293 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Quobyte, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from unittest import mock 

17 

18from oslo_config import cfg 

19from oslo_utils import units 

20 

21from manila import context 

22from manila import exception 

23from manila.share import configuration as config 

24from manila.share import driver 

25from manila.share.drivers.quobyte import jsonrpc 

26from manila.share.drivers.quobyte import quobyte 

27from manila import test 

28from manila.tests import fake_share 

29 

30CONF = cfg.CONF 

31 

32 

33def fake_rpc_handler(name, *args, **kwargs): 

34 if name == 'resolveVolumeName': 

35 return None 

36 elif name == 'createVolume': 

37 return {'volume_uuid': 'voluuid'} 

38 elif name == 'exportVolume': 

39 return {'nfs_server_ip': 'fake_location', 

40 'nfs_export_path': '/fake_share'} 

41 elif name == 'getConfiguration': 

42 return { 

43 "tenant_configuration": [{ 

44 "domain_name": "fake_domain_name", 

45 "volume_access": [ 

46 {"volume_uuid": "fake_id_1", 

47 "restrict_to_network": "10.0.0.1", 

48 "read_only": False}, 

49 {"volume_uuid": "fake_id_1", 

50 "restrict_to_network": "10.0.0.2", 

51 "read_only": False}, 

52 {"volume_uuid": "fake_id_2", 

53 "restrict_to_network": "10.0.0.3", 

54 "read_only": False} 

55 ]}, 

56 {"domain_name": "fake_domain_name_2", 

57 "volume_access": [ 

58 {"volume_uuid": "fake_id_3", 

59 "restrict_to_network": "10.0.0.4", 

60 "read_only": False}, 

61 {"volume_uuid": "fake_id_3", 

62 "restrict_to_network": "10.0.0.5", 

63 "read_only": True}, 

64 {"volume_uuid": "fake_id_4", 

65 "restrict_to_network": "10.0.0.6", 

66 "read_only": False} 

67 ]} 

68 ] 

69 } 

70 else: 

71 return "Unknown fake rpc handler call" 

72 

73 

74def create_fake_access(access_adr, 

75 access_id='fake_access_id', 

76 access_type='ip', 

77 access_level='rw'): 

78 return { 

79 'access_id': access_id, 

80 'access_type': access_type, 

81 'access_to': access_adr, 

82 'access_level': access_level 

83 } 

84 

85 

86class QuobyteShareDriverTestCase(test.TestCase): 

87 """Tests QuobyteShareDriver.""" 

88 

89 def setUp(self): 

90 super(QuobyteShareDriverTestCase, self).setUp() 

91 

92 self._context = context.get_admin_context() 

93 

94 CONF.set_default('driver_handles_share_servers', False) 

95 

96 self.fake_conf = config.Configuration(None) 

97 self._driver = quobyte.QuobyteShareDriver(configuration=self.fake_conf) 

98 self._driver.rpc = mock.Mock() 

99 self.share = fake_share.fake_share( 

100 share_proto='NFS', 

101 export_location='fake_location:/quobyte/fake_share') 

102 self.access = fake_share.fake_access() 

103 

104 @mock.patch('manila.share.drivers.quobyte.jsonrpc.JsonRpc', mock.Mock()) 

105 def test_do_setup_success(self): 

106 self._driver.rpc.call = mock.Mock(return_value=None) 

107 

108 self._driver.do_setup(self._context) 

109 

110 self._driver.rpc.call.assert_called_with('getInformation', {}) 

111 

112 @mock.patch('manila.share.drivers.quobyte.jsonrpc.JsonRpc.__init__', 

113 mock.Mock(return_value=None)) 

114 @mock.patch.object(jsonrpc.JsonRpc, 'call', 

115 side_effect=exception.QBRpcException( 

116 result='fake_result', 

117 qbcode=666)) 

118 def test_do_setup_failure(self, mock_call): 

119 self.assertRaises(exception.QBException, 

120 self._driver.do_setup, self._context) 

121 

122 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share") 

123 def test_create_share_new_volume(self, qb_resize_mock): 

124 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

125 

126 result = self._driver.create_share(self._context, self.share) 

127 

128 self.assertEqual(self.share['export_location'], result) 

129 self._driver.rpc.call.assert_has_calls([ 

130 mock.call('createVolume', dict( 

131 name=self.share['name'], 

132 tenant_domain=self.share['project_id'], 

133 root_user_id=self.fake_conf.quobyte_default_volume_user, 

134 root_group_id=self.fake_conf.quobyte_default_volume_group, 

135 configuration_name=self.fake_conf.quobyte_volume_configuration 

136 )), 

137 mock.call('exportVolume', 

138 dict(protocol='NFS', volume_uuid='voluuid'))]) 

139 qb_resize_mock.assert_called_once_with(self.share, self.share['size']) 

140 

141 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share") 

142 def test_create_share_existing_volume(self, qb_resize_mock): 

143 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

144 

145 result = self._driver.create_share(self._context, self.share) 

146 

147 self.assertEqual(self.share['export_location'], result) 

148 resolv_params = {'tenant_domain': 'fake_project_uuid', 

149 'volume_name': 'fakename'} 

150 sett_params = {'tenant': {'tenant_id': 'fake_project_uuid'}} 

151 create_params = dict( 

152 name='fakename', 

153 tenant_domain='fake_project_uuid', 

154 root_user_id='root', 

155 root_group_id='root', 

156 configuration_name='BASE') 

157 self._driver.rpc.call.assert_has_calls([ 

158 mock.call('resolveVolumeName', resolv_params, 

159 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND]), 

160 mock.call('setTenant', sett_params, 

161 expected_errors=[jsonrpc.ERROR_GARBAGE_ARGS]), 

162 mock.call('createVolume', create_params), 

163 mock.call('exportVolume', dict(protocol='NFS', 

164 volume_uuid='voluuid'))]) 

165 qb_resize_mock.assert_called_once_with(self.share, self.share['size']) 

166 

167 def test_create_share_wrong_protocol(self): 

168 share = {'share_proto': 'WRONG_PROTOCOL'} 

169 

170 self.assertRaises(exception.QBException, 

171 self._driver.create_share, 

172 context=None, 

173 share=share) 

174 

175 def test_delete_share_existing_volume(self): 

176 def rpc_handler(name, *args): 

177 if name == 'resolveVolumeName': 

178 return {'volume_uuid': 'voluuid'} 

179 elif name == 'exportVolume': 179 ↛ 180line 179 didn't jump to line 180 because the condition on line 179 was never true

180 return {} 

181 

182 self._driver.configuration.quobyte_delete_shares = True 

183 self._driver.rpc.call = mock.Mock(wraps=rpc_handler) 

184 

185 self._driver.delete_share(self._context, self.share) 

186 

187 resolv_params = {'volume_name': 'fakename', 

188 'tenant_domain': 'fake_project_uuid'} 

189 self._driver.rpc.call.assert_has_calls([ 

190 mock.call('resolveVolumeName', resolv_params, 

191 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND]), 

192 mock.call('deleteVolume', {'volume_uuid': 'voluuid'})]) 

193 

194 def test_delete_share_existing_volume_disabled(self): 

195 def rpc_handler(name, *args): 

196 if name == 'resolveVolumeName': 

197 return {'volume_uuid': 'voluuid'} 

198 elif name == 'exportVolume': 198 ↛ exitline 198 didn't return from function 'rpc_handler' because the condition on line 198 was always true

199 return {} 

200 

201 CONF.set_default('quobyte_delete_shares', False) 

202 self._driver.rpc.call = mock.Mock(wraps=rpc_handler) 

203 

204 self._driver.delete_share(self._context, self.share) 

205 

206 self._driver.rpc.call.assert_called_with( 

207 'exportVolume', {'volume_uuid': 'voluuid', 'remove_export': True}) 

208 

209 @mock.patch.object(quobyte.LOG, 'warning') 

210 def test_delete_share_nonexisting_volume(self, mock_warning): 

211 def rpc_handler(name, *args): 

212 if name == 'resolveVolumeName': 212 ↛ exitline 212 didn't return from function 'rpc_handler' because the condition on line 212 was always true

213 return None 

214 

215 self._driver.rpc.call = mock.Mock(wraps=rpc_handler) 

216 

217 self._driver.delete_share(self._context, self.share) 

218 

219 mock_warning.assert_called_with( 

220 'No volume found for share %(project_id)s/%(name)s', 

221 {'project_id': 'fake_project_uuid', 'name': 'fakename'}) 

222 

223 def test_allow_access(self): 

224 def rpc_handler(name, *args): 

225 if name == 'resolveVolumeName': 

226 return {'volume_uuid': 'voluuid'} 

227 elif name == 'exportVolume': 227 ↛ exitline 227 didn't return from function 'rpc_handler' because the condition on line 227 was always true

228 return {'nfs_server_ip': '10.10.1.1', 

229 'nfs_export_path': '/voluuid'} 

230 

231 self._driver.rpc.call = mock.Mock(wraps=rpc_handler) 

232 

233 self._driver._allow_access(self._context, self.share, self.access) 

234 

235 exp_params = {'volume_uuid': 'voluuid', 

236 'read_only': False, 

237 'add_allow_ip': '10.0.0.1'} 

238 self._driver.rpc.call.assert_called_with('exportVolume', exp_params) 

239 

240 def test_allow_ro_access(self): 

241 def rpc_handler(name, *args): 

242 if name == 'resolveVolumeName': 

243 return {'volume_uuid': 'voluuid'} 

244 elif name == 'exportVolume': 244 ↛ exitline 244 didn't return from function 'rpc_handler' because the condition on line 244 was always true

245 return {'nfs_server_ip': '10.10.1.1', 

246 'nfs_export_path': '/voluuid'} 

247 

248 self._driver.rpc.call = mock.Mock(wraps=rpc_handler) 

249 ro_access = fake_share.fake_access(access_level='ro') 

250 

251 self._driver._allow_access(self._context, self.share, ro_access) 

252 

253 exp_params = {'volume_uuid': 'voluuid', 

254 'read_only': True, 

255 'add_allow_ip': '10.0.0.1'} 

256 self._driver.rpc.call.assert_called_with('exportVolume', exp_params) 

257 

258 def test_allow_access_nonip(self): 

259 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

260 

261 self.access = fake_share.fake_access(**{"access_type": 

262 "non_existant_access_type"}) 

263 

264 self.assertRaises(exception.InvalidShareAccess, 

265 self._driver._allow_access, 

266 self._context, self.share, self.access) 

267 

268 def test_deny_access(self): 

269 def rpc_handler(name, *args): 

270 if name == 'resolveVolumeName': 

271 return {'volume_uuid': 'voluuid'} 

272 elif name == 'exportVolume': 272 ↛ exitline 272 didn't return from function 'rpc_handler' because the condition on line 272 was always true

273 return {'nfs_server_ip': '10.10.1.1', 

274 'nfs_export_path': '/voluuid'} 

275 

276 self._driver.rpc.call = mock.Mock(wraps=rpc_handler) 

277 

278 self._driver._deny_access(self._context, self.share, self.access) 

279 

280 self._driver.rpc.call.assert_called_with( 

281 'exportVolume', 

282 {'volume_uuid': 'voluuid', 'remove_allow_ip': '10.0.0.1'}) 

283 

284 @mock.patch.object(quobyte.LOG, 'debug') 

285 def test_deny_access_nonip(self, mock_debug): 

286 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

287 self.access = fake_share.fake_access( 

288 access_type="non_existant_access_type") 

289 

290 self._driver._deny_access(self._context, self.share, self.access) 

291 

292 mock_debug.assert_called_with( 

293 'Quobyte driver only supports ip access control. ' 

294 'Ignoring deny access call for %s , %s', 

295 'fakename', 'fake_project_uuid') 

296 

297 def test_resolve_volume_name(self): 

298 self._driver.rpc.call = mock.Mock( 

299 return_value={'volume_uuid': 'fake_uuid'}) 

300 

301 self._driver._resolve_volume_name('fake_vol_name', 'fake_domain_name') 

302 

303 exp_params = {'volume_name': 'fake_vol_name', 

304 'tenant_domain': 'fake_domain_name'} 

305 self._driver.rpc.call.assert_called_with( 

306 'resolveVolumeName', exp_params, 

307 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND]) 

308 

309 def test_resolve_volume_name_NOENT(self): 

310 self._driver.rpc.call = mock.Mock( 

311 return_value=None) 

312 

313 self.assertIsNone( 

314 self._driver._resolve_volume_name('fake_vol_name', 

315 'fake_domain_name')) 

316 self._driver.rpc.call.assert_called_once_with( 

317 'resolveVolumeName', 

318 dict(volume_name='fake_vol_name', 

319 tenant_domain='fake_domain_name'), 

320 [jsonrpc.ERROR_ENOENT, jsonrpc.ERROR_ENTITY_NOT_FOUND] 

321 ) 

322 

323 def test_resolve_volume_name_other_error(self): 

324 self._driver.rpc.call = mock.Mock( 

325 side_effect=exception.QBRpcException( 

326 result='fubar', 

327 qbcode=666)) 

328 

329 self.assertRaises(exception.QBRpcException, 

330 self._driver._resolve_volume_name, 

331 volume_name='fake_vol_name', 

332 tenant_domain='fake_domain_name') 

333 

334 @mock.patch.object(driver.ShareDriver, '_update_share_stats') 

335 def test_update_share_stats(self, mock_uss): 

336 self._driver._get_capacities = mock.Mock(return_value=[42, 23]) 

337 

338 self._driver._update_share_stats() 

339 

340 mock_uss.assert_called_once_with( 

341 dict(storage_protocol='NFS', 

342 vendor_name='Quobyte', 

343 share_backend_name=self._driver.backend_name, 

344 driver_version=self._driver.DRIVER_VERSION, 

345 total_capacity_gb=42, 

346 free_capacity_gb=23, 

347 reserved_percentage=0, 

348 reserved_snapshot_percentage=0, 

349 reserved_share_extend_percentage=0)) 

350 

351 def test_get_capacities_gb(self): 

352 capval = 42115548133 

353 useval = 19695128917 

354 replfact = 3 

355 self._driver._get_qb_replication_factor = mock.Mock( 

356 return_value=replfact) 

357 self._driver.rpc.call = mock.Mock( 

358 return_value={'total_physical_capacity': str(capval), 

359 'total_physical_usage': str(useval)}) 

360 

361 self.assertEqual((39.223160718, 6.960214182), 

362 self._driver._get_capacities()) 

363 

364 def test_get_capacities_gb_full(self): 

365 capval = 1024 * 1024 * 1024 * 3 

366 useval = 1024 * 1024 * 1024 * 3 + 1 

367 replfact = 1 

368 self._driver._get_qb_replication_factor = mock.Mock( 

369 return_value=replfact) 

370 self._driver.rpc.call = mock.Mock( 

371 return_value={'total_physical_capacity': str(capval), 

372 'total_physical_usage': str(useval)}) 

373 

374 self.assertEqual((3.0, 0), self._driver._get_capacities()) 

375 

376 def test_get_replication(self): 

377 fakerepl = 42 

378 self._driver.configuration.quobyte_volume_configuration = 'fakeVolConf' 

379 self._driver.rpc.call = mock.Mock( 

380 return_value={'configuration': 

381 {'volume_metadata_configuration': 

382 {'replication_factor': 

383 str(fakerepl)}}}) 

384 

385 self.assertEqual(fakerepl, self._driver._get_qb_replication_factor()) 

386 

387 @mock.patch.object(quobyte.QuobyteShareDriver, 

388 "_resolve_volume_name", 

389 return_value="fake_uuid") 

390 def test_ensure_share(self, mock_qb_resolve_volname): 

391 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

392 

393 result = self._driver.ensure_share(self._context, self.share, None) 

394 

395 self.assertEqual(self.share["export_location"], result) 

396 (mock_qb_resolve_volname. 

397 assert_called_once_with(self.share['name'], 

398 self.share['project_id'])) 

399 self._driver.rpc.call.assert_has_calls([ 

400 mock.call('exportVolume', dict( 

401 volume_uuid="fake_uuid", 

402 protocol='NFS' 

403 ))]) 

404 

405 @mock.patch.object(quobyte.QuobyteShareDriver, 

406 "_resolve_volume_name", 

407 return_value=None) 

408 def test_ensure_deleted_share(self, mock_qb_resolve_volname): 

409 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

410 

411 self.assertRaises(exception.ShareResourceNotFound, 

412 self._driver.ensure_share, 

413 self._context, self.share, None) 

414 (mock_qb_resolve_volname. 

415 assert_called_once_with(self.share['name'], 

416 self.share['project_id'])) 

417 

418 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share") 

419 def test_extend_share(self, mock_qsd_resize_share): 

420 self._driver.extend_share(ext_share=self.share, 

421 ext_size=2, 

422 share_server=None) 

423 mock_qsd_resize_share.assert_called_once_with(share=self.share, 

424 new_size=2) 

425 

426 @mock.patch.object(quobyte.QuobyteShareDriver, "_resolve_volume_name", 

427 return_value="fake_volume_uuid") 

428 def test_resize_share(self, mock_qb_resolv): 

429 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

430 manila_size = 7 

431 newsize_bytes = manila_size * units.Gi 

432 

433 self._driver._resize_share(share=self.share, new_size=manila_size) 

434 

435 exp_params = { 

436 "quotas": [{ 

437 "consumer": [{ 

438 "type": "VOLUME", 

439 "identifier": "fake_volume_uuid", 

440 "tenant_id": self.share["project_id"] 

441 }], 

442 "limits": [{ 

443 "type": "LOGICAL_DISK_SPACE", 

444 "value": newsize_bytes, 

445 }], 

446 }]} 

447 self._driver.rpc.call.assert_has_calls([ 

448 mock.call('setQuota', exp_params)]) 

449 mock_qb_resolv.assert_called_once_with(self.share['name'], 

450 self.share['project_id']) 

451 

452 @mock.patch.object(quobyte.QuobyteShareDriver, 

453 "_resolve_volume_name", 

454 return_value="fake_id_3") 

455 def test_fetch_existing_access(self, mock_qb_resolve_volname): 

456 self._driver.rpc.call = mock.Mock(wraps=fake_rpc_handler) 

457 old_access_1 = create_fake_access(access_id="old_1", 

458 access_adr="10.0.0.4") 

459 old_access_2 = create_fake_access(access_id="old_2", 

460 access_adr="10.0.0.5") 

461 

462 exist_list = self._driver._fetch_existing_access(context=self._context, 

463 share=self.share) 

464 

465 # assert expected result here 

466 self.assertEqual([old_access_1['access_to'], 

467 old_access_2['access_to']], 

468 [e.get('access_to') for e in exist_list]) 

469 (mock_qb_resolve_volname. 

470 assert_called_once_with(self.share['name'], 

471 self.share['project_id'])) 

472 

473 @mock.patch.object(quobyte.QuobyteShareDriver, "_resize_share") 

474 def test_shrink_share(self, mock_qsd_resize_share): 

475 self._driver.shrink_share(shrink_share=self.share, 

476 shrink_size=3, 

477 share_server=None) 

478 mock_qsd_resize_share.assert_called_once_with(share=self.share, 

479 new_size=3) 

480 

481 def test_subtract_access_lists(self): 

482 access_1 = create_fake_access(access_id="new_1", 

483 access_adr="10.0.0.5", 

484 access_type="rw",) 

485 access_2 = create_fake_access(access_id="old_1", 

486 access_adr="10.0.0.1", 

487 access_type="rw") 

488 access_3 = create_fake_access(access_id="old_2", 

489 access_adr="10.0.0.3", 

490 access_type="ro") 

491 access_4 = create_fake_access(access_id="new_2", 

492 access_adr="10.0.0.6", 

493 access_type="rw") 

494 access_5 = create_fake_access(access_id="old_3", 

495 access_adr="10.0.0.4", 

496 access_type="rw") 

497 min_list = [access_1, access_2, access_3, access_4] 

498 sub_list = [access_5, access_3, access_2] 

499 

500 self.assertEqual([access_1, access_4], 

501 self._driver._subtract_access_lists(min_list, 

502 sub_list)) 

503 

504 def test_subtract_access_lists_level(self): 

505 access_1 = create_fake_access(access_id="new_1", 

506 access_adr="10.0.0.5", 

507 access_level="rw") 

508 access_2 = create_fake_access(access_id="old_1", 

509 access_adr="10.0.0.1", 

510 access_level="rw") 

511 access_3 = create_fake_access(access_id="old_2", 

512 access_adr="10.0.0.3", 

513 access_level="rw") 

514 access_4 = create_fake_access(access_id="new_2", 

515 access_adr="10.0.0.6", 

516 access_level="rw") 

517 access_5 = create_fake_access(access_id="old_2_ro", 

518 access_adr="10.0.0.3", 

519 access_level="ro") 

520 min_list = [access_1, access_2, access_3, access_4] 

521 sub_list = [access_5, access_2] 

522 

523 self.assertEqual([access_1, access_3, access_4], 

524 self._driver._subtract_access_lists(min_list, 

525 sub_list)) 

526 

527 def test_subtract_access_lists_type(self): 

528 access_1 = create_fake_access(access_id="new_1", 

529 access_adr="10.0.0.5", 

530 access_type="ip") 

531 access_2 = create_fake_access(access_id="old_1", 

532 access_adr="10.0.0.1", 

533 access_type="ip") 

534 access_3 = create_fake_access(access_id="old_2", 

535 access_adr="10.0.0.3", 

536 access_type="ip") 

537 access_4 = create_fake_access(access_id="new_2", 

538 access_adr="10.0.0.6", 

539 access_type="ip") 

540 access_5 = create_fake_access(access_id="old_2_ro", 

541 access_adr="10.0.0.3", 

542 access_type="other") 

543 min_list = [access_1, access_2, access_3, access_4] 

544 sub_list = [access_5, access_2] 

545 

546 self.assertEqual([access_1, access_3, access_4], 

547 self._driver._subtract_access_lists(min_list, 

548 sub_list)) 

549 

550 @mock.patch.object(quobyte.QuobyteShareDriver, "_allow_access") 

551 @mock.patch.object(quobyte.QuobyteShareDriver, "_deny_access") 

552 def test_update_access_add_delete(self, qb_deny_mock, qb_allow_mock): 

553 access_1 = create_fake_access(access_id="new_1", 

554 access_adr="10.0.0.5", 

555 access_level="rw") 

556 access_2 = create_fake_access(access_id="old_1", 

557 access_adr="10.0.0.1", 

558 access_level="rw") 

559 access_3 = create_fake_access(access_id="old_2", 

560 access_adr="10.0.0.3", 

561 access_level="rw") 

562 

563 self._driver.update_access(self._context, 

564 self.share, 

565 access_rules=None, 

566 add_rules=[access_1], 

567 delete_rules=[access_2, access_3], 

568 update_rules=[]) 

569 

570 qb_allow_mock.assert_called_once_with(self._context, 

571 self.share, access_1) 

572 deny_calls = [mock.call(self._context, self.share, access_2), 

573 mock.call(self._context, self.share, access_3)] 

574 qb_deny_mock.assert_has_calls(deny_calls) 

575 

576 @mock.patch.object(quobyte.LOG, "warning") 

577 def test_update_access_no_rules(self, qb_log_mock): 

578 self._driver.update_access(context=None, share=None, access_rules=[], 

579 add_rules=[], delete_rules=[], 

580 update_rules=[]) 

581 

582 qb_log_mock.assert_has_calls([mock.ANY]) 

583 

584 @mock.patch.object(quobyte.QuobyteShareDriver, "_subtract_access_lists") 

585 @mock.patch.object(quobyte.QuobyteShareDriver, "_fetch_existing_access") 

586 @mock.patch.object(quobyte.QuobyteShareDriver, "_allow_access") 

587 def test_update_access_recovery_additionals(self, 

588 qb_allow_mock, 

589 qb_exist_mock, 

590 qb_subtr_mock): 

591 new_access_1 = create_fake_access(access_id="new_1", 

592 access_adr="10.0.0.2") 

593 old_access = create_fake_access(access_id="fake_access_id", 

594 access_adr="10.0.0.1") 

595 new_access_2 = create_fake_access(access_id="new_2", 

596 access_adr="10.0.0.3") 

597 add_access_rules = [new_access_1, 

598 old_access, 

599 new_access_2] 

600 qb_exist_mock.return_value = [old_access] 

601 qb_subtr_mock.side_effect = [[new_access_1, new_access_2], []] 

602 

603 self._driver.update_access(self._context, self.share, 

604 access_rules=add_access_rules, add_rules=[], 

605 delete_rules=[], update_rules=[]) 

606 

607 assert_calls = [mock.call(self._context, self.share, new_access_1), 

608 mock.call(self._context, self.share, new_access_2)] 

609 qb_allow_mock.assert_has_calls(assert_calls, any_order=True) 

610 qb_exist_mock.assert_called_once_with(self._context, self.share) 

611 

612 @mock.patch.object(quobyte.QuobyteShareDriver, "_subtract_access_lists") 

613 @mock.patch.object(quobyte.QuobyteShareDriver, "_fetch_existing_access") 

614 @mock.patch.object(quobyte.QuobyteShareDriver, "_deny_access") 

615 def test_update_access_recovery_superfluous(self, 

616 qb_deny_mock, 

617 qb_exist_mock, 

618 qb_subtr_mock): 

619 

620 old_access_1 = create_fake_access(access_id="old_1", 

621 access_adr="10.0.0.1") 

622 missing_access_1 = create_fake_access(access_id="mis_1", 

623 access_adr="10.0.0.2") 

624 old_access_2 = create_fake_access(access_id="old_2", 

625 access_adr="10.0.0.3") 

626 qb_exist_mock.side_effect = [[old_access_1, old_access_2]] 

627 qb_subtr_mock.side_effect = [[], [missing_access_1]] 

628 old_access_rules = [old_access_1, old_access_2] 

629 

630 self._driver.update_access(self._context, self.share, 

631 access_rules=old_access_rules, add_rules=[], 

632 delete_rules=[], update_rules=[]) 

633 

634 qb_deny_mock.assert_called_once_with(self._context, 

635 self.share, 

636 (missing_access_1)) 

637 qb_exist_mock.assert_called_once_with(self._context, self.share) 

638 

639 @mock.patch.object(quobyte.QuobyteShareDriver, "_subtract_access_lists") 

640 @mock.patch.object(quobyte.QuobyteShareDriver, "_fetch_existing_access") 

641 @mock.patch.object(quobyte.QuobyteShareDriver, "_deny_access") 

642 @mock.patch.object(quobyte.QuobyteShareDriver, "_allow_access") 

643 def test_update_access_recovery_add_superfluous(self, 

644 qb_allow_mock, 

645 qb_deny_mock, 

646 qb_exist_mock, 

647 qb_subtr_mock): 

648 new_access_1 = create_fake_access(access_id="new_1", 

649 access_adr="10.0.0.5") 

650 old_access_1 = create_fake_access(access_id="old_1", 

651 access_adr="10.0.0.1") 

652 old_access_2 = create_fake_access(access_id="old_2", 

653 access_adr="10.0.0.3") 

654 old_access_3 = create_fake_access(access_id="old_3", 

655 access_adr="10.0.0.4") 

656 miss_access_1 = create_fake_access(access_id="old_3", 

657 access_adr="10.0.0.4") 

658 new_access_2 = create_fake_access(access_id="new_2", 

659 access_adr="10.0.0.3", 

660 access_level="ro") 

661 new_access_rules = [new_access_1, old_access_1, old_access_2, 

662 old_access_3, new_access_2] 

663 qb_exist_mock.return_value = [old_access_1, old_access_2, 

664 old_access_3, miss_access_1] 

665 qb_subtr_mock.side_effect = [[new_access_1, new_access_2], 

666 [miss_access_1, old_access_2]] 

667 

668 self._driver.update_access(self._context, self.share, 

669 new_access_rules, add_rules=[], 

670 delete_rules=[], update_rules=[]) 

671 

672 a_calls = [mock.call(self._context, self.share, new_access_1), 

673 mock.call(self._context, self.share, new_access_2)] 

674 qb_allow_mock.assert_has_calls(a_calls) 

675 b_calls = [mock.call(self._context, self.share, miss_access_1), 

676 mock.call(self._context, self.share, old_access_2)] 

677 qb_deny_mock.assert_has_calls(b_calls) 

678 qb_exist_mock.assert_called_once_with(self._context, self.share)