Coverage for manila/share/drivers/qnap/api.py: 95%

380 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016 QNAP Systems, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15""" 

16API for QNAP Storage. 

17""" 

18import base64 

19import functools 

20from http import client as http_client 

21import re 

22import ssl 

23from urllib import parse as urlparse 

24 

25from defusedxml import ElementTree as etree 

26from oslo_log import log as logging 

27 

28from manila import exception 

29from manila.i18n import _ 

30from manila import utils 

31 

32LOG = logging.getLogger(__name__) 

33MSG_SESSION_EXPIRED = _("Session ID expired") 

34MSG_UNEXPECT_RESP = _("Unexpected response from QNAP API") 

35 

36 

37def _connection_checker(func): 

38 """Decorator to check session has expired or not.""" 

39 @utils.retry(retry_param=exception.ShareBackendException, 

40 retries=5) 

41 @functools.wraps(func) 

42 def inner_connection_checker(self, *args, **kwargs): 

43 LOG.debug('in _connection_checker') 

44 pattern = re.compile(r".*Session ID expired.$") 

45 try: 

46 return func(self, *args, **kwargs) 

47 except exception.ShareBackendException as e: 

48 matches = pattern.match(str(e)) 

49 if matches: 

50 LOG.debug('Session might have expired.' 

51 ' Trying to relogin') 

52 self._login() 

53 raise 

54 return inner_connection_checker 

55 

56 

57class QnapAPIExecutor(object): 

58 """Makes QNAP API calls for ES NAS.""" 

59 

60 def __init__(self, *args, **kwargs): 

61 self.sid = None 

62 self.username = kwargs['username'] 

63 self.password = kwargs['password'] 

64 self.ip, self.port, self.ssl = ( 

65 self._parse_management_url(kwargs['management_url'])) 

66 self._login() 

67 

68 def _parse_management_url(self, management_url): 

69 pattern = re.compile(r"(http|https)\:\/\/(\S+)\:(\d+)") 

70 matches = pattern.match(management_url) 

71 if matches.group(1) == 'http': 

72 management_ssl = False 

73 else: 

74 management_ssl = True 

75 management_ip = matches.group(2) 

76 management_port = matches.group(3) 

77 return management_ip, management_port, management_ssl 

78 

79 def _prepare_connection(self, isSSL, ip, port): 

80 if isSSL: 

81 if hasattr(ssl, '_create_unverified_context'): 81 ↛ 87line 81 didn't jump to line 87 because the condition on line 81 was always true

82 context = ssl._create_unverified_context() # nosec B323 

83 connection = http_client.HTTPSConnection(ip, 

84 port=port, 

85 context=context) 

86 else: 

87 connection = http_client.HTTPSConnection(ip, 

88 port=port) 

89 else: 

90 connection = http_client.HTTPConnection(ip, port) 

91 return connection 

92 

93 def get_basic_info(self, management_url): 

94 """Get the basic information of NAS.""" 

95 LOG.debug('in get_basic_info') 

96 management_ip, management_port, management_ssl = ( 

97 self._parse_management_url(management_url)) 

98 connection = self._prepare_connection(management_ssl, 

99 management_ip, 

100 management_port) 

101 

102 connection.request('GET', '/cgi-bin/authLogin.cgi') 

103 response = connection.getresponse() 

104 data = response.read() 

105 LOG.debug('response data: %s', data) 

106 

107 root = etree.fromstring(data) 

108 

109 display_model_name = root.find('model/displayModelName').text 

110 internal_model_name = root.find('model/internalModelName').text 

111 fw_version = root.find('firmware/version').text 

112 

113 connection.close() 

114 return display_model_name, internal_model_name, fw_version 

115 

116 def _execute_and_get_response_details(self, nas_ip, url): 

117 """Will prepare response after executing a http request.""" 

118 LOG.debug('port: %(port)s, ssl: %(ssl)s', 

119 {'port': self.port, 'ssl': self.ssl}) 

120 

121 res_details = {} 

122 

123 # Prepare the connection 

124 connection = self._prepare_connection(self.ssl, 

125 nas_ip, 

126 self.port) 

127 

128 # Make the connection 

129 LOG.debug('url : %s', url) 

130 connection.request('GET', url) 

131 # Extract the response as the connection was successful 

132 response = connection.getresponse() 

133 # Read the response 

134 data = response.read() 

135 LOG.debug('response data: %s', data) 

136 

137 res_details['data'] = data 

138 res_details['error'] = None 

