Coverage for manila/share/drivers/dell_emc/plugins/powerstore/client.py: 99%
173 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2023 Dell Inc. or its subsidiaries.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""REST client for Dell EMC PowerStore Manila Driver."""
18import functools
19import json
21from oslo_log import log as logging
22from oslo_utils import strutils
23import requests
25LOG = logging.getLogger(__name__)
class PowerStoreClient(object):
    """REST client for the PowerStore management API.

    Each public method issues a single HTTP request through
    ``_send_request`` and signals an unexpected HTTP status (or an empty
    result set) through its return value -- ``None`` or ``False`` --
    so callers must check what they get back.

    NOTE(review): the lookup-by-name methods concatenate the name into the
    query string without escaping it; names are assumed to be URL-safe.
    """

    def __init__(self,
                 rest_ip,
                 rest_username,
                 rest_password,
                 verify_certificate=False,
                 certificate_path=None):
        """Stores the connection settings; no request is sent here.

        :param rest_ip: host used to build ``https://<rest_ip>/api/rest``
        :param rest_username: user name for HTTP basic authentication
        :param rest_password: password for HTTP basic authentication
        :param verify_certificate: whether to verify the server certificate
        :param certificate_path: CA bundle used when verification is on
        """
        self.rest_ip = rest_ip
        self.rest_username = rest_username
        self.rest_password = rest_password
        self.verify_certificate = verify_certificate
        self.certificate_path = certificate_path
        self.base_url = "https://%s/api/rest" % self.rest_ip
        # Status codes logged at DEBUG; anything else is logged at ERROR.
        self.ok_codes = [
            requests.codes.ok,
            requests.codes.created,
            requests.codes.accepted,
            requests.codes.no_content,
            requests.codes.partial_content
        ]

    @property
    def _verify_cert(self):
        """Value handed to ``requests`` as its ``verify`` argument.

        :return: the certificate path when verification is enabled and a
                 path is configured, otherwise ``verify_certificate`` as is
        """
        if self.verify_certificate and self.certificate_path:
            return self.certificate_path
        return self.verify_certificate

    @staticmethod
    def _first_field(res, response, field):
        """Reads one field from the first row of a list response.

        :param res: the ``requests`` response object
        :param response: decoded JSON body (expected to be a list)
        :param field: key to read from the first element
        :return: the field value, or None when the status is not 200 or
                 the query matched nothing
        """
        if res.status_code == requests.codes.ok and response:
            return response[0][field]
        return None

    @staticmethod
    def _created_id(res, response):
        """Reads the ID of a freshly created object.

        :param res: the ``requests`` response object
        :param response: decoded JSON body
        :return: ``response["id"]`` on HTTP 201 with a body, else None
        """
        if res.status_code == requests.codes.created and response:
            return response["id"]
        return None

    def _send_request(self,
                      method,
                      url,
                      payload=None,
                      params=None,
                      log_response_data=True):
        """Sends one REST request and logs the exchange.

        :param method: HTTP verb
        :param url: path appended to ``base_url``
        :param payload: JSON-serializable body; ignored for GET
        :param params: query parameters passed through to ``requests``
        :param log_response_data: when False, the response body is only
                                  logged if the status is not an OK code
        :return: tuple of the ``requests`` response and its decoded JSON
                 body (None when the body is not valid JSON)
        """
        request_params = {
            "auth": (self.rest_username, self.rest_password),
            "verify": self._verify_cert,
            "params": params or {}
        }
        if payload and method != "GET":
            request_params["data"] = json.dumps(payload)
        # NOTE(review): no timeout is passed, so requests will wait
        # indefinitely on an endpoint that stops responding.
        r = requests.request(method, self.base_url + url, **request_params)

        if r.status_code in self.ok_codes:
            log_level = logging.DEBUG
        else:
            log_level = logging.ERROR
        LOG.log(log_level,
                "REST Request: %s %s with body %s",
                r.request.method,
                r.request.url,
                strutils.mask_password(r.request.body))
        # Failures always include the body, whatever the caller asked for.
        if log_response_data or log_level == logging.ERROR:
            LOG.log(log_level, "REST Response: %s with data %s",
                    r.status_code, r.text)
        else:
            LOG.log(log_level, "REST Response: %s", r.status_code)

        try:
            response = r.json()
        except ValueError:
            # Empty or non-JSON body (e.g. 204 No Content).
            response = None
        return r, response

    _send_get_request = functools.partialmethod(_send_request, "GET")
    _send_post_request = functools.partialmethod(_send_request, "POST")
    _send_patch_request = functools.partialmethod(_send_request, "PATCH")
    _send_delete_request = functools.partialmethod(_send_request, "DELETE")

    def get_nas_server_id(self, nas_server_name):
        """Retrieves the NAS server ID.

        :param nas_server_name: NAS server name
        :return: ID of the NAS server, or None if the request failed or
                 no NAS server has that name
        """
        url = '/nas_server?name=eq.' + nas_server_name
        res, response = self._send_get_request(url)
        return self._first_field(res, response, 'id')

    def get_nas_server_interfaces(self, nas_server_id):
        """Retrieves the file interfaces of a NAS server.

        :param nas_server_id: NAS server ID
        :return: list of ``{'ip': ..., 'preferred': bool}`` dicts, one per
                 file interface, or None if the request failed
        """
        url = ('/nas_server/' + nas_server_id +
               '?select=current_preferred_IPv4_interface_id,'
               'current_preferred_IPv6_interface_id,'
               'file_interfaces(id,ip_address)')
        res, response = self._send_get_request(url)
        if res.status_code != requests.codes.ok or not response:
            return None
        preferred_ids = (response['current_preferred_IPv4_interface_id'],
                         response['current_preferred_IPv6_interface_id'])
        return [
            {
                'ip': interface['ip_address'],
                'preferred': interface['id'] in preferred_ids
            }
            for interface in response['file_interfaces']
        ]

    def create_filesystem(self, nas_server_id, name, size):
        """Creates a filesystem.

        :param nas_server_id: ID of the nas_server
        :param name: name of the filesystem
        :param size: size in Byte
        :return: ID of the filesystem if created successfully, else None
        """
        payload = {
            "name": name,
            "size_total": size,
            "nas_server_id": nas_server_id
        }
        res, response = self._send_post_request('/file_system', payload)
        return self._created_id(res, response)

    def create_nfs_export(self, filesystem_id, name):
        """Creates an NFS export.

        :param filesystem_id: ID of the filesystem on which
                              the export will be created
        :param name: name of the NFS export; also used as its path
        :return: ID of the export if created successfully, else None
        """
        payload = {
            "file_system_id": filesystem_id,
            "path": "/" + str(name),
            "name": name
        }
        res, response = self._send_post_request('/nfs_export', payload)
        return self._created_id(res, response)

    def delete_filesystem(self, filesystem_id):
        """Deletes a filesystem and all associated exports.

        :param filesystem_id: ID of the filesystem to delete
        :return: True if deleted successfully
        """
        url = '/file_system/' + filesystem_id
        res, _ = self._send_delete_request(url)
        return res.status_code == requests.codes.no_content

    def get_nfs_export_name(self, export_id):
        """Retrieves NFS Export name.

        :param export_id: ID of the NFS export
        :return: name of the NFS export, or None if the request failed
        """
        url = '/nfs_export/' + export_id + '?select=name'
        res, response = self._send_get_request(url)
        if res.status_code == requests.codes.ok and response:
            return response["name"]
        return None

    def get_nfs_export_id(self, name):
        """Retrieves NFS Export ID.

        :param name: name of the NFS export
        :return: ID of the NFS export, or None if the request failed or
                 no export has that name
        """
        url = '/nfs_export?select=id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_field(res, response, 'id')

    def get_filesystem_id(self, name):
        """Retrieves an ID for a filesystem.

        :param name: name of the filesystem
        :return: ID of the filesystem, or None if the request failed or
                 no filesystem has that name
        """
        url = '/file_system?name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_field(res, response, 'id')

    def set_export_access(self, export_id, rw_hosts, ro_hosts):
        """Sets the access hosts on the export.

        :param export_id: NFS export ID
        :param rw_hosts: a set of RW hosts
        :param ro_hosts: a set of RO hosts
        :return: True if operation succeeded
        """
        payload = {
            "read_only_hosts": list(ro_hosts),
            "read_write_root_hosts": list(rw_hosts)
        }
        url = '/nfs_export/' + export_id
        res, _ = self._send_patch_request(url, payload)
        return res.status_code == requests.codes.no_content

    def resize_filesystem(self, filesystem_id, new_size):
        """Changes the size of a filesystem.

        :param filesystem_id: ID of the filesystem
        :param new_size: new size to allocate in bytes
        :return: tuple ``(succeeded, detail)``; ``detail`` is the
                 localized message from the array when it rejects the
                 request with error code 0xE08010080449, otherwise None
        """
        payload = {
            "size_total": new_size
        }
        url = '/file_system/' + filesystem_id
        res, response = self._send_patch_request(url, payload)
        if res.status_code == requests.codes.unprocessable:
            # The body may be missing or not shaped as expected, so every
            # level is checked before it is indexed.
            messages = None
            if isinstance(response, dict):
                messages = response.get('messages')
            first = messages[0] if messages else {}
            # NOTE(review): 0xE08010080449 is presumably the "requested
            # size is too small" rejection -- confirm against the
            # PowerStore REST API reference.
            if first.get('code') == '0xE08010080449':
                return False, first.get('message_l10n')
        return res.status_code == requests.codes.no_content, None

    def get_fsid_from_export_name(self, name):
        """Retrieves the Filesystem ID used by an export.

        :param name: name of the export
        :return: ID of the Filesystem which owns the export, or None if
                 the request failed or no export has that name
        """
        url = '/nfs_export?select=file_system_id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_field(res, response, 'file_system_id')

    def create_snapshot(self, filesystem_id, name):
        """Creates a snapshot of a filesystem.

        :param filesystem_id: ID of the filesystem
        :param name: name of the snapshot
        :return: ID of the snapshot if created successfully, else None
        """
        payload = {
            "name": name
        }
        url = '/file_system/' + filesystem_id + '/snapshot'
        res, response = self._send_post_request(url, payload)
        return self._created_id(res, response)

    def restore_snapshot(self, snapshot_id):
        """Restore a snapshot of a filesystem.

        :param snapshot_id: ID of the snapshot
        :return: True if operation succeeded
        """
        url = '/file_system/' + snapshot_id + '/restore'
        res, _ = self._send_post_request(url)
        return res.status_code == requests.codes.no_content

    def clone_snapshot(self, snapshot_id, name):
        """Clone a snapshot of a filesystem.

        :param snapshot_id: ID of the snapshot
        :param name: name of the clone
        :return: ID of the clone if created successfully, else None
        """
        payload = {
            "name": name
        }
        url = '/file_system/' + snapshot_id + '/clone'
        res, response = self._send_post_request(url, payload)
        return self._created_id(res, response)

    def get_cluster_id(self):
        """Get cluster id.

        :return: ID of the first cluster returned, or None if the request
                 failed or returned no cluster
        """
        res, response = self._send_get_request('/cluster')
        return self._first_field(res, response, 'id')

    def retreive_cluster_capacity_metrics(self, cluster_id):
        """Retrieve cluster capacity metrics.

        :param cluster_id: ID of the cluster
        :return: total and used capacity in Byte, or ``(None, None)`` if
                 the request failed or returned no samples
        """
        payload = {
            "entity": "space_metrics_by_cluster",
            "entity_id": cluster_id
        }
        url = '/metrics/generate?order=timestamp'
        # disable logging of the response
        res, response = self._send_post_request(url, payload,
                                                log_response_data=False)
        if res.status_code == requests.codes.ok and response:
            # Samples are requested ordered by timestamp, so the last one
            # is the latest.
            latest_metrics = response[-1]
            LOG.debug("Latest cluster capacity: %s", latest_metrics)
            return (latest_metrics["physical_total"],
                    latest_metrics["physical_used"])
        return None, None

    def create_smb_share(self, filesystem_id, name):
        """Creates a SMB share.

        :param filesystem_id: ID of the filesystem on which
                              the share will be created
        :param name: name of the SMB share; also used as its path
        :return: ID of the share if created successfully, else None
        """
        payload = {
            "file_system_id": filesystem_id,
            "path": "/" + str(name),
            "name": name
        }
        res, response = self._send_post_request('/smb_share', payload)
        return self._created_id(res, response)

    def get_fsid_from_share_name(self, name):
        """Retrieves the Filesystem ID used by a SMB share.

        :param name: name of the SMB share
        :return: ID of the Filesystem which owns the share, or None if
                 the request failed or no share has that name
        """
        url = '/smb_share?select=file_system_id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_field(res, response, 'file_system_id')

    def get_smb_share_id(self, name):
        """Retrieves SMB share ID.

        :param name: name of the SMB share
        :return: ID of the SMB share, or None if the request failed or
                 no share has that name
        """
        url = '/smb_share?select=id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_field(res, response, 'id')

    def get_nas_server_smb_netbios(self, nas_server_name):
        """Retrieves the netbios name of a standalone SMB server.

        Only the first SMB server of the NAS server is inspected.

        :param nas_server_name: NAS server name
        :return: Netbios name if that SMB server is standalone; None if
                 it is not standalone, no SMB server exists, or the
                 request failed
        """
        url = '/nas_server?select=smb_servers(is_standalone,netbios_name)' \
              '&name=eq.' + nas_server_name
        res, response = self._send_get_request(url)
        smb_servers = self._first_field(res, response, 'smb_servers')
        if smb_servers and smb_servers[0]["is_standalone"]:
            return smb_servers[0]["netbios_name"]
        return None

    def set_acl(self, smb_share_id, cifs_rw_users, cifs_ro_users):
        """Set ACL for a SMB share.

        :param smb_share_id: ID of the SMB share
        :param cifs_rw_users: users granted "Change" access
        :param cifs_ro_users: users granted "Read" access
        :return: True if operation succeeded
        """
        aces = []
        # RW entries first, then RO entries.
        for users, access_level in ((cifs_rw_users, "Change"),
                                    (cifs_ro_users, "Read")):
            aces.extend(
                {
                    "trustee_type": "User",
                    "trustee_name": user,
                    "access_level": access_level,
                    "access_type": "Allow"
                }
                for user in users
            )

        payload = {
            "aces": aces
        }
        url = '/smb_share/' + smb_share_id + '/set_acl'
        res, _ = self._send_post_request(url, payload)
        return res.status_code == requests.codes.no_content