Coverage for manila/tests/share/drivers/qnap/test_api.py: 99%

236 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016 QNAP Systems, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import base64 

17import ddt 

18from http import client as http_client 

19import time 

20from unittest import mock 

21from urllib import parse as urlparse 

22 

23from manila import exception 

24from manila.share.drivers.qnap import qnap 

25from manila import test 

26from manila.tests import fake_share 

27from manila.tests.share.drivers.qnap import fakes 

28 

29 

def create_configuration(management_url, qnap_share_ip, qnap_nas_login,
                         qnap_nas_password, qnap_poolname):
    """Build a mock driver configuration carrying the given QNAP options.

    Every call to ``safe_get`` on the returned mock yields ``False``.
    """
    return mock.Mock(
        qnap_management_url=management_url,
        qnap_share_ip=qnap_share_ip,
        qnap_nas_login=qnap_nas_login,
        qnap_nas_password=qnap_nas_password,
        qnap_poolname=qnap_poolname,
        # Dotted keys configure child mocks, so they are passed via a dict.
        **{'safe_get.return_value': False})

41 

42 

class QnapShareDriverBaseTestCase(test.TestCase):
    """Common fixture for the QnapShareDriver test cases."""

    def setUp(self):
        """Start each test with no driver and no share API attached."""
        super().setUp()
        self.share_api = None
        self.driver = None

    def _do_setup(self, management_url, share_ip, nas_login,
                  nas_password, poolname, **kwargs):
        """Instantiate QnapShareDriver with a mock config and set it up.

        An optional ``private_storage`` keyword argument is forwarded to
        the driver constructor (``None`` when absent). ``do_setup`` is then
        called with the placeholder context ``'context'``.
        """
        fake_configuration = create_configuration(
            management_url, share_ip, nas_login, nas_password, poolname)
        private_storage = kwargs.get('private_storage')
        self.driver = qnap.QnapShareDriver(
            configuration=fake_configuration,
            private_storage=private_storage)
        self.driver.do_setup('context')

64 

65 

66@ddt.ddt 

67class QnapAPITestCase(QnapShareDriverBaseTestCase): 

68 """Tests QNAP api functions.""" 

69 

70 login_url = ('/cgi-bin/authLogin.cgi?') 

71 get_basic_info_url = ('/cgi-bin/authLogin.cgi') 

72 fake_password = 'qnapadmin' 

73 

def setUp(self):
    """Prepare the login URL, a patched HTTPConnection and a fake share."""
    super(QnapAPITestCase, self).setUp()
    encoded_password = base64.b64encode(self.fake_password.encode("utf-8"))
    # Key order matters: it fixes the order of the encoded query string.
    login_params = {
        'user': 'admin',
        'pwd': encoded_password,
        'serviceKey': 1,
    }
    self.login_url = (
        '/cgi-bin/authLogin.cgi?%s' % self._sanitize_params(login_params))
    # No real HTTP traffic: every connection created is a mock.
    self.mock_object(http_client, 'HTTPConnection')
    self.share = fake_share.fake_share(
        share_proto='NFS',
        id='shareId',
        display_name='fakeDisplayName',
        export_locations=[{'path': '1.2.3.4:/share/fakeShareName'}],
        host='QnapShareDriver',
        size=10)

92 

93 def _sanitize_params(self, params, doseq=False): 

94 sanitized_params = {} 

95 for key in params: 

96 value = params[key] 

97 if value is not None: 97 ↛ 95line 97 didn't jump to line 95 because the condition on line 97 was always true

98 if isinstance(value, list): 

99 sanitized_params[key] = [str(v) for v in value] 

100 else: 

101 sanitized_params[key] = str(value) 

102 

103 sanitized_params = urlparse.urlencode(sanitized_params, doseq) 

104 return sanitized_params 

105 