139 res_details['http_status'] = response.status 

140 

141 connection.close() 

142 return res_details 

143 

144 def execute_login(self): 

145 """Login and return sid.""" 

146 params = { 

147 'user': self.username, 

148 'pwd': base64.b64encode(self.password.encode("utf-8")), 

149 'serviceKey': '1', 

150 } 

151 sanitized_params = self._sanitize_params(params) 

152 

153 sanitized_params = urlparse.urlencode(sanitized_params) 

154 url = ('/cgi-bin/authLogin.cgi?%s' % sanitized_params) 

155 

156 res_details = self._execute_and_get_response_details(self.ip, url) 

157 root = etree.fromstring(res_details['data']) 

158 

159 if root.find('authPassed').text == '0': 

160 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

161 

162 session_id = root.find('authSid').text 

163 return session_id 

164 

165 def _login(self): 

166 """Execute Https Login API.""" 

167 self.sid = self.execute_login() 

168 LOG.debug('sid: %s', self.sid) 

169 

170 def _sanitize_params(self, params): 

171 sanitized_params = {} 

172 for key in params: 

173 value = params[key] 

174 if value is not None: 174 ↛ 172line 174 didn't jump to line 172 because the condition on line 174 was always true

175 if isinstance(value, list): 

176 sanitized_params[key] = [str(v) for v in value] 

177 else: 

178 sanitized_params[key] = str(value) 

179 return sanitized_params 

180 

181 @_connection_checker 

182 def create_share(self, share, pool_name, create_share_name, 

183 share_proto, **kwargs): 

184 """Create share.""" 

185 LOG.debug('create_share_name: %s', create_share_name) 

186 

187 params = { 

188 'wiz_func': 'share_create', 

189 'action': 'add_share', 

190 'vol_name': create_share_name, 

191 'vol_size': str(share['size']) + 'GB', 

192 'threshold': '80', 

193 'dedup': ('sha512' 

194 if kwargs['qnap_deduplication'] is True 

195 else 'off'), 

196 'compression': '1' if kwargs['qnap_compression'] is True else '0', 

197 'thin_pro': '1' if kwargs['qnap_thin_provision'] is True else '0', 

198 'cache': '1' if kwargs['qnap_ssd_cache'] is True else '0', 

199 'cifs_enable': '0' if share_proto == 'NFS' else '1', 

200 'nfs_enable': '0' if share_proto == 'CIFS' else '1', 

201 'afp_enable': '0', 

202 'ftp_enable': '0', 

203 'encryption': '0', 

204 'hidden': '0', 

205 'oplocks': '1', 

206 'sync': 'always', 

207 'userrw0': 'admin', 

208 'userrd_len': '0', 

209 'userrw_len': '1', 

210 'userno_len': '0', 

211 'access_r': 'setup_users', 

212 'path_type': 'auto', 

213 'recycle_bin': '1', 

214 'recycle_bin_administrators_only': '0', 

215 'pool_name': pool_name, 

216 'sid': self.sid, 

217 } 

218 sanitized_params = self._sanitize_params(params) 

219 

220 sanitized_params = urlparse.urlencode(sanitized_params) 

221 url = ('/cgi-bin/wizReq.cgi?%s' % sanitized_params) 

222 

223 res_details = self._execute_and_get_response_details(self.ip, url) 

224 root = etree.fromstring(res_details['data']) 

225 

226 if root.find('authPassed').text == '0': 

227 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

228 if root.find('ES_RET_CODE').text < '0': 

229 msg = _("Fail to create share %s on NAS.") % create_share_name 

230 LOG.error(msg) 

231 raise exception.ShareBackendException(msg=msg) 

232 

233 vol_list = root.find('func').find('ownContent').find('volumeList') 

234 vol_info_tree = vol_list.findall('volume') 

235 for vol in vol_info_tree: 

236 LOG.debug('Iterating vol name: %(name)s, index: %(id)s', 

237 {'name': vol.find('volumeLabel').text, 

238 'id': vol.find('volumeValue').text}) 

239 if (create_share_name == vol.find('volumeLabel').text): 

240 LOG.debug('volumeLabel:%s', vol.find('volumeLabel').text) 

241 return vol.find('volumeValue').text 

242 

243 return res_details['data'] 

244 

245 @_connection_checker 

246 def delete_share(self, vol_id, *args, **kwargs): 

247 """Execute delete share API.""" 

