Coverage for manila/share/drivers/dell_emc/plugins/powerflex/object_manager.py: 91%
171 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2023 Dell Inc. or its subsidiaries.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from http import client as http_client
17import json
19from oslo_log import log as logging
20import requests
22from manila import exception
24LOG = logging.getLogger(__name__)
27class StorageObjectManager(object):
29 def __init__(self,
30 host_url,
31 username,
32 password,
33 export_path,
34 certificate_path=None,
35 verify_ssl_cert=False):
36 self.host_url = host_url
37 self.base_url = host_url + '/rest'
38 self.rest_username = username
39 self.rest_password = password
40 self.rest_token = None
41 self.got_token = False
42 self.export_path = export_path
43 self.verify_certificate = verify_ssl_cert
44 self.certificate_path = certificate_path
46 def _get_headers(self):
47 if self.got_token:
48 return {"Content-type": "application/json",
49 "Accept": "application/json",
50 "Authorization": "Bearer " + self.rest_token}
51 else:
52 return {"Content-type": "application/json",
53 "Accept": "application/json"}
55 def execute_powerflex_get_request(self, url, **url_params):
56 request = url % url_params
57 res = requests.get(request,
58 headers=self._get_headers(),
59 verify=self._get_verify_cert())
60 res = self._check_response(res, request, "GET")
61 response = res.json()
63 return res, response
65 def execute_powerflex_post_request(self, url, params=None, **url_params):
66 if not params:
67 params = {}
68 request = url % url_params
69 res = requests.post(request,
70 data=json.dumps(params),
71 headers=self._get_headers(),
72 verify=self._get_verify_cert())
73 res = self._check_response(res, request, "POST", params)
74 response = None
75 try:
76 response = res.json()
77 except ValueError:
78 # Particular case for get_storage_pool_id which is not
79 # a json object but a string
80 response = res
81 return res, response
83 def execute_powerflex_delete_request(self, url, **url_params):
84 request = url % url_params
85 res = requests.delete(request,
86 headers=self._get_headers(),
87 verify=self._get_verify_cert())
88 res = self._check_response(res, request, "DELETE")
89 return res
91 def execute_powerflex_patch_request(self, url, params=None, **url_params):
92 if not params:
93 params = {}
94 request = url % url_params
95 res = requests.patch(request,
96 data=json.dumps(params),
97 headers=self._get_headers(),
98 verify=self._get_verify_cert())
99 res = self._check_response(res, request, "PATCH")
100 return res
102 def _check_response(self,
103 response,
104 request,
105 request_type,
106 params=None):
107 login_url = "/auth/login"
109 if (response.status_code == http_client.UNAUTHORIZED or
110 response.status_code == http_client.FORBIDDEN):
111 LOG.info("Dell PowerFlex token is invalid, going to re-login "
112 "and get a new one.")
113 login_request = self.base_url + login_url
114 verify_cert = self._get_verify_cert()
115 self.got_token = False
116 payload = json.dumps({"username": self.rest_username,
117 "password": self.rest_password})
118 res = requests.post(login_request,
119 headers=self._get_headers(),
120 data=payload,
121 verify=verify_cert)
122 if (res.status_code == http_client.UNAUTHORIZED or
123 res.status_code == http_client.FORBIDDEN):
124 message = ("PowerFlex REST API access is still forbidden or "
125 "unauthorized, there might be an issue with your "
126 "credentials.")
127 LOG.error(message)
128 raise exception.NotAuthorized()
129 else:
130 token = res.json()["access_token"]
131 self.rest_token = token
132 self.got_token = True
133 LOG.info("Going to perform request again %s with valid token.",
134 request)
135 if (request_type == "GET"):
136 response = requests.get(request,
137 headers=self._get_headers(),
138 verify=verify_cert)
139 elif (request_type == "POST"):
140 response = requests.post(request,
141 headers=self._get_headers(),
142 data=json.dumps(params),
143 verify=verify_cert)
144 elif (request_type == "DELETE"):
145 response = requests.delete(request,
146 headers=self._get_headers(),
147 verify=verify_cert)
148 elif (request_type == "PATCH"): 148 ↛ 153line 148 didn't jump to line 153 because the condition on line 148 was always true
149 response = requests.patch(request,
150 headers=self._get_headers(),
151 data=json.dumps(params),
152 verify=verify_cert)
153 level = logging.DEBUG
154 if response.status_code != http_client.OK:
155 level = logging.ERROR
156 LOG.log(level,
157 "REST REQUEST: %s with params %s",
158 request,
159 json.dumps(params))
160 LOG.log(level,
161 "REST RESPONSE: %s with params %s",
162 response.status_code,
163 response.text)
164 return response
166 def _get_verify_cert(self):
167 verify_cert = False
168 if self.verify_certificate: 168 ↛ 169line 168 didn't jump to line 169 because the condition on line 168 was never true
169 verify_cert = self.certificate_path
170 return verify_cert
172 def create_filesystem(self, storage_pool_id, nas_server, name, size):
173 """Creates a filesystem.
175 :param nas_server: name of the nas_server
176 :param name: name of the filesystem
177 :param size: size in GiB
178 :return: ID of the filesystem if created successfully
179 """
180 nas_server_id = self.get_nas_server_id(nas_server)
181 params = {
182 "name": name,
183 "size_total": size,
184 "storage_pool_id": storage_pool_id,
185 "nas_server_id": nas_server_id
186 }
187 url = f'{self.base_url}/v1/file-systems'
188 res, response = self.execute_powerflex_post_request(url, params)
189 if res.status_code == 201: 189 ↛ exitline 189 didn't return from function 'create_filesystem' because the condition on line 189 was always true
190 return response["id"]
192 def create_nfs_export(self, filesystem_id, name):
193 """Creates an NFS export.
195 :param filesystem_id: ID of the filesystem on which
196 the export will be created
197 :param name: name of the NFS export
198 :return: ID of the export if created successfully
199 """
200 params = {
201 "file_system_id": filesystem_id,
202 "path": "/" + str(name),
203 "name": name
204 }
205 url = f'{self.base_url}/v1/nfs-exports'
206 res, response = self.execute_powerflex_post_request(url, params)
207 if res.status_code == 201: 207 ↛ exitline 207 didn't return from function 'create_nfs_export' because the condition on line 207 was always true
208 return response["id"]
210 def delete_filesystem(self, filesystem_id):
211 """Deletes a filesystem and all associated export.
213 :param filesystem_id: ID of the filesystem to delete
214 :return: True if deleted successfully
215 """
216 url = f'{self.base_url}/v1/file-systems/{filesystem_id}'
217 res = self.execute_powerflex_delete_request(url)
218 return res.status_code == 204
220 def create_snapshot(self, name, filesystem_id):
221 """Creates a snapshot of a filesystem.
223 :param name: name of the snapshot
224 :param filesystem_id: ID of the filesystem
225 :return: ID of the snapshot if created successfully
226 """
227 params = {
228 "name": name
229 }
230 url = f'{self.base_url}/v1/file-systems/{filesystem_id}/snapshot'
231 res, response = self.execute_powerflex_post_request(url, params)
232 return res.status_code == 201
234 def get_nas_server_id(self, nas_server):
235 """Retrieves the NAS server ID.
237 :param nas_server: NAS server name
238 :return: ID of the NAS server if success
239 """
240 url = f'{self.base_url}/v1/nas-servers?select=id&name=eq.{nas_server}'
241 res, response = self.execute_powerflex_get_request(url)
242 if res.status_code == 200: 242 ↛ exitline 242 didn't return from function 'get_nas_server_id' because the condition on line 242 was always true
243 return response[0]['id']
245 def get_nfs_export_name(self, export_id):
246 """Retrieves NFS Export name.
248 :param export_id: ID of the NFS export
249 :return: path of the NFS export if success
250 """
251 url = f'{self.base_url}/v1/nfs-exports/{export_id}?select=*'
252 res, response = self.execute_powerflex_get_request(url)
253 if res.status_code == 200: 253 ↛ exitline 253 didn't return from function 'get_nfs_export_name' because the condition on line 253 was always true
254 return response["name"]
256 def get_filesystem_id(self, name):
257 """Retrieves an ID for a filesystem.
259 :param name: name of the filesystem
260 :return: ID of the filesystem if success
261 """
262 url = f'{self.base_url}/v1/file-systems?select=id&name=eq.{name}'
263 res, response = self.execute_powerflex_get_request(url)
264 if res.status_code == 200: 264 ↛ exitline 264 didn't return from function 'get_filesystem_id' because the condition on line 264 was always true
265 return response[0]['id']
267 def get_nfs_export_id(self, name):
268 """Retrieves NFS Export ID.
270 :param name: name of the NFS export
271 :return: id of the NFS export if success
272 """
273 url = f'{self.base_url}/v1/nfs-exports?select=id&name=eq.{name}'
274 res, response = self.execute_powerflex_get_request(url)
275 if res.status_code == 200: 275 ↛ exitline 275 didn't return from function 'get_nfs_export_id' because the condition on line 275 was always true
276 return response[0]['id']
278 def get_storage_pool_id(self, protection_domain, storage_pool):
279 """Retrieves the Storage Pool ID.
281 :param protection_domain: protection domain name
282 :param storage_pool: storage pool name
283 :return: ID of the storage pool if success
284 """
285 params = {
286 "protectionDomainName": protection_domain,
287 "name": storage_pool
288 }
289 url = (f'{self.host_url}/api/types/StoragePool/instances/'
290 'action/queryIdByKey')
291 res, response = self.execute_powerflex_post_request(url, params)
292 if res.status_code == 200: 292 ↛ exitline 292 didn't return from function 'get_storage_pool_id' because the condition on line 292 was always true
293 return response
295 def set_export_access(self, export_id, rw_hosts, ro_hosts):
296 """Sets the authorization access on the export.
298 :param export_id: NFS export ID
299 :param rw_hosts: a set of RW hosts
300 :param ro_hosts: a set of RO hosts
301 :return: True if operation succeeded
302 """
303 params = {
304 "read_only_hosts": list(ro_hosts),
305 "read_write_root_hosts": list(rw_hosts)
306 }
307 url = f'{self.base_url}/v1/nfs-exports/{export_id}'
308 res = self.execute_powerflex_patch_request(url, params)
309 return res.status_code == 204
311 def extend_export(self, export_id, new_size):
312 """Extends the size of a share to a new size.
314 :param export_id: ID of the NFS export
315 :param new_size: new size to allocate in bytes
316 :return: True if extended successfully
317 """
318 params = {
319 "size_total": new_size
320 }
321 url = f'{self.base_url}/v1/file-systems/{export_id}'
322 res = self.execute_powerflex_patch_request(url, params)
323 return res.status_code == 204
325 def get_fsid_from_export_name(self, name):
326 """Retieves the Filesystem ID used by an export.
328 :param name: name of the export
329 :return: ID of the Filesystem which owns the export
330 """
331 url = (f'{self.base_url}/v1/nfs-exports'
332 f'?select=file_system_id&name=eq.{name}')
333 res, response = self.execute_powerflex_get_request(url)
334 if res.status_code == 200: 334 ↛ exitline 334 didn't return from function 'get_fsid_from_export_name' because the condition on line 334 was always true
335 return response[0]['file_system_id']
337 def get_fsid_from_snapshot_name(self, snapshot_name):
338 """Retrieves the Filesystem ID used by a snapshot.
340 :param snapshot_name: Name of the snapshot
341 :return: ID of the parent filesystem of the snapshot
342 """
343 url = (f'{self.base_url}/v1/file-systems'
344 f'?select=id&name=eq.{snapshot_name}')
345 res, response = self.execute_powerflex_get_request(url)
346 if res.status_code == 200: 346 ↛ exitline 346 didn't return from function 'get_fsid_from_snapshot_name' because the condition on line 346 was always true
347 return response[0]['id']
349 def get_storage_pool_spare_percentage(self, storage_pool_id):
350 """Retrieves the spare capacity percentage of the storage pool.
352 :param storage_pool_id: ID of the storage pool
353 :return: Spare capacity percentage of the storage pool
354 """
355 url = f'{self.host_url}/api/instances/StoragePool::{storage_pool_id}'
356 res, response = self.execute_powerflex_get_request(url)
357 if res.status_code == 200: 357 ↛ exitline 357 didn't return from function 'get_storage_pool_spare_percentage' because the condition on line 357 was always true
358 return response['sparePercentage']
360 def get_storage_pool_statistic(self, storage_pool_id):
361 """Retrieves the spare capacity percentage of the storage pool.
363 :param storage_pool_id: ID of the storage pool
364 :return: Statistics of the storage pool
365 """
366 url = (f'{self.host_url}/api/instances/StoragePool::{storage_pool_id}'
367 '/relationships/Statistics')
368 res, response = self.execute_powerflex_get_request(url)
369 if res.status_code == 200: 369 ↛ 376line 369 didn't jump to line 376 because the condition on line 369 was always true
370 statistics = {
371 "maxCapacityInKb": response['maxCapacityInKb'],
372 "capacityInUseInKb": response['capacityInUseInKb'],
373 "netUnusedCapacityInKb": response['netUnusedCapacityInKb'],
374 "primaryVacInKb": response['primaryVacInKb'],
375 }
376 return statistics
378 def get_nas_server_interfaces(self, nas_server_id):
379 """Retrieves the file interfaces for a given na_server.
381 :param nas_server_id: ID of the NAS server
382 :return: file interfaces of the NAS server
383 """
384 url = (f'{self.base_url}/v1/file-interfaces'
385 f'?select=ip_address&nas_server_id=eq.{nas_server_id}')
386 res, response = self.execute_powerflex_get_request(url)
387 if res.status_code == 200:
388 return [i['ip_address'] for i in response]