@ddt.data('fake_share_name', 'fakeLabel')
def test_create_share_api(self, fake_name):
    """Verify the HTTP requests issued when creating a share."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeCreateShareResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.create_share(
        self.share,
        'Storage Pool 1',
        fake_name,
        'NFS',
        qnap_deduplication=False,
        qnap_compression=True,
        qnap_thin_provision=True,
        qnap_ssd_cache=False)

    # Key order defines the order of the expected query string.
    expected_query = self._sanitize_params({
        'wiz_func': 'share_create',
        'action': 'add_share',
        'vol_name': fake_name,
        'vol_size': '10' + 'GB',
        'threshold': '80',
        'dedup': 'off',
        'compression': '1',
        'thin_pro': '1',
        'cache': '0',
        'cifs_enable': '0',
        'nfs_enable': '1',
        'afp_enable': '0',
        'ftp_enable': '0',
        'encryption': '0',
        'hidden': '0',
        'oplocks': '1',
        'sync': 'always',
        'userrw0': 'admin',
        'userrd_len': '0',
        'userrw_len': '1',
        'userno_len': '0',
        'access_r': 'setup_users',
        'path_type': 'auto',
        'recycle_bin': '1',
        'recycle_bin_administrators_only': '0',
        'pool_name': 'Storage Pool 1',
        'sid': 'fakeSid',
    })
    create_url = '/cgi-bin/wizReq.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the create request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, create_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

168 

def test_api_delete_share(self):
    """Verify the HTTP requests issued when deleting a share."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeDeleteShareResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.delete_share('fakeId')

    expected_query = self._sanitize_params({
        'func': 'volume_mgmt',
        'vol_remove': '1',
        'volumeID': 'fakeId',
        'stop_service': 'no',
        'sid': 'fakeSid',
    })
    delete_url = '/cgi-bin/disk/disk_manage.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the delete request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, delete_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

202 

def test_get_specific_poolinfo(self):
    """Verify the HTTP requests issued when querying one pool's info."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeSpecificPoolInfoResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.get_specific_poolinfo('fakePoolId')

    expected_query = self._sanitize_params({
        'store': 'poolInfo',
        'func': 'extra_get',
        'poolID': 'fakePoolId',
        'Pool_Info': '1',
        'sid': 'fakeSid',
    })
    pool_info_url = '/cgi-bin/disk/disk_manage.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the pool query.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, pool_info_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

236 

@ddt.data({'pool_id': "Storage Pool 1"},
          {'pool_id': "Storage Pool 1", 'vol_no': 'fakeNo'},
          {'pool_id': "Storage Pool 1", 'vol_label': 'fakeShareName'})
def test_get_share_info(self, dict_parm):
    """Verify the HTTP requests issued when fetching share info.

    The same request URL is expected for every keyword combination
    supplied through ``dict_parm``.
    """
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeShareInfoResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.get_share_info(**dict_parm)

    expected_query = self._sanitize_params({
        'store': 'poolVolumeList',
        'poolID': 'Storage Pool 1',
        'func': 'extra_get',
        'Pool_Vol_Info': '1',
        'sid': 'fakeSid',
    })
    share_info_url = '/cgi-bin/disk/disk_manage.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the share query.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, share_info_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

272 

def test_get_specific_volinfo(self):
    """Verify the HTTP requests issued when querying one volume's info."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeSpecificVolInfoResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.get_specific_volinfo('fakeNo')

    expected_query = self._sanitize_params({
        'store': 'volumeInfo',
        'volumeID': 'fakeNo',
        'func': 'extra_get',
        'Volume_Info': '1',
        'sid': 'fakeSid',
    })
    vol_info_url = '/cgi-bin/disk/disk_manage.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the volume query.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, vol_info_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

306 

def test_get_snapshot_info_es(self):
    """Verify the snapshot-info requests when basic info is the ES fake."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeSnapshotInfoResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.get_snapshot_info(
        volID='volId', snapshot_name='fakeSnapshotName')

    expected_query = self._sanitize_params({
        'func': 'extra_get',
        'volumeID': 'volId',
        'snapshot_list': '1',
        'snap_start': '0',
        'snap_count': '100',
        'sid': 'fakeSid',
    })
    snapshot_info_url = '/cgi-bin/disk/snapshot.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the snapshot query.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, snapshot_info_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

341 

def test_create_snapshot_api(self):
    """Verify the HTTP requests issued when creating a snapshot."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeCreateSnapshotResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.create_snapshot_api(
        'fakeVolumeId', 'fakeSnapshotName')

    expected_query = self._sanitize_params({
        'func': 'create_snapshot',
        'volumeID': 'fakeVolumeId',
        'snapshot_name': 'fakeSnapshotName',
        'expire_min': '0',
        'vital': '1',
        'sid': 'fakeSid',
    })
    create_url = '/cgi-bin/disk/snapshot.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the create request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, create_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

377 

@ddt.data(fakes.FakeDeleteSnapshotResponse(),
          fakes.FakeDeleteSnapshotResponseSnapshotNotExist(),
          fakes.FakeDeleteSnapshotResponseShareNotExist())