248 params = { 

249 'func': 'volume_mgmt', 

250 'vol_remove': '1', 

251 'volumeID': vol_id, 

252 'stop_service': 'no', 

253 'sid': self.sid, 

254 } 

255 sanitized_params = self._sanitize_params(params) 

256 

257 sanitized_params = urlparse.urlencode(sanitized_params) 

258 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params) 

259 

260 res_details = self._execute_and_get_response_details(self.ip, url) 

261 root = etree.fromstring(res_details['data']) 

262 

263 if root.find('authPassed').text == '0': 

264 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

265 if root.find('result').text < '0': 

266 msg = _('Delete share id: %s failed') % vol_id 

267 raise exception.ShareBackendException(msg=msg) 

268 

269 @_connection_checker 

270 def get_specific_poolinfo(self, pool_id): 

271 """Execute get_specific_poolinfo API.""" 

272 params = { 

273 'store': 'poolInfo', 

274 'func': 'extra_get', 

275 'poolID': pool_id, 

276 'Pool_Info': '1', 

277 'sid': self.sid, 

278 } 

279 sanitized_params = self._sanitize_params(params) 

280 

281 sanitized_params = urlparse.urlencode(sanitized_params) 

282 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params) 

283 

284 res_details = self._execute_and_get_response_details(self.ip, url) 

285 

286 root = etree.fromstring(res_details['data']) 

287 if root.find('authPassed').text == '0': 

288 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

289 if root.find('result').text < '0': 

290 msg = _('get_specific_poolinfo failed') 

291 raise exception.ShareBackendException(msg=msg) 

292 

293 pool_list = root.find('Pool_Index') 

294 pool_info_tree = pool_list.findall('row') 

295 for pool in pool_info_tree: 295 ↛ exitline 295 didn't return from function 'get_specific_poolinfo' because the loop on line 295 didn't complete

296 if pool_id == pool.find('poolID').text: 296 ↛ 295line 296 didn't jump to line 295 because the condition on line 296 was always true

297 LOG.debug('poolID: %s', pool.find('poolID').text) 

298 return pool 

299 

300 @_connection_checker 

301 def get_share_info(self, pool_id, **kwargs): 

302 """Execute get_share_info API.""" 

303 for key, value in kwargs.items(): 

304 LOG.debug('%(key)s = %(val)s', 

305 {'key': key, 'val': value}) 

306 

307 params = { 

308 'store': 'poolVolumeList', 

309 'poolID': pool_id, 

310 'func': 'extra_get', 

311 'Pool_Vol_Info': '1', 

312 'sid': self.sid, 

313 } 

314 sanitized_params = self._sanitize_params(params) 

315 

316 sanitized_params = urlparse.urlencode(sanitized_params) 

317 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params) 

318 

319 res_details = self._execute_and_get_response_details(self.ip, url) 

320 root = etree.fromstring(res_details['data']) 

321 if root.find('authPassed').text == '0': 

322 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

323 

324 vol_list = root.find('Volume_Info') 

325 vol_info_tree = vol_list.findall('row') 

326 for vol in vol_info_tree: 

327 LOG.debug('Iterating vol name: %(name)s, index: %(id)s', 

328 {'name': vol.find('vol_label').text, 

329 'id': vol.find('vol_no').text}) 

330 if 'vol_no' in kwargs: 

331 if kwargs['vol_no'] == vol.find('vol_no').text: 331 ↛ 326line 331 didn't jump to line 326 because the condition on line 331 was always true

332 LOG.debug('vol_no:%s', 

333 vol.find('vol_no').text) 

334 return vol 

335 elif 'vol_label' in kwargs: 

336 if kwargs['vol_label'] == vol.find('vol_label').text: 336 ↛ 326line 336 didn't jump to line 326 because the condition on line 336 was always true

337 LOG.debug('vol_label:%s', vol.find('vol_label').text) 

338 return vol 

339 return None 

340 

341 @_connection_checker 

342 def get_specific_volinfo(self, vol_id, **kwargs): 

343 """Execute get_specific_volinfo API.""" 

344 params = { 

345 'store': 'volumeInfo', 

346 'volumeID': vol_id, 

347 'func': 'extra_get', 

348 'Volume_Info': '1', 

349 'sid': self.sid, 

350 } 

351 sanitized_params = self._sanitize_params(params) 

352 

353 sanitized_params = urlparse.urlencode(sanitized_params) 

354 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params) 

355 

356 res_details = self._execute_and_get_response_details(self.ip, url) 

357 root = etree.fromstring(res_details['data']) 

