Coverage for manila/share/drivers/dell_emc/plugins/powerscale/powerscale_api.py: 98%
338 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2015 EMC Corporation.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import enum
17import functools
19from oslo_log import log
20from oslo_serialization import jsonutils
21import requests
22from urllib.parse import quote
24from manila import exception
25from manila.i18n import _
27LOG = log.getLogger(__name__)
30class PowerScaleApi(object):
32 def __init__(self, api_url, username, password,
33 verify_ssl_cert=False,
34 ssl_cert_path=None,
35 dir_permission=None,
36 threshold_limit=0):
37 self.host_url = api_url
38 self.session = requests.session()
39 self.username = username
40 self.password = password
41 self.verify_ssl_cert = verify_ssl_cert
42 self.certificate_path = ssl_cert_path
43 self.dir_permission = dir_permission
44 self.threshold_limit = threshold_limit
46 # Create session
47 self.session_token = None
48 self.csrf_token = None
49 LOG.debug("Login to PowerScale OneFS during initialization.")
50 login = self.create_session(username, password)
51 if not login:
52 message = _("Failed to login to PowerScale OneFS.")
53 raise exception.BadConfigurationException(reason=message)
55 @property
56 def _verify_cert(self):
57 verify_cert = self.verify_ssl_cert
58 if self.verify_ssl_cert and self.certificate_path:
59 verify_cert = self.certificate_path
60 return verify_cert
62 def create_session(self, username, password):
63 """Create a session. Update session token and csrf token."""
65 headers = {"Content-type": "application/json"}
66 url = self.host_url + '/session/1/session'
67 data = {
68 "username": username,
69 "password": password,
70 "services": ["platform", "namespace"]
71 }
72 r = self.session.request(
73 'POST', url, headers=headers, data=jsonutils.dumps(data),
74 verify=self._verify_cert)
75 if r.status_code == requests.codes.created:
76 self.session_token = r.cookies['isisessid']
77 self.csrf_token = r.cookies['isicsrf']
78 return True
80 message = (_('Failed to create session. '
81 'Status_code="%(code)s", body="%(body)s".') %
82 {'code': r.status_code, 'body': r.text})
83 LOG.error(message)
84 return False
86 def create_directory(self, container_path, recursive=False):
87 """Create a directory."""
89 headers = {"x-isi-ifs-target-type": "container"}
90 if self.dir_permission:
91 headers.update({"x-isi-ifs-access-control": self.dir_permission})
92 url = (self.host_url + "/namespace" + container_path + '?recursive='
93 + str(recursive))
94 r = self.send_put_request(url, headers=headers)
95 return r.status_code == 200
97 def clone_snapshot(self, snapshot_name, fq_target_dir,
98 provider_location):
99 self.create_directory(fq_target_dir)
100 if provider_location:
101 snapshot = self.get_snapshot_id(provider_location)
102 snapshot_name = snapshot['name']
103 else:
104 snapshot = self.get_snapshot(snapshot_name)
105 snapshot_path = snapshot['path']
106 # remove /ifs from start of path
107 relative_snapshot_path = snapshot_path[4:]
108 fq_snapshot_path = ('/ifs/.snapshot/' + snapshot_name +
109 relative_snapshot_path)
110 self._clone_directory_contents(fq_snapshot_path, fq_target_dir,
111 snapshot_name, relative_snapshot_path)
113 def _clone_directory_contents(self, fq_source_dir, fq_target_dir,
114 snapshot_name, relative_path):
115 dir_listing = self.get_directory_listing(fq_source_dir)
116 for item in dir_listing['children']:
117 name = item['name']
118 source_item_path = fq_source_dir + '/' + name
119 new_relative_path = relative_path + '/' + name
120 dest_item_path = fq_target_dir + '/' + name
121 if item['type'] == 'container':
122 # create the container name in the target dir & clone dir
123 self.create_directory(dest_item_path)
124 self._clone_directory_contents(source_item_path,
125 dest_item_path,
126 snapshot_name,
127 new_relative_path)
128 elif item['type'] == 'object': 128 ↛ 116line 128 didn't jump to line 116 because the condition on line 128 was always true
129 self.clone_file_from_snapshot('/ifs' + new_relative_path,
130 dest_item_path, snapshot_name)
132 def clone_file_from_snapshot(self, fq_file_path, fq_dest_path,
133 snapshot_name):
134 headers = {'x-isi-ifs-copy-source': '/namespace' + fq_file_path}
135 snapshot_suffix = '&snapshot=' + snapshot_name
136 url = (self.host_url + '/namespace' + fq_dest_path + '?clone=true' +
137 snapshot_suffix)
138 self.send_put_request(url, headers=headers)
140 def get_directory_listing(self, fq_dir_path):
141 url = self.host_url + '/namespace' + fq_dir_path + '?detail=default'
142 r = self.send_get_request(url)
144 r.raise_for_status()
145 return r.json()
147 def is_path_existent(self, resource_path):
148 url = self.host_url + '/namespace' + resource_path
149 r = self.send_head_request(url)
150 if r.status_code == 200:
151 return True
152 elif r.status_code == 404:
153 return False
154 else:
155 r.raise_for_status()
157 def get_snapshot(self, snapshot_name):
158 r = self.send_get_request(
159 self.host_url + '/platform/1/snapshot/snapshots/' +
160 snapshot_name)
161 snapshot_json = r.json()
162 if r.status_code == 200:
163 return snapshot_json['snapshots'][0]
164 elif r.status_code == 404:
165 return None
166 else:
167 r.raise_for_status()
169 def get_snapshots(self):
170 r = self.send_get_request(
171 self.host_url + '/platform/1/snapshot/snapshots')
172 if r.status_code == 200:
173 return r.json()
174 else:
175 r.raise_for_status()
177 def get_snapshot_id(self, snap_id):
178 r = self.send_get_request(
179 self.host_url + '/platform/1/snapshot/snapshots/' +
180 snap_id)
181 snapshot_json = r.json()
182 if r.status_code == 200:
183 return snapshot_json['snapshots'][0]
184 elif r.status_code == 404:
185 return None
186 else:
187 r.raise_for_status()
189 def lookup_nfs_export(self, share_path):
190 '''Retrieve NFS export by directory path.'''
191 r = self.send_get_request(
192 self.host_url + '/platform/12/protocols/nfs/exports',
193 params={'path': share_path})
194 if r.status_code == 200 and r.json()['total'] > 0:
195 return r.json()['exports'][0]['id']
196 return None
198 def get_nfs_export(self, export_id):
199 response = self.send_get_request(
200 self.host_url + '/platform/1/protocols/nfs/exports/' +
201 str(export_id))
202 if response.status_code == 200:
203 return response.json()['exports'][0]
204 else:
205 return None
207 def lookup_smb_share(self, share_name):
208 response = self.send_get_request(
209 self.host_url + '/platform/1/protocols/smb/shares/' + share_name)
210 if response.status_code == 200:
211 return response.json()['shares'][0]
212 else:
213 return None
215 def create_nfs_export(self, export_path):
216 """Creates an NFS export using the Platform API.
218 :param export_path: a string specifying the desired export path
219 :return: "True" if created successfully; "False" otherwise
220 """
222 data = {'paths': [export_path]}
223 url = self.host_url + '/platform/1/protocols/nfs/exports'
224 response = self.send_post_request(url, data=data)
225 return response.status_code == 201
227 def create_snapshot_nfs_export(self, export_path):
228 """Creates an NFS export using the Platform API.
230 :param export_path: a string specifying the desired export path
231 :return: "True" if created successfully; "False" otherwise
232 """
234 data = {'paths': [export_path],
235 'read_only': True,
236 "map_root": {"enabled": False}}
237 url = self.host_url + '/platform/22/protocols/nfs/exports'
238 response = self.send_post_request(url, data=data)
239 return response.status_code == 201
241 def create_snapshot_smb_export(self, snapshot_name, share_path):
242 """Creates an SMB/CIFS share.
244 :param snapshot_name: the name of the CIFS share
245 :param share_path: the path associated with the CIFS share
246 :return: "True" if the share created successfully; returns "False"
247 otherwise
248 """
249 data = {'name': snapshot_name, 'path': share_path}
250 url = self.host_url + '/platform/1/protocols/smb/shares'
251 response = self.send_post_request(url, data=data)
252 return response.status_code == 201
254 def modify_nfs_export_access(self, share_id, ro_ips=None, rw_ips=None):
255 """Modify access on an existing NFS export.
257 :param share_id: the ID of the NFS export
258 :param ro_ips: a list of IP addresses that should have read-only
259 access
260 :param rw_ips: a list of IP addresses that should have read-write
261 access
262 :return: a boolean indicating whether the modification was successful
263 """
264 export_params = {}
265 if ro_ips is not None:
266 export_params['read_only_clients'] = ro_ips
267 if rw_ips is not None:
268 export_params['clients'] = rw_ips
270 url = '{0}/platform/1/protocols/nfs/exports/{1}'.format(
271 self.host_url, share_id)
273 resp = self.send_put_request(url, data=export_params)
274 return resp.status_code == 204
276 def create_smb_share(self, share_name, share_path):
277 """Creates an SMB/CIFS share.
279 :param share_name: the name of the CIFS share
280 :param share_path: the path associated with the CIFS share
281 :return: "True" if the share created successfully; returns "False"
282 otherwise
283 """
285 data = {'permissions': []}
286 data['name'] = share_name
287 data['path'] = share_path
288 url = self.host_url + '/platform/1/protocols/smb/shares'
289 response = self.send_post_request(url, data=data)
290 return response.status_code == 201
292 def create_snapshot(self, snapshot_name, snapshot_path):
293 """Creates a snapshot."""
295 data = {'name': snapshot_name, 'path': snapshot_path}
296 r = self.send_post_request(
297 self.host_url + '/platform/1/snapshot/snapshots',
298 data=data)
299 if r.status_code == 201:
300 data = r.json()
301 snap_id = data['id']
302 return snap_id
303 return None
305 def delete_path(self, fq_resource_path, recursive=False):
306 """Deletes a file or folder."""
308 r = self.send_delete_request(
309 self.host_url + '/namespace' + fq_resource_path +
310 '?recursive=' + str(recursive))
311 return r.status_code == 204
313 def delete_nfs_share(self, share_number):
314 response = self.send_delete_request(
315 self.host_url + '/platform/1/protocols/nfs/exports' + '/' +
316 str(share_number))
317 return response.status_code == 204
319 def delete_smb_share(self, share_name):
320 url = self.host_url + '/platform/1/protocols/smb/shares/' + share_name
321 response = self.send_delete_request(url)
322 return response.status_code == 204
324 def delete_snapshot(self, snapshot_name):
325 response = self.send_delete_request(
326 '{0}/platform/1/snapshot/snapshots/{1}'
327 .format(self.host_url, snapshot_name))
328 return response.status_code == 204
330 def delete_snapshot_by_id(self, snapshot_id):
331 response = self.send_delete_request(
332 '{0}/platform/1/snapshot/snapshots/{1}'
333 .format(self.host_url, snapshot_id))
334 return response.status_code == 204
336 def quota_create(self, path, quota_type, size):
337 thresholds = {'hard': size}
338 if self.threshold_limit > 0:
339 advisory_size = round((size * self.threshold_limit) / 100)
340 thresholds['advisory'] = int(advisory_size)
341 data = {
342 'path': path,
343 'type': quota_type,
344 'include_snapshots': False,
345 'thresholds_include_overhead': False,
346 'enforced': True,
347 'thresholds': thresholds,
348 }
349 response = self.send_post_request(
350 '{0}/platform/1/quota/quotas'.format(self.host_url),
351 data=data)
352 response.raise_for_status()
354 def quota_get(self, path, quota_type):
355 response = self.send_get_request(
356 '{0}/platform/1/quota/quotas?path={1}'.format(self.host_url, path),
357 )
358 if response.status_code == 404:
359 return None
360 elif response.status_code != 200:
361 response.raise_for_status()
363 json = response.json()
364 len_returned_quotas = len(json['quotas'])
365 if len_returned_quotas == 0: 365 ↛ 366line 365 didn't jump to line 366 because the condition on line 365 was never true
366 return None
367 elif len_returned_quotas == 1: 367 ↛ 370line 367 didn't jump to line 370 because the condition on line 367 was always true
368 return json['quotas'][0]
369 else:
370 message = (_('Greater than one quota returned when querying '
371 'quotas associated with share path: %(path)s .') %
372 {'path': path})
373 raise exception.ShareBackendException(msg=message)
375 def quota_modify_size(self, quota_id, new_size):
376 data = {'thresholds': {'hard': new_size}}
377 if self.threshold_limit > 0:
378 advisory_size = round((new_size * self.threshold_limit) / 100)
379 data.get('thresholds')['advisory'] = int(advisory_size)
380 response = self.send_put_request(
381 '{0}/platform/1/quota/quotas/{1}'.format(self.host_url, quota_id),
382 data=data
383 )
384 response.raise_for_status()
386 def quota_set(self, path, quota_type, size):
387 """Sets a quota of the given type and size on the given path."""
388 quota_json = self.quota_get(path, quota_type)
389 if quota_json is None:
390 self.quota_create(path, quota_type, size)
391 else:
392 # quota already exists, modify it's size
393 quota_id = quota_json['id']
394 self.quota_modify_size(quota_id, size)
396 def delete_quota(self, quota_id):
397 response = self.send_delete_request(
398 '{0}/platform/1/quota/quotas/{1}'.format(self.host_url, quota_id))
399 return response.status_code == 204
401 def modify_smb_share_access(self, share_name,
402 host_acl=None, permissions=None):
403 """Modifies SMB share access
405 :param share_name: the name of the SMB share
406 :param host_acl: host access control list
407 :param permissions: SMB permissions
408 :return: "True" if access updated successfully; otherwise "False"
409 """
410 data = {}
411 if host_acl is not None:
412 data['host_acl'] = host_acl
413 if permissions is not None:
414 data['permissions'] = permissions
415 url = ('{0}/platform/1/protocols/smb/shares/{1}'
416 .format(self.host_url, share_name))
417 r = self.send_put_request(url, data=data)
418 return r.status_code == 204
420 def get_user_sid(self, user):
421 user_json = self.auth_lookup_user(user)
422 if user_json:
423 auth_mappings = user_json['mapping']
424 if len(auth_mappings) > 1:
425 message = (_('More than one mapping found for user "%(user)s".'
426 ) % {'user': user})
427 LOG.error(message)
428 return None
429 user_sid = auth_mappings[0]['user']['sid']
430 return user_sid
432 def auth_lookup_user(self, user_string):
433 url = '{0}/platform/1/auth/mapping/users/lookup'.format(self.host_url)
434 r = self.send_get_request(url, params={"user": user_string})
435 if r.status_code == 200:
436 return r.json()
437 LOG.error(f'Failed to lookup user {user_string}.')
439 def get_space_stats(self):
440 url = '{0}/platform/1/statistics/current'.format(self.host_url)
441 params = {'keys': 'ifs.bytes.free,ifs.bytes.total,ifs.bytes.used'}
442 r = self.send_get_request(url, params=params)
443 if r.status_code != 200:
444 raise exception.ShareBackendException(
445 msg=_('Failed to get statistics from PowerScale.')
446 )
447 stats = r.json()['stats']
448 spaces = {}
449 for stat in stats:
450 if stat['key'] == 'ifs.bytes.total':
451 spaces['total'] = stat['value']
452 elif stat['key'] == 'ifs.bytes.free':
453 spaces['free'] = stat['value']
454 elif stat['key'] == 'ifs.bytes.used': 454 ↛ 449line 454 didn't jump to line 449 because the condition on line 454 was always true
455 spaces['used'] = stat['value']
456 return spaces
458 def get_allocated_space(self):
459 url = '{0}/platform/1/quota/quotas'.format(self.host_url)
460 r = self.send_get_request(url)
461 allocated_capacity = 0
462 if r.status_code != 200:
463 raise exception.ShareBackendException(
464 msg=_('Failed to get share quotas from PowerScale.')
465 )
466 quotas = r.json()['quotas']
467 for quota in quotas:
468 if quota['thresholds']['hard'] is not None:
469 allocated_capacity += quota['thresholds']['hard']
470 if allocated_capacity > 0: 470 ↛ 472line 470 didn't jump to line 472 because the condition on line 470 was always true
471 return round(allocated_capacity / (1024 ** 3), 2)
472 return allocated_capacity
474 def get_cluster_version(self):
475 url = '{0}/platform/12/cluster/version'.format(self.host_url)
476 r = self.send_get_request(url)
477 if r.status_code != 200:
478 raise exception.ShareBackendException(
479 msg=_('Failed to get cluster version from PowerScale.')
480 )
481 return r.json()['nodes'][0]['release']
483 def create_nfs_export_aliases(self, alias_name, path):
484 data = {'name': alias_name, 'path': path}
485 r = self.send_post_request(
486 self.host_url + '/platform/12/protocols/nfs/aliases?zone=System',
487 data=data)
488 return r.status_code == 201
490 def get_nfs_export_aliases(self, alias_name):
491 encoded_alias = quote(alias_name, safe='')
492 url = (self.host_url + '/platform/12/protocols/nfs/aliases/'
493 + encoded_alias + "?zone=System")
494 r = self.send_get_request(url)
495 if r.status_code != 200:
496 raise exception.ShareBackendException(
497 msg=_('Failed to get nfs aliases from PowerScale.')
498 )
499 return r.json()['aliases'][0]
501 def delete_nfs_export_aliases(self, alias_name):
502 encoded_alias = quote(alias_name, safe='')
503 url = (self.host_url + '/platform/12/protocols/nfs/aliases/'
504 + encoded_alias + "?zone=System")
505 response = self.send_delete_request(url)
506 return response.status_code == 204
508 def request(self, method, url, headers=None, data=None, params=None):
509 if data is not None:
510 data = jsonutils.dumps(data)
511 cookies = {'isisessid': self.session_token}
512 csrf_headers = {'X-CSRF-Token': self.csrf_token,
513 'referer': self.host_url}
514 if headers:
515 headers.update(csrf_headers)
516 else:
517 headers = csrf_headers
519 self._log_request(method, url, data, params)
520 r = self.session.request(
521 method, url, cookies=cookies, headers=headers, data=data,
522 verify=self._verify_cert, params=params)
523 self._log_response(r)
525 # Unauthorized, login again
526 if r.status_code == 401:
527 login = self.create_session(self.username, self.password)
528 # Resend the request once login is successful
529 if login: 529 ↛ 536line 529 didn't jump to line 536 because the condition on line 529 was always true
530 self._log_request(method, url, data, params)
531 r = self.session.request(
532 method, url, cookies=cookies, headers=headers, data=data,
533 verify=self._verify_cert, params=params)
534 self._log_response(r)
536 return r
538 def _log_request(self, method, url, data=None, params=None):
539 req_dict = {}
540 if data:
541 req_dict['data'] = data
542 if params:
543 req_dict['params'] = params
544 if req_dict:
545 LOG.debug(f'Request: {method} {url} {req_dict}')
546 else:
547 LOG.debug(f'Request: {method} {url}')
549 def _log_response(self, r):
550 try:
551 body = r.json()
552 except requests.exceptions.JSONDecodeError:
553 body = r.text
554 LOG.debug(f'Response: status_code={r.status_code} body={body}')
556 send_get_request = functools.partialmethod(request, "GET")
557 send_post_request = functools.partialmethod(request, "POST")
558 send_put_request = functools.partialmethod(request, "PUT")
559 send_delete_request = functools.partialmethod(request, "DELETE")
560 send_head_request = functools.partialmethod(request, "HEAD")
563class SmbPermission(enum.Enum):
564 full = 'full'
565 rw = 'change'
566 ro = 'read'