def test_delete_snapshot_api(self, fakeDeleteSnapshotResponse):
    """Verify the HTTP requests issued when deleting a snapshot.

    The call is expected to complete without raising for each of the
    three fake delete responses supplied by ddt.
    """
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakeDeleteSnapshotResponse,
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.delete_snapshot_api('fakeSnapshotId')

    expected_query = self._sanitize_params({
        'func': 'del_snapshots',
        'snapshotID': 'fakeSnapshotId',
        'sid': 'fakeSid',
    })
    delete_url = '/cgi-bin/disk/snapshot.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the delete request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, delete_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

412 

def test_clone_snapshot_api(self):
    """Verify the HTTP requests issued when cloning a snapshot."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        # NOTE(review): the delete-snapshot fake is reused as the clone
        # response here; presumably only a success result is needed.
        fakes.FakeDeleteSnapshotResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.clone_snapshot(
        'fakeSnapshotId', 'fakeNewShareName', 'fakeCloneSize')

    expected_query = self._sanitize_params({
        'func': 'clone_qsnapshot',
        'by_vol': '1',
        'snapshotID': 'fakeSnapshotId',
        'new_name': 'fakeNewShareName',
        'clone_size': '{}g'.format('fakeCloneSize'),
        'sid': 'fakeSid',
    })
    clone_url = '/cgi-bin/disk/snapshot.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the clone request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, clone_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

449 

def test_edit_share_api(self):
    """Verify the HTTP requests issued when editing a share.

    The basic-info response is the TS 4.3.0 fake, and the edit request is
    expected on the ``privWizard.cgi`` endpoint.
    """
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseTs_4_3_0(),
        fakes.FakeLoginResponse(),
        # NOTE(review): the create-snapshot fake is reused as the edit
        # response; presumably only a success result is needed.
        fakes.FakeCreateSnapshotResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    expect_share_dict = {
        "sharename": 'fakeVolId',
        "old_sharename": 'fakeVolId',
        "new_size": 100,
        "deduplication": False,
        "compression": True,
        "thin_provision": True,
        "ssd_cache": False,
        "share_proto": "NFS"
    }
    self.driver.api_executor.edit_share(expect_share_dict)

    # ``new_size`` is always set above, so ``vol_size`` is always expected.
    # It is derived from the input and must stay last (after ``sid``) to
    # match the order of the generated query string.
    fake_params = {
        'wiz_func': 'share_property',
        'action': 'share_property',
        'sharename': 'fakeVolId',
        'old_sharename': 'fakeVolId',
        'dedup': 'off',
        'compression': '1',
        'thin_pro': '1',
        'cache': '0',
        'cifs_enable': '0',
        'nfs_enable': '1',
        'afp_enable': '0',
        'ftp_enable': '0',
        'hidden': '0',
        'oplocks': '1',
        'sync': 'always',
        'recycle_bin': '1',
        'recycle_bin_administrators_only': '0',
        'sid': 'fakeSid',
        'vol_size': '{}GB'.format(expect_share_dict['new_size']),
    }
    sanitized_params = self._sanitize_params(fake_params)
    fake_url = '/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params

    # Expected sequence: login, basic info, login, then the edit request.
    expected_call_list = [
        mock.call('GET', self.login_url),
        mock.call('GET', self.get_basic_info_url),
        mock.call('GET', self.login_url),
        mock.call('GET', fake_url)]
    self.assertEqual(expected_call_list, connection.request.call_args_list)

508 

@ddt.data(fakes.FakeGetHostListResponse(),
          fakes.FakeGetNoHostListResponse())
def test_get_host_list(self, fakeGetHostListResponse):
    """Verify the HTTP requests issued when listing hosts."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakeGetHostListResponse,
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.get_host_list()

    expected_query = self._sanitize_params({
        'module': 'hosts',
        'func': 'get_hostlist',
        'sid': 'fakeSid',
    })
    host_list_url = (
        '/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % expected_query)

    # Expected sequence: login, basic info, login, then the host-list query.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, host_list_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

542 

def test_add_host(self):
    """Verify the HTTP requests issued when adding a host."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeGetHostListResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.add_host('fakeHostName', 'fakeIpV4')

    expected_query = self._sanitize_params({
        'module': 'hosts',
        'func': 'apply_addhost',
        'name': 'fakeHostName',
        'ipaddr_v4': 'fakeIpV4',
        'sid': 'fakeSid',
    })
    add_host_url = (
        '/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % expected_query)

    # Expected sequence: login, basic info, login, then the add-host request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, add_host_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

577 

def test_edit_host(self):
    """Verify the HTTP requests issued when editing a host's IP list."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeGetHostListResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.edit_host('fakeHostName', ['fakeIpV4'])

    # doseq=True expands the address list into repeated ipaddr_v4 keys.
    expected_query = self._sanitize_params(
        {
            'module': 'hosts',
            'func': 'apply_sethost',
            'name': 'fakeHostName',
            'ipaddr_v4': ['fakeIpV4'],
            'sid': 'fakeSid',
        },
        doseq=True)
    edit_host_url = (
        '/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % expected_query)

    # Expected sequence: login, basic info, login, then the edit request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, edit_host_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

612 

def test_delete_host(self):
    """Verify the HTTP requests issued when deleting a host."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakes.FakeGetHostListResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.delete_host('fakeHostName')

    expected_query = self._sanitize_params({
        'module': 'hosts',
        'func': 'apply_delhost',
        'host_name': 'fakeHostName',
        'sid': 'fakeSid',
    })
    delete_host_url = (
        '/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % expected_query)

    # Expected sequence: login, basic info, login, then the delete request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, delete_host_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

645 

@ddt.data(fakes.FakeGetHostListResponse())
def test_set_nfs_access(self, fakeGetHostListResponse):
    """Verify the HTTP requests issued when setting NFS access."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
        fakeGetHostListResponse,
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.set_nfs_access(
        'fakeShareName', 'fakeAccess', 'fakeHostName')

    expected_query = self._sanitize_params({
        'wiz_func': 'share_nfs_control',
        'action': 'share_nfs_control',
        'sharename': 'fakeShareName',
        'access': 'fakeAccess',
        'host_name': 'fakeHostName',
        'sid': 'fakeSid',
    })
    nfs_access_url = '/cgi-bin/priv/privWizard.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the access request.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, nfs_access_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

682 

def test_get_snapshot_info_ts_api(self):
    """Verify the snapshot-info requests when basic info is the TS fake."""
    connection = http_client.HTTPConnection.return_value
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseTs_4_3_0(),
        fakes.FakeLoginResponse(),
        fakes.FakeSnapshotInfoResponse(),
    ]

    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    self.driver.api_executor.get_snapshot_info(
        snapshot_name='fakeSnapshotName',
        lun_index='fakeLunIndex')

    expected_query = self._sanitize_params({
        'func': 'extra_get',
        'LUNIndex': 'fakeLunIndex',
        'smb_snapshot_list': '1',
        'smb_snapshot': '1',
        'snapshot_list': '1',
        'sid': 'fakeSid',
    })
    snapshot_info_url = '/cgi-bin/disk/snapshot.cgi?%s' % expected_query

    # Expected sequence: login, basic info, login, then the snapshot query.
    expected_requests = [
        mock.call('GET', url)
        for url in (self.login_url, self.get_basic_info_url,
                    self.login_url, snapshot_info_url)
    ]
    self.assertEqual(expected_requests, connection.request.call_args_list)

719 

@ddt.data(fakes.FakeAuthPassFailResponse(),
          fakes.FakeEsResCodeNegativeResponse())
def test_api_create_share_with_fail_response(self, fake_fail_response):
    """Verify create_share raises ShareBackendException on failures."""
    connection = http_client.HTTPConnection.return_value
    setup_responses = [
        fakes.FakeLoginResponse(),
        fakes.FakeGetBasicInfoResponseEs_1_1_3(),
        fakes.FakeLoginResponse(),
    ]
    # Ten identical failures follow the setup responses; presumably
    # enough to outlast any retries in the executor (TODO confirm).
    connection.getresponse.side_effect = (
        setup_responses + [fake_fail_response] * 10)

    # Patch sleep so the test never actually waits.
    self.mock_object(time, 'sleep')
    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')
    create_kwargs = dict(
        share=self.share,
        pool_name='Storage Pool 1',
        create_share_name='fake_share_name',
        share_proto='NFS',
        qnap_deduplication=False,
        qnap_compression=True,
        qnap_thin_provision=True,
        qnap_ssd_cache=False)
    self.assertRaises(
        exception.ShareBackendException,
        self.driver.api_executor.create_share,
        **create_kwargs)

754 

@ddt.unpack
@ddt.data(['self.driver.api_executor.get_share_info',
           {'pool_id': 'fakeId'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_specific_volinfo',
           {'vol_id': 'fakeId'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.create_snapshot_api',
           {'volumeID': 'fakeVolumeId',
            'snapshot_name': 'fakeSnapshotName'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.create_snapshot_api',
           {'volumeID': 'fakeVolumeId',
            'snapshot_name': 'fakeSnapshotName'},
           fakes.FakeEsResCodeNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_snapshot_info',
           {'volID': 'volId'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_snapshot_info',
           {'volID': 'volId'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_specific_poolinfo',
           {'pool_id': 'Storage Pool 1'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_specific_poolinfo',
           {'pool_id': 'Storage Pool 1'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.delete_share',
           {'vol_id': 'fakeId'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.delete_share',
           {'vol_id': 'fakeId'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.delete_snapshot_api',
           {'snapshot_id': 'fakeSnapshotId'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.delete_snapshot_api',
           {'snapshot_id': 'fakeSnapshotId'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.clone_snapshot',
           {'snapshot_id': 'fakeSnapshotId',
            'new_sharename': 'fakeNewShareName',
            'clone_size': 'fakeCloneSize'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.clone_snapshot',
           {'snapshot_id': 'fakeSnapshotId',
            'new_sharename': 'fakeNewShareName',
            'clone_size': 'fakeCloneSize'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.edit_share',
           {'share_dict': {"sharename": 'fakeVolId',
                           "old_sharename": 'fakeVolId',
                           "new_size": 100,
                           "deduplication": False,
                           "compression": True,
                           "thin_provision": False,
                           "ssd_cache": False,
                           "share_proto": "NFS"}},
           fakes.FakeEsResCodeNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.edit_share',
           {'share_dict': {"sharename": 'fakeVolId',
                           "old_sharename": 'fakeVolId',
                           "new_size": 100,
                           "deduplication": False,
                           "compression": True,
                           "thin_provision": False,
                           "ssd_cache": False,
                           "share_proto": "NFS"}},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.add_host',
           {'hostname': 'fakeHostName',
            'ipv4': 'fakeIpV4'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.add_host',
           {'hostname': 'fakeHostName',
            'ipv4': 'fakeIpV4'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.edit_host',
           {'hostname': 'fakeHostName',
            'ipv4_list': 'fakeIpV4List'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.edit_host',
           {'hostname': 'fakeHostName',
            'ipv4_list': 'fakeIpV4List'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.delete_host',
           {'hostname': 'fakeHostName'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.delete_host',
           {'hostname': 'fakeHostName'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_host_list',
           {},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_host_list',
           {},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.set_nfs_access',
           {'sharename': 'fakeShareName',
            'access': 'fakeAccess',
            'host_name': 'fakeHostName'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.set_nfs_access',
           {'sharename': 'fakeShareName',
            'access': 'fakeAccess',
            'host_name': 'fakeHostName'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseEs_1_1_3()],
          ['self.driver.api_executor.get_snapshot_info',
           {'snapshot_name': 'fakeSnapshoName',
            'lun_index': 'fakeLunIndex'},
           fakes.FakeAuthPassFailResponse(),
           fakes.FakeGetBasicInfoResponseTs_4_3_0()],
          ['self.driver.api_executor.get_snapshot_info',
           {'snapshot_name': 'fakeSnapshoName',
            'lun_index': 'fakeLunIndex'},
           fakes.FakeResultNegativeResponse(),
           fakes.FakeGetBasicInfoResponseTs_4_3_0()])
def test_get_snapshot_info_ts_with_fail_response(
        self, api, dict_parm,
        fake_fail_response, fake_basic_info):
    """Verify each executor API raises on a failing backend response.

    Despite the method name, the ddt data covers many executor APIs.

    :param api: dotted path to the API under test, rooted at ``self``
        (e.g. ``'self.driver.api_executor.get_share_info'``).
    :param dict_parm: keyword arguments passed to that API.
    :param fake_fail_response: failing response returned for every
        request made after setup.
    :param fake_basic_info: basic-info response returned during setup.
    """
    connection = http_client.HTTPConnection.return_value
    # Ten identical failures follow the setup responses; presumably
    # enough to outlast any retries in the executor (TODO confirm).
    connection.getresponse.side_effect = [
        fakes.FakeLoginResponse(),
        fake_basic_info,
        fakes.FakeLoginResponse(),
    ] + [fake_fail_response] * 10

    # Patch sleep before anything runs so the test never actually waits.
    self.mock_object(time, 'sleep')
    self._do_setup('http://1.2.3.4:8080', '1.2.3.4', 'admin',
                   'qnapadmin', 'Storage Pool 1')

    # Resolve the dotted path with getattr instead of eval(): walk every
    # attribute after the leading 'self'. The driver only exists after
    # _do_setup, so resolution has to happen here.
    api_callable = self
    for attr_name in api.split('.')[1:]:
        api_callable = getattr(api_callable, attr_name)

    self.assertRaises(
        exception.ShareBackendException,
        api_callable,
        **dict_parm)