358 if root.find('authPassed').text == '0': 

359 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

360 

361 vol_list = root.find('Volume_Info') 

362 vol_info_tree = vol_list.findall('row') 

363 for vol in vol_info_tree: 363 ↛ exitline 363 didn't return from function 'get_specific_volinfo' because the loop on line 363 didn't complete

364 if vol_id == vol.find('vol_no').text: 364 ↛ 363line 364 didn't jump to line 363 because the condition on line 364 was always true

365 LOG.debug('vol_no: %s', vol.find('vol_no').text) 

366 return vol 

367 

368 @_connection_checker 

369 def get_snapshot_info(self, **kwargs): 

370 """Execute get_snapshot_info API.""" 

371 params = { 

372 'func': 'extra_get', 

373 'volumeID': kwargs['volID'], 

374 'snapshot_list': '1', 

375 'snap_start': '0', 

376 'snap_count': '100', 

377 'sid': self.sid, 

378 } 

379 sanitized_params = self._sanitize_params(params) 

380 

381 sanitized_params = urlparse.urlencode(sanitized_params) 

382 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params) 

383 

384 res_details = self._execute_and_get_response_details(self.ip, url) 

385 root = etree.fromstring(res_details['data']) 

386 if root.find('authPassed').text == '0': 

387 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

388 if root.find('result').text < '0': 

389 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP) 

390 

391 snapshot_list = root.find('SnapshotList') 

392 # if snapshot_list is None: 

393 if not snapshot_list: 393 ↛ 394line 393 didn't jump to line 394 because the condition on line 393 was never true

394 return None 

395 if ('snapshot_name' in kwargs): 395 ↛ 405line 395 didn't jump to line 405 because the condition on line 395 was always true

396 snapshot_tree = snapshot_list.findall('row') 

397 for snapshot in snapshot_tree: 397 ↛ 405line 397 didn't jump to line 405 because the loop on line 397 didn't complete

398 if (kwargs['snapshot_name'] == 398 ↛ 402line 398 didn't jump to line 402 because the condition on line 398 was always true

399 snapshot.find('snapshot_name').text): 

400 LOG.debug('snapshot_name:%s', kwargs['snapshot_name']) 

401 return snapshot 

402 if (snapshot is snapshot_tree[-1]): 

403 return None 

404 

405 return res_details['data'] 

406 

407 @_connection_checker 

408 def create_snapshot_api(self, volumeID, snapshot_name): 

409 """Execute CGI to create snapshot from source share.""" 

410 LOG.debug('volumeID: %s', volumeID) 

411 LOG.debug('snapshot_name: %s', snapshot_name) 

412 

413 params = { 

414 'func': 'create_snapshot', 

415 'volumeID': volumeID, 

416 'snapshot_name': snapshot_name, 

417 'expire_min': '0', 

418 'vital': '1', 

419 'sid': self.sid, 

420 } 

421 sanitized_params = self._sanitize_params(params) 

422 

423 sanitized_params = urlparse.urlencode(sanitized_params) 

424 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params) 

425 

426 res_details = self._execute_and_get_response_details(self.ip, url) 

427 root = etree.fromstring(res_details['data']) 

428 

429 if root.find('authPassed').text == '0': 

430 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

431 if root.find('ES_RET_CODE').text < '0': 

432 msg = _('Create snapshot failed') 

433 raise exception.ShareBackendException(msg=msg) 

434 

435 @_connection_checker 

436 def delete_snapshot_api(self, snapshot_id): 

437 """Execute CGI to delete snapshot from snapshot_id.""" 

438 params = { 

439 'func': 'del_snapshots', 

440 'snapshotID': snapshot_id, 

441 'sid': self.sid, 

442 } 

443 sanitized_params = self._sanitize_params(params) 

444 

445 sanitized_params = urlparse.urlencode(sanitized_params) 

446 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params) 

447 

448 res_details = self._execute_and_get_response_details(self.ip, url) 

449 root = etree.fromstring(res_details['data']) 

450 if root.find('authPassed').text == '0': 

451 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

452 # snapshot not exist 

453 if root.find('result').text == '-206021': 

454 LOG.warning('Snapshot id %s does not exist', snapshot_id) 

455 return 

456 # share not exist 

457 if root.find('result').text == '-200005': 

458 LOG.warning('Share of snapshot id %s does not exist', snapshot_id) 

459 return 

460 if root.find('result').text < '0': 

461 msg = _('Failed to delete snapshot.') 

