Coverage for manila/share/drivers/macrosan/rest_helper.py: 97%
386 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2022 MacroSAN Technologies Co., Ltd.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import requests
18from oslo_log import log
20from manila import exception
21from manila.i18n import _
22from manila.share.drivers.macrosan import macrosan_constants as constants
23from manila import utils
25LOG = log.getLogger(__name__)
28class RestHelper(object):
29 def __init__(self, configuration):
30 self.configuration = configuration
31 self._protocol = self.configuration.macrosan_nas_http_protocol
32 self._ip = self.configuration.macrosan_nas_ip
33 self._port = self.configuration.macrosan_nas_port
34 self._prefix = self.configuration.macrosan_nas_prefix
35 self._token = None
36 self._username = self.configuration.macrosan_nas_username
37 self._password = self.configuration.macrosan_nas_password
38 self.request_timeout = self.configuration.macrosan_timeout
39 self.ssl_verify = self.configuration.macrosan_ssl_cert_verify
40 if not self.ssl_verify: 40 ↛ exitline 40 didn't return from function '__init__' because the condition on line 40 was always true
41 # Suppress the Insecure request warnings
42 requests.packages.urllib3.disable_warnings(
43 requests.packages.urllib3.exceptions.InsecureRequestWarning)
45 @utils.synchronized('macrosan_manila')
46 def call(self, url, data, method):
47 """Send requests.
49 If token is expired,re-login.
50 """
51 header = {'Authorization': self._token}
52 if self._token is None:
53 self.login()
55 result = self.do_request(url, data, method, header)
56 if result['code'] == constants.TOKEN_EXPIRED:
57 LOG.error("Token is expired, re-login.")
58 self.login()
59 # token refresh, Re-assign
60 header['Authorization'] = self._token
61 result = self.do_request(url, data, method, header)
62 elif (result['code'] == constants.TOKEN_FORMAT_ERROR or
63 result['code'] == constants.TOKEN_VERIFY_FAILED or
64 result['code'] == constants.TOKEN_REQUIRED):
65 msg = _('Token authentication error.')
66 LOG.error(msg)
67 raise exception.MacrosanBackendExeption(msg)
68 return result
70 def do_request(self, url, data, method, header=None):
71 final_url = (f'{self._protocol}://{self._ip}:{self._port}/'
72 f'{self._prefix}/{url}')
73 LOG.debug(f'Request URL: {final_url}, Method: {method}, Data: {data}')
75 if method == 'POST':
76 res = requests.post(final_url, data=data, headers=header,
77 timeout=self.request_timeout,
78 verify=self.ssl_verify)
79 elif method == 'GET':
80 res = requests.get(final_url, data=data, headers=header,
81 timeout=self.request_timeout,
82 verify=self.ssl_verify)
83 elif method == 'PUT':
84 res = requests.put(final_url, data=data, headers=header,
85 timeout=self.request_timeout,
86 verify=self.ssl_verify)
87 elif method == 'DELETE':
88 res = requests.delete(final_url, data=data, headers=header,
89 timeout=self.request_timeout,
90 verify=self.ssl_verify)
91 else:
92 msg = (_("Request method %s invalid.") % method)
93 raise exception.ShareBackendException(msg=msg)
95 code = res.status_code
96 if code != 200:
97 msg = (_('Code: %(code)s, URL: %(url)s, Message: %(msg)s')
98 % {'code': res.status_code,
99 'url': final_url,
100 'msg': res.text})
101 LOG.error(msg)
102 raise exception.NetworkException(msg)
103 response = res.json()
104 LOG.debug('CODE: %(code)s, RESPONSE: %(response)s',
105 {'code': code, 'response': response})
106 return response
108 def login(self):
109 """Login array and return token."""
110 url = 'rest/token'
112 data = {'userName': self._username,
113 'userPasswd': self._password}
114 result = self.do_request(url, data, 'POST')
115 if result['code'] != 0:
116 msg = f"Login failed. code: {result['code']}"
117 msg = _(msg)
118 LOG.error(msg)
119 raise exception.ShareBackendException(msg=msg)
120 LOG.debug(f'Login successful. URL {self._ip}\n')
121 self._token = result['data']
123 def _assert_result_code(self, result, msg):
124 if (result['code'] != constants.CODE_SUCCESS
125 and result['code'] != constants.CODE_NOT_FOUND):
126 error_msg = (_('%(err)s\nresult: %(res)s.') % {'err': msg,
127 'res': result})
128 LOG.error(error_msg)
129 raise exception.ShareBackendException(msg=error_msg)
131 def _assert_result_data(self, result, msg):
132 if "data" not in result:
133 error_msg = (_('Error:"data" not in result. %s') % msg)
134 LOG.error(error_msg)
135 raise exception.ShareBackendException(msg=error_msg)
137 def _create_nfs_share(self, share_path):
138 url = 'rest/nfsShare'
139 # IPv4 Address Blocks Reserved for Documentation
140 params = {
141 'path': share_path,
142 'authority': 'ro',
143 'accessClient': '192.0.2.0',
144 }
145 result = self.call(url, params, 'POST')
147 msg = 'Failed to create a nfs share.'
148 self._assert_result_code(result, msg)
150 def _get_nfs_share(self, share_path):
151 # GET method: param need be after url
152 url = f'rest/nfsShare?path={share_path}'
153 result = self.call(url, None, 'GET')
155 msg = 'Failed to get nfs share.'
156 self._assert_result_code(result, msg)
157 return result['data']
159 def _delete_nfs_share(self, share_path):
160 url = f'rest/nfsShare?path={share_path}'
161 result = self.call(url, None, 'DELETE')
163 msg = 'Failed to delete nfs share.'
164 self._assert_result_code(result, msg)
166 def _create_cifs_share(self, share_name, share_path,
167 rw_list, rw_list_type):
168 url = 'rest/cifsShare'
170 params = {
171 'path': share_path,
172 'cifsName': share_name,
173 'cifsDescription': '',
174 'RoList': [],
175 'RoListType': [],
176 'RwList': rw_list,
177 'RwListType': rw_list_type,
178 'allowList': [],
179 'denyList': [],
180 }
181 result = self.call(url, params, 'POST')
183 msg = 'Failed to create a CIFS share.'
184 self._assert_result_code(result, msg)
186 def _get_cifs_share(self, share_path):
187 url = f'rest/cifsShare?path={share_path}'
188 result = self.call(url, None, 'GET')
190 msg = 'Failed to get the cifs share.'
191 self._assert_result_code(result, msg)
192 return result['data']
194 def _delete_cifs_share(self, share_name, share_path):
195 url = f'rest/cifsShare?path={share_path}&cifsName={share_name}'
196 result = self.call(url, None, 'DELETE')
197 msg = 'Failed to delete the cifs share.'
199 self._assert_result_code(result, msg)
201 def _update_share_size(self, fs_name, new_size):
202 url = f'rest/filesystem/{fs_name}'
204 params = {
205 'capacity': new_size,
206 }
208 result = self.call(url, params, 'PUT')
209 msg = 'Failed to update the filesystem size.'
211 self._assert_result_code(result, msg)
213 def _create_filesystem(self, fs_name, pool_name, filesystem_quota):
214 url = 'rest/filesystem'
215 fsinfo = {
216 'fsName': fs_name,
217 'poolName': pool_name,
218 'createType': '0',
219 'fileSystemQuota': filesystem_quota,
220 'fileSystemReserve': filesystem_quota,
221 'wormStatus': 0,
222 'defaultTimeStatus': 0,
223 'defaultTimeNum': 0,
224 'defaultTimeUnit': 'year',
225 'isAutoLock': 0,
226 'isAutoDelete': 0,
227 'lockTime': 0
228 }
229 result = self.call(url, fsinfo, 'POST')
231 msg = 'Failed to create the filesystem.'
232 self._assert_result_code(result, msg)
234 def _delete_filesystem(self, fs_name):
235 """Delete filesystem"""
236 url = f'rest/filesystem/{fs_name}'
237 result = self.call(url, None, 'DELETE')
239 msg = 'Failed to delete the filesystem.'
240 self._assert_result_code(result, msg)
242 def _get_filesystem(self, fs_name):
243 """Get filesystem """
244 url = f'rest/filesystem/{fs_name}'
245 result = self.call(url, None, 'GET')
247 msg = 'Failed to get the filesystem.'
248 self._assert_result_code(result, msg)
250 return result['data']
252 def _create_filesystem_dir(self, share_path):
253 url = 'rest/fileDir'
254 slash = share_path.index(r'/', 1)
255 dir_info = {
256 'path': share_path[0: slash],
257 'dirName': share_path[slash + 1:],
258 }
259 result = self.call(url, dir_info, 'POST')
261 msg = 'Failed to create the filesystem directory.'
262 self._assert_result_code(result, msg)
264 def _delete_filesystem_dir(self, share_path):
265 slash = share_path.index(r'/', 1)
266 url = f'rest/fileDir?path={share_path[0: slash]}' \
267 f'&dirName={share_path[slash + 1:]}'
269 result = self.call(url, None, 'DELETE')
271 msg = 'Failed to delete the filesystem directory.'
272 self._assert_result_code(result, msg)
274 def _allow_access_rest(self, share_path, access_to,
275 access_level, share_proto):
276 """Allow access to the share."""
277 if share_proto == 'NFS':
278 self._allow_nfs_access_rest(share_path, access_to, access_level)
279 elif share_proto == 'CIFS':
280 self._allow_cifs_access_rest(share_path, access_to, access_level)
281 else:
282 raise exception.InvalidInput(
283 reason=(_('Invalid Nas protocol: %s.') % share_proto))
285 def _allow_nfs_access_rest(self, share_path, access_to, access_level):
286 url = 'rest/nfsShareClient'
287 access = {
288 'path': share_path,
289 'client': access_to,
290 'authority': access_level,
291 }
292 result = self.call(url, access, 'POST')
294 msg = 'Failed to allow access to the NFS share.'
295 self._assert_result_code(result, msg)
297 def _allow_cifs_access_rest(self, share_path, access_to, access_level):
298 url = 'rest/cifsShareClient'
299 ug_type = {
300 'localUser': '0',
301 'localGroup': '1',
302 'adUser': '2',
303 'adGroup': '3',
304 }
306 msg = 'Failed to allow access to the CIFS share.'
307 access_info = (f'Access info (access_to: {access_to},'
308 f'access_level: {access_level},'
309 f'path: {share_path}.)')
311 def send_rest(rest_access_to, rest_ug_type):
312 access = {
313 'path': share_path,
314 'right': access_level,
315 'ugName': rest_access_to,
316 'ugType': rest_ug_type,
317 }
318 result = self.call(url, access, 'POST')
319 err_code = result['code']
320 if err_code == constants.CODE_SUCCESS:
321 return True
322 elif err_code != constants.CODE_SOURCE_NOT_EXIST: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true
323 self._assert_result_code(result, msg)
324 return False
326 if '/' not in access_to:
327 # First, try to add local user access
328 LOG.debug('Attempting to add local user access. %s', access_info)
329 if send_rest(access_to, ug_type['localUser']):
330 return
331 # Second,If add local user access failed,
332 # try to add local group access
333 LOG.debug('Failed add local user access,'
334 ' attempting to add local group access. %s', access_info)
335 if send_rest(access_to, ug_type['localGroup']):
336 return
337 else:
338 str = access_to.index('/')
339 access_to = access_to[str + 1:]
340 # First, add domain user access
341 LOG.debug('Attempting to add domain user access. %s', access_info)
342 if send_rest(access_to, ug_type['adUser']):
343 return
344 # Second, if add domain user access failed,
345 # try to add domain group access.
346 LOG.debug('Failed add domain user access, '
347 'attempting to add domain group access. %s', access_info)
348 if send_rest(access_to, ug_type['adGroup']): 348 ↛ 351line 348 didn't jump to line 351 because the condition on line 348 was always true
349 return
351 raise exception.InvalidShare(reason=msg)
353 def _get_access_from_nfs_share(self, path, clientName):
354 url = f'rest/nfsShareClient?path={path}&client={clientName}'
356 result = self.call(url, None, 'GET')
357 msg = 'Failed to get share NFS access.'
359 self._assert_result_code(result, msg)
360 share_client = None
361 if result['data'] is not None: 361 ↛ 367line 361 didn't jump to line 367 because the condition on line 361 was always true
362 share_client = {}
363 share_client['path'] = result['data']['path']
364 share_client['clientName'] = result['data']['clientName']
365 share_client['accessRight'] = result['data']['accessRight']
367 return share_client
369 def _get_access_from_cifs_share(self, share_path, access_to,
370 ug_input_type=None):
372 ug_type = {
373 'localUser': '0',
374 'localGroup': '1',
375 'adUser': '2',
376 'adGroup': '3',
377 }
379 msg = 'Failed to get share cifs access.'
380 access_info = (f'Access info (access_to: {access_to},'
381 f'path: {share_path}.)')
383 def send_rest(access_to, ug_type):
384 url = f'rest/cifsShareClient?path={share_path}' \
385 f'&ugName={access_to}&ugType={ug_type}'
386 result = self.call(url, None, 'GET')
387 self._assert_result_code(result, msg)
388 return result
390 share_client = None
391 if ug_input_type is not None:
392 ret = send_rest(access_to, ug_input_type)
393 if ret['data']: 393 ↛ 400line 393 didn't jump to line 400 because the condition on line 393 was always true
394 share_client = {}
395 share_client['path'] = ret['data']['path']
396 share_client['ugName'] = ret['data']['ugName']
397 share_client['ugType'] = ret['data']['ugType']
398 share_client['accessRight'] = ret['data']['accessRight']
400 return share_client
401 elif '/' not in access_to:
402 LOG.debug('Attempting to get local user access. %s', access_info)
403 user_ret = send_rest(access_to, ug_type['localUser'])
404 if user_ret['code'] == constants.CODE_NOT_FOUND:
405 return share_client
406 if user_ret['data']:
407 share_client = {}
408 share_client['path'] = user_ret['data']['path']
409 share_client['ugName'] = user_ret['data']['ugName']
410 share_client['ugType'] = user_ret['data']['ugType']
411 share_client['accessRight'] = user_ret['data']['accessRight']
412 return share_client
414 LOG.debug('Failed get local user access,'
415 ' attempting to get local group access. %s', access_info)
416 group_ret = send_rest(access_to, ug_type['localGroup'])
417 if group_ret['data']: 417 ↛ 453line 417 didn't jump to line 453 because the condition on line 417 was always true
418 share_client = {}
419 share_client['path'] = group_ret['data']['path']
420 share_client['ugName'] = group_ret['data']['ugName']
421 share_client['ugType'] = group_ret['data']['ugType']
422 share_client['accessRight'] = group_ret['data']['accessRight']
423 return share_client
424 else:
425 str = access_to.index('/')
426 access_to = access_to[str + 1:]
427 LOG.debug('Attempting to get domain user access. %s', access_info)
428 aduser_ret = send_rest(access_to, ug_type['adUser'])
429 if aduser_ret['code'] == constants.CODE_NOT_FOUND:
430 return share_client
431 if aduser_ret['data']:
432 share_client = {}
433 share_client['path'] = aduser_ret['data']['path']
434 share_client['ugName'] = aduser_ret['data']['ugName']
435 share_client['ugType'] = aduser_ret['data']['ugType']
436 share_client['accessRight'] = \
437 aduser_ret['data']['accessRight']
438 return share_client
440 LOG.debug('Failed get domain user access,'
441 ' attempting to get domain group access. %s',
442 access_info)
443 adgroup_ret = send_rest(access_to, ug_type['adGroup'])
444 if adgroup_ret['data']: 444 ↛ 453line 444 didn't jump to line 453 because the condition on line 444 was always true
445 share_client = {}
446 share_client['path'] = adgroup_ret['data']['path']
447 share_client['ugName'] = adgroup_ret['data']['ugName']
448 share_client['ugType'] = adgroup_ret['data']['ugType']
449 share_client['accessRight'] = \
450 adgroup_ret['data']['accessRight']
451 return share_client
453 return share_client
455 def _get_all_nfs_access_rest(self, share_path):
456 url = f'rest/allNfsShareClient?path={share_path}'
458 result = self.call(url, None, 'GET')
460 msg = 'Get all nfs access error.'
461 self._assert_result_code(result, msg)
462 access_list = []
463 if result['data'] is None: 463 ↛ 464line 463 didn't jump to line 464 because the condition on line 463 was never true
464 pass
465 else:
466 for item in result.get('data', []):
467 access = {}
468 access['share_path'] = item['path']
469 access['access_to'] = item['clientName']
470 access['access_level'] = item['accessRight']
471 access_list.append(access)
473 return access_list
475 def _get_all_cifs_access_rest(self, share_path):
476 url = f'rest/allCifsShareClient?path={share_path}'
478 result = self.call(url, None, 'GET')
480 msg = 'Get all cifs access error.'
481 self._assert_result_code(result, msg)
482 access_list = []
483 for item in result.get('data', []):
484 access = {}
485 access['share_path'] = item['path']
486 access['access_to'] = item['ugName']
487 access['ugType'] = item['ugType']
488 access['access_level'] = item['accessRight']
489 access_list.append(access)
491 return access_list
493 def _change_nfs_access_rest(self, share_path, access_to, access_level):
494 url = 'rest/nfsShareClient'
495 access_info = {
496 'path': share_path,
497 'oldNfsClientName': access_to,
498 'clientName': '',
499 'accessRight': access_level,
500 'allSquash': '',
501 'rootSquash': '',
502 'secure': '',
503 'anonuid': '',
504 'anongid': '',
505 }
506 result = self.call(url, access_info, 'PUT')
508 msg = 'Update nfs acess error.'
509 self._assert_result_code(result, msg)
511 def _change_cifs_access_rest(self, share_path, access_to,
512 access_level, ug_type):
513 url = 'rest/cifsShareClient'
514 if '/' in access_to: 514 ↛ 517line 514 didn't jump to line 517 because the condition on line 514 was always true
515 str = access_to.index('/')
516 access_to = access_to[str + 1:]
517 access_info = {
518 'path': share_path,
519 'right': access_level,
520 'ugName': access_to,
521 'ugType': ug_type,
522 }
524 result = self.call(url, access_info, 'PUT')
525 msg = 'Update cifs access error.'
527 self._assert_result_code(result, msg)
529 def _delete_nfs_access_rest(self, share_path, access_to):
530 url = f'rest/nfsShareClient?path={share_path}&client={access_to}'
532 result = self.call(url, None, 'DELETE')
533 msg = 'Delete nfs access error.'
535 self._assert_result_code(result, msg)
537 def _delete_cifs_access_rest(self, share_path, access_to, ug_type):
538 url = f'rest/cifsShareClient?path={share_path}&ugName={access_to}' \
539 f'&ugType={ug_type}'
541 result = self.call(url, None, 'DELETE')
542 msg = 'Delete cifs access error.'
544 self._assert_result_code(result, msg)
546 def _get_nfs_service_status(self):
547 url = 'rest/nfsService'
548 result = self.call(url, None, 'GET')
550 msg = 'Get NFS service stauts error.'
551 self._assert_result_code(result, msg)
553 nfs_service = {}
555 nfs_service['serviceStatus'] = result['data']['serviceStatus']
556 nfs_service['nfs3Status'] = result['data']['nfs3Status']
557 nfs_service['nfs4Status'] = result['data']['nfs4Status']
559 return nfs_service
561 def _start_nfs_service(self):
562 url = 'rest/nfsService'
563 nfs_service_info = {
564 "openStatus": "1",
565 }
567 result = self.call(url, nfs_service_info, 'PUT')
569 self._assert_result_code(result, 'Start NFS service error.')
571 def _config_nfs_service(self):
572 url = 'rest/nfsConfig'
573 config_nfs = {
574 'configNfs3': "yes",
575 'configNfs4': "yes",
576 }
578 result = self.call(url, config_nfs, 'PUT')
580 self._assert_result_code(result, 'Config NFS service error.')
582 def _get_cifs_service_status(self):
583 url = 'rest/cifsService'
584 result = self.call(url, None, 'GET')
586 msg = 'Get CIFS service status error.'
587 self._assert_result_code(result, msg)
589 return result['data']
591 def _start_cifs_service(self):
592 url = 'rest/cifsService'
593 cifs_service_info = {
594 'openStatus': '1',
595 }
597 result = self.call(url, cifs_service_info, 'PUT')
599 self._assert_result_code(result, 'Start CIFS service error.')
601 def _config_cifs_service(self):
602 url = 'rest/cifsConfig'
603 """config user mode"""
604 config_cifs = {
605 'workName': 'manila',
606 'description': '',
607 'access_way': 'user',
608 'isCache': 'no',
609 'adsName': '',
610 'adsIP': '',
611 'adsUSER': '',
612 'adsPASSWD': '',
613 'allowList': [],
614 'denyList': [],
615 }
617 result = self.call(url, config_cifs, 'PUT')
619 self._assert_result_code(result, 'Config CIFS service error.')
621 def _get_all_pool(self):
622 url = 'rest/storagepool'
624 result = self.call(url, None, 'GET')
626 msg = 'Query pool info error.'
627 self._assert_result_code(result, msg)
629 return result
631 def _query_user(self, user_name):
632 url = f'rest/user/{user_name}'
634 result = self.call(url, None, 'GET')
636 msg = 'Query user error.'
637 self._assert_result_code(result, msg)
638 return result['data']
640 def _add_localuser(self, user_name, user_passwd, group_name):
641 url = 'rest/localUser'
642 user_info = {
643 'userName': user_name,
644 'mgGroup': group_name,
645 'userPasswd': user_passwd,
646 'unusedGroup': [],
647 }
648 result = self.call(url, user_info, 'POST')
649 msg = 'add localuser error.'
650 self._assert_result_code(result, msg)
652 def _query_group(self, group_name):
653 url = f'rest/group/{group_name}'
655 result = self.call(url, None, 'GET')
656 msg = 'Query group error.'
657 self._assert_result_code(result, msg)
658 return result['data']
660 def _add_localgroup(self, group_name):
661 url = 'rest/localGroup'
662 group_info = {
663 'groupName': group_name,
664 }
665 result = self.call(url, group_info, 'POST')
667 msg = 'add localgroup error.'
668 self._assert_result_code(result, msg)