Coverage for manila/share/drivers/qnap/api.py: 95%
380 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016 QNAP Systems, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15"""
16API for QNAP Storage.
17"""
18import base64
19import functools
20from http import client as http_client
21import re
22import ssl
23from urllib import parse as urlparse
25from defusedxml import ElementTree as etree
26from oslo_log import log as logging
28from manila import exception
29from manila.i18n import _
30from manila import utils
32LOG = logging.getLogger(__name__)
33MSG_SESSION_EXPIRED = _("Session ID expired")
34MSG_UNEXPECT_RESP = _("Unexpected response from QNAP API")
37def _connection_checker(func):
38 """Decorator to check session has expired or not."""
39 @utils.retry(retry_param=exception.ShareBackendException,
40 retries=5)
41 @functools.wraps(func)
42 def inner_connection_checker(self, *args, **kwargs):
43 LOG.debug('in _connection_checker')
44 pattern = re.compile(r".*Session ID expired.$")
45 try:
46 return func(self, *args, **kwargs)
47 except exception.ShareBackendException as e:
48 matches = pattern.match(str(e))
49 if matches:
50 LOG.debug('Session might have expired.'
51 ' Trying to relogin')
52 self._login()
53 raise
54 return inner_connection_checker
57class QnapAPIExecutor(object):
58 """Makes QNAP API calls for ES NAS."""
60 def __init__(self, *args, **kwargs):
61 self.sid = None
62 self.username = kwargs['username']
63 self.password = kwargs['password']
64 self.ip, self.port, self.ssl = (
65 self._parse_management_url(kwargs['management_url']))
66 self._login()
68 def _parse_management_url(self, management_url):
69 pattern = re.compile(r"(http|https)\:\/\/(\S+)\:(\d+)")
70 matches = pattern.match(management_url)
71 if matches.group(1) == 'http':
72 management_ssl = False
73 else:
74 management_ssl = True
75 management_ip = matches.group(2)
76 management_port = matches.group(3)
77 return management_ip, management_port, management_ssl
79 def _prepare_connection(self, isSSL, ip, port):
80 if isSSL:
81 if hasattr(ssl, '_create_unverified_context'): 81 ↛ 87line 81 didn't jump to line 87 because the condition on line 81 was always true
82 context = ssl._create_unverified_context() # nosec B323
83 connection = http_client.HTTPSConnection(ip,
84 port=port,
85 context=context)
86 else:
87 connection = http_client.HTTPSConnection(ip,
88 port=port)
89 else:
90 connection = http_client.HTTPConnection(ip, port)
91 return connection
93 def get_basic_info(self, management_url):
94 """Get the basic information of NAS."""
95 LOG.debug('in get_basic_info')
96 management_ip, management_port, management_ssl = (
97 self._parse_management_url(management_url))
98 connection = self._prepare_connection(management_ssl,
99 management_ip,
100 management_port)
102 connection.request('GET', '/cgi-bin/authLogin.cgi')
103 response = connection.getresponse()
104 data = response.read()
105 LOG.debug('response data: %s', data)
107 root = etree.fromstring(data)
109 display_model_name = root.find('model/displayModelName').text
110 internal_model_name = root.find('model/internalModelName').text
111 fw_version = root.find('firmware/version').text
113 connection.close()
114 return display_model_name, internal_model_name, fw_version
116 def _execute_and_get_response_details(self, nas_ip, url):
117 """Will prepare response after executing a http request."""
118 LOG.debug('port: %(port)s, ssl: %(ssl)s',
119 {'port': self.port, 'ssl': self.ssl})
121 res_details = {}
123 # Prepare the connection
124 connection = self._prepare_connection(self.ssl,
125 nas_ip,
126 self.port)
128 # Make the connection
129 LOG.debug('url : %s', url)
130 connection.request('GET', url)
131 # Extract the response as the connection was successful
132 response = connection.getresponse()
133 # Read the response
134 data = response.read()
135 LOG.debug('response data: %s', data)
137 res_details['data'] = data
138 res_details['error'] = None
139 res_details['http_status'] = response.status
141 connection.close()
142 return res_details
144 def execute_login(self):
145 """Login and return sid."""
146 params = {
147 'user': self.username,
148 'pwd': base64.b64encode(self.password.encode("utf-8")),
149 'serviceKey': '1',
150 }
151 sanitized_params = self._sanitize_params(params)
153 sanitized_params = urlparse.urlencode(sanitized_params)
154 url = ('/cgi-bin/authLogin.cgi?%s' % sanitized_params)
156 res_details = self._execute_and_get_response_details(self.ip, url)
157 root = etree.fromstring(res_details['data'])
159 if root.find('authPassed').text == '0':
160 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
162 session_id = root.find('authSid').text
163 return session_id
165 def _login(self):
166 """Execute Https Login API."""
167 self.sid = self.execute_login()
168 LOG.debug('sid: %s', self.sid)
170 def _sanitize_params(self, params):
171 sanitized_params = {}
172 for key in params:
173 value = params[key]
174 if value is not None: 174 ↛ 172line 174 didn't jump to line 172 because the condition on line 174 was always true
175 if isinstance(value, list):
176 sanitized_params[key] = [str(v) for v in value]
177 else:
178 sanitized_params[key] = str(value)
179 return sanitized_params
181 @_connection_checker
182 def create_share(self, share, pool_name, create_share_name,
183 share_proto, **kwargs):
184 """Create share."""
185 LOG.debug('create_share_name: %s', create_share_name)
187 params = {
188 'wiz_func': 'share_create',
189 'action': 'add_share',
190 'vol_name': create_share_name,
191 'vol_size': str(share['size']) + 'GB',
192 'threshold': '80',
193 'dedup': ('sha512'
194 if kwargs['qnap_deduplication'] is True
195 else 'off'),
196 'compression': '1' if kwargs['qnap_compression'] is True else '0',
197 'thin_pro': '1' if kwargs['qnap_thin_provision'] is True else '0',
198 'cache': '1' if kwargs['qnap_ssd_cache'] is True else '0',
199 'cifs_enable': '0' if share_proto == 'NFS' else '1',
200 'nfs_enable': '0' if share_proto == 'CIFS' else '1',
201 'afp_enable': '0',
202 'ftp_enable': '0',
203 'encryption': '0',
204 'hidden': '0',
205 'oplocks': '1',
206 'sync': 'always',
207 'userrw0': 'admin',
208 'userrd_len': '0',
209 'userrw_len': '1',
210 'userno_len': '0',
211 'access_r': 'setup_users',
212 'path_type': 'auto',
213 'recycle_bin': '1',
214 'recycle_bin_administrators_only': '0',
215 'pool_name': pool_name,
216 'sid': self.sid,
217 }
218 sanitized_params = self._sanitize_params(params)
220 sanitized_params = urlparse.urlencode(sanitized_params)
221 url = ('/cgi-bin/wizReq.cgi?%s' % sanitized_params)
223 res_details = self._execute_and_get_response_details(self.ip, url)
224 root = etree.fromstring(res_details['data'])
226 if root.find('authPassed').text == '0':
227 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
228 if root.find('ES_RET_CODE').text < '0':
229 msg = _("Fail to create share %s on NAS.") % create_share_name
230 LOG.error(msg)
231 raise exception.ShareBackendException(msg=msg)
233 vol_list = root.find('func').find('ownContent').find('volumeList')
234 vol_info_tree = vol_list.findall('volume')
235 for vol in vol_info_tree:
236 LOG.debug('Iterating vol name: %(name)s, index: %(id)s',
237 {'name': vol.find('volumeLabel').text,
238 'id': vol.find('volumeValue').text})
239 if (create_share_name == vol.find('volumeLabel').text):
240 LOG.debug('volumeLabel:%s', vol.find('volumeLabel').text)
241 return vol.find('volumeValue').text
243 return res_details['data']
245 @_connection_checker
246 def delete_share(self, vol_id, *args, **kwargs):
247 """Execute delete share API."""
248 params = {
249 'func': 'volume_mgmt',
250 'vol_remove': '1',
251 'volumeID': vol_id,
252 'stop_service': 'no',
253 'sid': self.sid,
254 }
255 sanitized_params = self._sanitize_params(params)
257 sanitized_params = urlparse.urlencode(sanitized_params)
258 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
260 res_details = self._execute_and_get_response_details(self.ip, url)
261 root = etree.fromstring(res_details['data'])
263 if root.find('authPassed').text == '0':
264 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
265 if root.find('result').text < '0':
266 msg = _('Delete share id: %s failed') % vol_id
267 raise exception.ShareBackendException(msg=msg)
269 @_connection_checker
270 def get_specific_poolinfo(self, pool_id):
271 """Execute get_specific_poolinfo API."""
272 params = {
273 'store': 'poolInfo',
274 'func': 'extra_get',
275 'poolID': pool_id,
276 'Pool_Info': '1',
277 'sid': self.sid,
278 }
279 sanitized_params = self._sanitize_params(params)
281 sanitized_params = urlparse.urlencode(sanitized_params)
282 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
284 res_details = self._execute_and_get_response_details(self.ip, url)
286 root = etree.fromstring(res_details['data'])
287 if root.find('authPassed').text == '0':
288 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
289 if root.find('result').text < '0':
290 msg = _('get_specific_poolinfo failed')
291 raise exception.ShareBackendException(msg=msg)
293 pool_list = root.find('Pool_Index')
294 pool_info_tree = pool_list.findall('row')
295 for pool in pool_info_tree: 295 ↛ exitline 295 didn't return from function 'get_specific_poolinfo' because the loop on line 295 didn't complete
296 if pool_id == pool.find('poolID').text: 296 ↛ 295line 296 didn't jump to line 295 because the condition on line 296 was always true
297 LOG.debug('poolID: %s', pool.find('poolID').text)
298 return pool
300 @_connection_checker
301 def get_share_info(self, pool_id, **kwargs):
302 """Execute get_share_info API."""
303 for key, value in kwargs.items():
304 LOG.debug('%(key)s = %(val)s',
305 {'key': key, 'val': value})
307 params = {
308 'store': 'poolVolumeList',
309 'poolID': pool_id,
310 'func': 'extra_get',
311 'Pool_Vol_Info': '1',
312 'sid': self.sid,
313 }
314 sanitized_params = self._sanitize_params(params)
316 sanitized_params = urlparse.urlencode(sanitized_params)
317 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
319 res_details = self._execute_and_get_response_details(self.ip, url)
320 root = etree.fromstring(res_details['data'])
321 if root.find('authPassed').text == '0':
322 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
324 vol_list = root.find('Volume_Info')
325 vol_info_tree = vol_list.findall('row')
326 for vol in vol_info_tree:
327 LOG.debug('Iterating vol name: %(name)s, index: %(id)s',
328 {'name': vol.find('vol_label').text,
329 'id': vol.find('vol_no').text})
330 if 'vol_no' in kwargs:
331 if kwargs['vol_no'] == vol.find('vol_no').text: 331 ↛ 326line 331 didn't jump to line 326 because the condition on line 331 was always true
332 LOG.debug('vol_no:%s',
333 vol.find('vol_no').text)
334 return vol
335 elif 'vol_label' in kwargs:
336 if kwargs['vol_label'] == vol.find('vol_label').text: 336 ↛ 326line 336 didn't jump to line 326 because the condition on line 336 was always true
337 LOG.debug('vol_label:%s', vol.find('vol_label').text)
338 return vol
339 return None
341 @_connection_checker
342 def get_specific_volinfo(self, vol_id, **kwargs):
343 """Execute get_specific_volinfo API."""
344 params = {
345 'store': 'volumeInfo',
346 'volumeID': vol_id,
347 'func': 'extra_get',
348 'Volume_Info': '1',
349 'sid': self.sid,
350 }
351 sanitized_params = self._sanitize_params(params)
353 sanitized_params = urlparse.urlencode(sanitized_params)
354 url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
356 res_details = self._execute_and_get_response_details(self.ip, url)
357 root = etree.fromstring(res_details['data'])
358 if root.find('authPassed').text == '0':
359 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
361 vol_list = root.find('Volume_Info')
362 vol_info_tree = vol_list.findall('row')
363 for vol in vol_info_tree: 363 ↛ exitline 363 didn't return from function 'get_specific_volinfo' because the loop on line 363 didn't complete
364 if vol_id == vol.find('vol_no').text: 364 ↛ 363line 364 didn't jump to line 363 because the condition on line 364 was always true
365 LOG.debug('vol_no: %s', vol.find('vol_no').text)
366 return vol
368 @_connection_checker
369 def get_snapshot_info(self, **kwargs):
370 """Execute get_snapshot_info API."""
371 params = {
372 'func': 'extra_get',
373 'volumeID': kwargs['volID'],
374 'snapshot_list': '1',
375 'snap_start': '0',
376 'snap_count': '100',
377 'sid': self.sid,
378 }
379 sanitized_params = self._sanitize_params(params)
381 sanitized_params = urlparse.urlencode(sanitized_params)
382 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
384 res_details = self._execute_and_get_response_details(self.ip, url)
385 root = etree.fromstring(res_details['data'])
386 if root.find('authPassed').text == '0':
387 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
388 if root.find('result').text < '0':
389 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
391 snapshot_list = root.find('SnapshotList')
392 # if snapshot_list is None:
393 if not snapshot_list: 393 ↛ 394line 393 didn't jump to line 394 because the condition on line 393 was never true
394 return None
395 if ('snapshot_name' in kwargs): 395 ↛ 405line 395 didn't jump to line 405 because the condition on line 395 was always true
396 snapshot_tree = snapshot_list.findall('row')
397 for snapshot in snapshot_tree: 397 ↛ 405line 397 didn't jump to line 405 because the loop on line 397 didn't complete
398 if (kwargs['snapshot_name'] == 398 ↛ 402line 398 didn't jump to line 402 because the condition on line 398 was always true
399 snapshot.find('snapshot_name').text):
400 LOG.debug('snapshot_name:%s', kwargs['snapshot_name'])
401 return snapshot
402 if (snapshot is snapshot_tree[-1]):
403 return None
405 return res_details['data']
407 @_connection_checker
408 def create_snapshot_api(self, volumeID, snapshot_name):
409 """Execute CGI to create snapshot from source share."""
410 LOG.debug('volumeID: %s', volumeID)
411 LOG.debug('snapshot_name: %s', snapshot_name)
413 params = {
414 'func': 'create_snapshot',
415 'volumeID': volumeID,
416 'snapshot_name': snapshot_name,
417 'expire_min': '0',
418 'vital': '1',
419 'sid': self.sid,
420 }
421 sanitized_params = self._sanitize_params(params)
423 sanitized_params = urlparse.urlencode(sanitized_params)
424 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
426 res_details = self._execute_and_get_response_details(self.ip, url)
427 root = etree.fromstring(res_details['data'])
429 if root.find('authPassed').text == '0':
430 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
431 if root.find('ES_RET_CODE').text < '0':
432 msg = _('Create snapshot failed')
433 raise exception.ShareBackendException(msg=msg)
435 @_connection_checker
436 def delete_snapshot_api(self, snapshot_id):
437 """Execute CGI to delete snapshot from snapshot_id."""
438 params = {
439 'func': 'del_snapshots',
440 'snapshotID': snapshot_id,
441 'sid': self.sid,
442 }
443 sanitized_params = self._sanitize_params(params)
445 sanitized_params = urlparse.urlencode(sanitized_params)
446 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
448 res_details = self._execute_and_get_response_details(self.ip, url)
449 root = etree.fromstring(res_details['data'])
450 if root.find('authPassed').text == '0':
451 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
452 # snapshot not exist
453 if root.find('result').text == '-206021':
454 LOG.warning('Snapshot id %s does not exist', snapshot_id)
455 return
456 # share not exist
457 if root.find('result').text == '-200005':
458 LOG.warning('Share of snapshot id %s does not exist', snapshot_id)
459 return
460 if root.find('result').text < '0':
461 msg = _('Failed to delete snapshot.')
462 raise exception.ShareBackendException(msg=msg)
464 @_connection_checker
465 def clone_snapshot(self, snapshot_id, new_sharename, clone_size):
466 """Execute CGI to clone snapshot as share."""
467 params = {
468 'func': 'clone_qsnapshot',
469 'by_vol': '1',
470 'snapshotID': snapshot_id,
471 'new_name': new_sharename,
472 'clone_size': '{}g'.format(clone_size),
473 'sid': self.sid,
474 }
475 sanitized_params = self._sanitize_params(params)
477 sanitized_params = urlparse.urlencode(sanitized_params)
478 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
480 res_details = self._execute_and_get_response_details(self.ip, url)
481 root = etree.fromstring(res_details['data'])
482 if root.find('authPassed').text == '0':
483 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
484 if root.find('result').text < '0':
485 msg = _('Failed to clone snapshot.')
486 raise exception.ShareBackendException(msg=msg)
488 @_connection_checker
489 def edit_share(self, share_dict):
490 """Edit share properties."""
491 LOG.debug('share_dict[sharename]: %s', share_dict['sharename'])
493 params = {
494 'wiz_func': 'share_property',
495 'action': 'share_property',
496 'sharename': share_dict['sharename'],
497 'old_sharename': share_dict['old_sharename'],
498 'dedup': 'sha512' if share_dict['deduplication'] else 'off',
499 'compression': '1' if share_dict['compression'] else '0',
500 'thin_pro': '1' if share_dict['thin_provision'] else '0',
501 'cache': '1' if share_dict['ssd_cache'] else '0',
502 'cifs_enable': '1' if share_dict['share_proto'] == 'CIFS' else '0',
503 'nfs_enable': '1' if share_dict['share_proto'] == 'NFS' else '0',
504 'afp_enable': '0',
505 'ftp_enable': '0',
506 'hidden': '0',
507 'oplocks': '1',
508 'sync': 'always',
509 'recycle_bin': '1',
510 'recycle_bin_administrators_only': '0',
511 'sid': self.sid,
512 }
513 if share_dict.get('new_size'): 513 ↛ 515line 513 didn't jump to line 515 because the condition on line 513 was always true
514 params['vol_size'] = str(share_dict['new_size']) + 'GB'
515 sanitized_params = self._sanitize_params(params)
517 sanitized_params = urlparse.urlencode(sanitized_params)
518 url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
520 res_details = self._execute_and_get_response_details(self.ip, url)
521 root = etree.fromstring(res_details['data'])
523 if root.find('authPassed').text == '0':
524 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
525 if root.find('ES_RET_CODE').text < '0':
526 msg = _('Edit sharename %s failed') % share_dict['sharename']
527 raise exception.ShareBackendException(msg=msg)
529 @_connection_checker
530 def get_host_list(self, **kwargs):
531 """Execute get_host_list API."""
532 params = {
533 'module': 'hosts',
534 'func': 'get_hostlist',
535 'sid': self.sid,
536 }
537 sanitized_params = self._sanitize_params(params)
539 sanitized_params = urlparse.urlencode(sanitized_params)
540 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
541 sanitized_params)
543 res_details = self._execute_and_get_response_details(self.ip, url)
544 root = etree.fromstring(res_details['data'])
545 if root.find('authPassed').text == '0':
546 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
547 if root.find('result').text < '0':
548 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
550 host_list = root.find('content').find('host_list')
551 # if host_list is None:
552 if not host_list:
553 return None
555 return_hosts = []
556 host_tree = host_list.findall('host')
557 for host in host_tree:
558 LOG.debug('host:%s', host)
559 return_hosts.append(host)
561 return return_hosts
563 @_connection_checker
564 def add_host(self, hostname, ipv4):
565 """Execute add_host API."""
566 params = {
567 'module': 'hosts',
568 'func': 'apply_addhost',
569 'name': hostname,
570 'ipaddr_v4': ipv4,
571 'sid': self.sid,
572 }
573 sanitized_params = self._sanitize_params(params)
575 sanitized_params = urlparse.urlencode(sanitized_params)
576 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
577 sanitized_params)
579 res_details = self._execute_and_get_response_details(self.ip, url)
580 root = etree.fromstring(res_details['data'])
581 if root.find('authPassed').text == '0':
582 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
583 if root.find('result').text < '0':
584 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
586 @_connection_checker
587 def edit_host(self, hostname, ipv4_list):
588 """Execute edit_host API."""
589 params = {
590 'module': 'hosts',
591 'func': 'apply_sethost',
592 'name': hostname,
593 'ipaddr_v4': ipv4_list,
594 'sid': self.sid,
595 }
596 sanitized_params = self._sanitize_params(params)
598 # urlencode with True parameter to parse ipv4_list
599 sanitized_params = urlparse.urlencode(sanitized_params, True)
600 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
601 sanitized_params)
603 res_details = self._execute_and_get_response_details(self.ip, url)
604 root = etree.fromstring(res_details['data'])
605 if root.find('authPassed').text == '0':
606 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
607 if root.find('result').text < '0':
608 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
610 @_connection_checker
611 def delete_host(self, hostname):
612 """Execute delete_host API."""
613 params = {
614 'module': 'hosts',
615 'func': 'apply_delhost',
616 'host_name': hostname,
617 'sid': self.sid,
618 }
619 sanitized_params = self._sanitize_params(params)
621 sanitized_params = urlparse.urlencode(sanitized_params)
622 url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
623 sanitized_params)
625 res_details = self._execute_and_get_response_details(self.ip, url)
626 root = etree.fromstring(res_details['data'])
627 if root.find('authPassed').text == '0':
628 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
629 if root.find('result').text < '0':
630 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
632 @_connection_checker
633 def set_nfs_access(self, sharename, access, host_name):
634 """Execute set_nfs_access API."""
635 params = {
636 'wiz_func': 'share_nfs_control',
637 'action': 'share_nfs_control',
638 'sharename': sharename,
639 'access': access,
640 'host_name': host_name,
641 'sid': self.sid,
642 }
643 sanitized_params = self._sanitize_params(params)
645 sanitized_params = urlparse.urlencode(sanitized_params)
646 url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
648 res_details = self._execute_and_get_response_details(self.ip, url)
649 root = etree.fromstring(res_details['data'])
650 if root.find('authPassed').text == '0':
651 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
652 if root.find('result').text < '0':
653 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
656class QnapAPIExecutorTS(QnapAPIExecutor):
657 """Makes QNAP API calls for TS NAS."""
659 @_connection_checker
660 def get_snapshot_info(self, **kwargs):
661 """Execute get_snapshot_info API."""
662 for key, value in kwargs.items():
663 LOG.debug('%(key)s = %(val)s',
664 {'key': key, 'val': value})
666 params = {
667 'func': 'extra_get',
668 'LUNIndex': kwargs['lun_index'],
669 'smb_snapshot_list': '1',
670 'smb_snapshot': '1',
671 'snapshot_list': '1',
672 'sid': self.sid,
673 }
674 sanitized_params = self._sanitize_params(params)
676 sanitized_params = urlparse.urlencode(sanitized_params)
677 url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
679 res_details = self._execute_and_get_response_details(self.ip, url)
680 root = etree.fromstring(res_details['data'])
681 if root.find('authPassed').text == '0':
682 raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
683 if root.find('result').text < '0':
684 raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
686 snapshot_list = root.find('SnapshotList')
687 if snapshot_list is None: 687 ↛ 688line 687 didn't jump to line 688 because the condition on line 687 was never true
688 return None
689 snapshot_tree = snapshot_list.findall('row')
690 for snapshot in snapshot_tree: 690 ↛ 696line 690 didn't jump to line 696 because the loop on line 690 didn't complete
691 if (kwargs['snapshot_name'] == 691 ↛ 690line 691 didn't jump to line 690 because the condition on line 691 was always true
692 snapshot.find('snapshot_name').text):
693 LOG.debug('snapshot_name:%s', kwargs['snapshot_name'])
694 return snapshot
696 return None