462 raise exception.ShareBackendException(msg=msg) 

463 

464 @_connection_checker 

465 def clone_snapshot(self, snapshot_id, new_sharename, clone_size): 

466 """Execute CGI to clone snapshot as share.""" 

467 params = { 

468 'func': 'clone_qsnapshot', 

469 'by_vol': '1', 

470 'snapshotID': snapshot_id, 

471 'new_name': new_sharename, 

472 'clone_size': '{}g'.format(clone_size), 

473 'sid': self.sid, 

474 } 

475 sanitized_params = self._sanitize_params(params) 

476 

477 sanitized_params = urlparse.urlencode(sanitized_params) 

478 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params) 

479 

480 res_details = self._execute_and_get_response_details(self.ip, url) 

481 root = etree.fromstring(res_details['data']) 

482 if root.find('authPassed').text == '0': 

483 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

484 if root.find('result').text < '0': 

485 msg = _('Failed to clone snapshot.') 

486 raise exception.ShareBackendException(msg=msg) 

487 

488 @_connection_checker 

489 def edit_share(self, share_dict): 

490 """Edit share properties.""" 

491 LOG.debug('share_dict[sharename]: %s', share_dict['sharename']) 

492 

493 params = { 

494 'wiz_func': 'share_property', 

495 'action': 'share_property', 

496 'sharename': share_dict['sharename'], 

497 'old_sharename': share_dict['old_sharename'], 

498 'dedup': 'sha512' if share_dict['deduplication'] else 'off', 

499 'compression': '1' if share_dict['compression'] else '0', 

500 'thin_pro': '1' if share_dict['thin_provision'] else '0', 

501 'cache': '1' if share_dict['ssd_cache'] else '0', 

502 'cifs_enable': '1' if share_dict['share_proto'] == 'CIFS' else '0', 

503 'nfs_enable': '1' if share_dict['share_proto'] == 'NFS' else '0', 

504 'afp_enable': '0', 

505 'ftp_enable': '0', 

506 'hidden': '0', 

507 'oplocks': '1', 

508 'sync': 'always', 

509 'recycle_bin': '1', 

510 'recycle_bin_administrators_only': '0', 

511 'sid': self.sid, 

512 } 

513 if share_dict.get('new_size'): 513 ↛ 515line 513 didn't jump to line 515 because the condition on line 513 was always true

514 params['vol_size'] = str(share_dict['new_size']) + 'GB' 

515 sanitized_params = self._sanitize_params(params) 

516 

517 sanitized_params = urlparse.urlencode(sanitized_params) 

518 url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params) 

519 

520 res_details = self._execute_and_get_response_details(self.ip, url) 

521 root = etree.fromstring(res_details['data']) 

522 

523 if root.find('authPassed').text == '0': 

524 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

525 if root.find('ES_RET_CODE').text < '0': 

526 msg = _('Edit sharename %s failed') % share_dict['sharename'] 

527 raise exception.ShareBackendException(msg=msg) 

528 

529 @_connection_checker 

530 def get_host_list(self, **kwargs): 

531 """Execute get_host_list API.""" 

532 params = { 

533 'module': 'hosts', 

534 'func': 'get_hostlist', 

535 'sid': self.sid, 

536 } 

537 sanitized_params = self._sanitize_params(params) 

538 

539 sanitized_params = urlparse.urlencode(sanitized_params) 

540 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % 

541 sanitized_params) 

542 

543 res_details = self._execute_and_get_response_details(self.ip, url) 

544 root = etree.fromstring(res_details['data']) 

545 if root.find('authPassed').text == '0': 

546 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

547 if root.find('result').text < '0': 

548 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP) 

549 

550 host_list = root.find('content').find('host_list') 

551 # if host_list is None: 

552 if not host_list: 

553 return None 

554 

555 return_hosts = [] 

556 host_tree = host_list.findall('host') 

557 for host in host_tree: 

558 LOG.debug('host:%s', host) 

559 return_hosts.append(host) 

560 

561 return return_hosts 

562 

563 @_connection_checker 

564 def add_host(self, hostname, ipv4): 

565 """Execute add_host API.""" 

566 params = { 

567 'module': 'hosts', 

568 'func': 'apply_addhost', 

569 'name': hostname, 

570 'ipaddr_v4': ipv4, 

571 'sid': self.sid, 

572 } 

573 sanitized_params = self._sanitize_params(params) 

574 

575 sanitized_params = urlparse.urlencode(sanitized_params) 

576 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % 

577 sanitized_params) 

578 

579 res_details = self._execute_and_get_response_details(self.ip, url) 

580 root = etree.fromstring(res_details['data']) 

581 if root.find('authPassed').text == '0': 

582 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

583 if root.find('result').text < '0': 

584 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP) 

585 

586 @_connection_checker 

587 def edit_host(self, hostname, ipv4_list): 

588 """Execute edit_host API.""" 

589 params = { 

590 'module': 'hosts', 

591 'func': 'apply_sethost', 

592 'name': hostname, 

593 'ipaddr_v4': ipv4_list, 

594 'sid': self.sid, 

595 } 

596 sanitized_params = self._sanitize_params(params) 

597 

598 # urlencode with True parameter to parse ipv4_list 

599 sanitized_params = urlparse.urlencode(sanitized_params, True) 

600 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % 

601 sanitized_params) 

602 

603 res_details = self._execute_and_get_response_details(self.ip, url) 

604 root = etree.fromstring(res_details['data']) 

605 if root.find('authPassed').text == '0': 

606 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

607 if root.find('result').text < '0': 

608 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP) 

609 

610 @_connection_checker 

611 def delete_host(self, hostname): 

612 """Execute delete_host API.""" 

613 params = { 

614 'module': 'hosts', 

615 'func': 'apply_delhost', 

616 'host_name': hostname, 

617 'sid': self.sid, 

618 } 

619 sanitized_params = self._sanitize_params(params) 

620 

621 sanitized_params = urlparse.urlencode(sanitized_params) 

622 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' % 

623 sanitized_params) 

624 

625 res_details = self._execute_and_get_response_details(self.ip, url) 

626 root = etree.fromstring(res_details['data']) 

627 if root.find('authPassed').text == '0': 

628 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

629 if root.find('result').text < '0': 

630 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP) 

631 

632 @_connection_checker 

633 def set_nfs_access(self, sharename, access, host_name): 

634 """Execute set_nfs_access API.""" 

635 params = { 

636 'wiz_func': 'share_nfs_control', 

637 'action': 'share_nfs_control', 

638 'sharename': sharename, 

639 'access': access, 

640 'host_name': host_name, 

641 'sid': self.sid, 

642 } 

643 sanitized_params = self._sanitize_params(params) 

644 

645 sanitized_params = urlparse.urlencode(sanitized_params) 

646 url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params) 

647 

648 res_details = self._execute_and_get_response_details(self.ip, url) 

649 root = etree.fromstring(res_details['data']) 

650 if root.find('authPassed').text == '0': 

651 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

652 if root.find('result').text < '0': 

653 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP) 

654 

655 

656class QnapAPIExecutorTS(QnapAPIExecutor): 

657 """Makes QNAP API calls for TS NAS.""" 

658 

659 @_connection_checker 

660 def get_snapshot_info(self, **kwargs): 

661 """Execute get_snapshot_info API.""" 

662 for key, value in kwargs.items(): 

663 LOG.debug('%(key)s = %(val)s', 

664 {'key': key, 'val': value}) 

665 

666 params = { 

667 'func': 'extra_get', 

668 'LUNIndex': kwargs['lun_index'], 

669 'smb_snapshot_list': '1', 

670 'smb_snapshot': '1', 

671 'snapshot_list': '1', 

672 'sid': self.sid, 

673 } 

674 sanitized_params = self._sanitize_params(params) 

675 

676 sanitized_params = urlparse.urlencode(sanitized_params) 

677 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params) 

678 

679 res_details = self._execute_and_get_response_details(self.ip, url) 

680 root = etree.fromstring(res_details['data']) 

681 if root.find('authPassed').text == '0': 

682 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED) 

683 if root.find('result').text < '0': 

684 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP) 

685 

686 snapshot_list = root.find('SnapshotList') 

687 if snapshot_list is None: 687 ↛ 688line 687 didn't jump to line 688 because the condition on line 687 was never true

688 return None 

689 snapshot_tree = snapshot_list.findall('row') 

690 for snapshot in snapshot_tree: 690 ↛ 696line 690 didn't jump to line 696 because the loop on line 690 didn't complete

691 if (kwargs['snapshot_name'] == 691 ↛ 690line 691 didn't jump to line 690 because the condition on line 691 was always true

692 snapshot.find('snapshot_name').text): 

693 LOG.debug('snapshot_name:%s', kwargs['snapshot_name']) 

694 return snapshot 

695 

696 return None