Coverage for manila/share/drivers/dell_emc/plugins/powerstore/client.py: 99%

173 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2023 Dell Inc. or its subsidiaries. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""REST client for Dell EMC PowerStore Manila Driver.""" 

17 

18import functools 

19import json 

20 

21from oslo_log import log as logging 

22from oslo_utils import strutils 

23import requests 

24 

25LOG = logging.getLogger(__name__) 

26 

27 

class PowerStoreClient(object):
    """Thin REST client for the PowerStore management API.

    Every public method issues one HTTP request through ``_send_request``
    and translates the HTTP status code into either a value (ID, name,
    boolean) or ``None``/``False`` when the request did not succeed.
    """

    def __init__(self,
                 rest_ip,
                 rest_username,
                 rest_password,
                 verify_certificate=False,
                 certificate_path=None):
        """Store connection settings; no request is sent here.

        :param rest_ip: address used to build the ``https`` base URL
        :param rest_username: user name for HTTP basic authentication
        :param rest_password: password for HTTP basic authentication
        :param verify_certificate: whether to verify the TLS certificate
        :param certificate_path: optional CA bundle used when verifying
        """
        self.rest_ip = rest_ip
        self.rest_username = rest_username
        self.rest_password = rest_password
        self.verify_certificate = verify_certificate
        self.certificate_path = certificate_path
        self.base_url = "https://%s/api/rest" % self.rest_ip
        # Status codes that are logged at DEBUG level; anything else is
        # logged at ERROR level by _send_request.
        self.ok_codes = [
            requests.codes.ok,
            requests.codes.created,
            requests.codes.accepted,
            requests.codes.no_content,
            requests.codes.partial_content
        ]

    @property
    def _verify_cert(self):
        """Value handed to ``requests`` as its ``verify`` argument.

        :return: the certificate path when verification is enabled and a
                 path is configured, otherwise ``verify_certificate``
        """
        if self.verify_certificate and self.certificate_path:
            return self.certificate_path
        return self.verify_certificate

    def _send_request(self,
                      method,
                      url,
                      payload=None,
                      params=None,
                      log_response_data=True):
        """Send one request and log both the request and the response.

        :param method: HTTP method name
        :param url: path (and optional query string) appended to base_url
        :param payload: object serialized to JSON as the request body;
                        ignored for GET requests
        :param params: extra query parameters
        :param log_response_data: when False the response body is only
                                  logged if the request failed
        :return: tuple of (response object, decoded JSON body or None
                 when the body is not valid JSON)
        """
        if not params:
            params = {}
        request_params = {
            "auth": (self.rest_username, self.rest_password),
            "verify": self._verify_cert,
            "params": params
        }
        if payload and method != "GET":
            request_params["data"] = json.dumps(payload)
        request_url = self.base_url + url
        r = requests.request(method, request_url, **request_params)

        log_level = logging.DEBUG
        if r.status_code not in self.ok_codes:
            log_level = logging.ERROR
        LOG.log(log_level,
                "REST Request: %s %s with body %s",
                r.request.method,
                r.request.url,
                strutils.mask_password(r.request.body))
        # A failed request always logs its body, even if the caller asked
        # for the response data to be left out of the log.
        if log_response_data or log_level == logging.ERROR:
            msg = "REST Response: %s with data %s" % (r.status_code, r.text)
        else:
            msg = "REST Response: %s" % r.status_code
        LOG.log(log_level, msg)

        try:
            response = r.json()
        except ValueError:
            response = None
        return r, response

    _send_get_request = functools.partialmethod(_send_request, "GET")
    _send_post_request = functools.partialmethod(_send_request, "POST")
    _send_patch_request = functools.partialmethod(_send_request, "PATCH")
    _send_delete_request = functools.partialmethod(_send_request, "DELETE")

    @staticmethod
    def _first_item_field(res, response, field):
        """Read one field from the first item of a collection response.

        :param res: response object returned by ``_send_request``
        :param response: decoded JSON body returned by ``_send_request``
        :param field: key to read from the first item
        :return: the field value, or None when the request did not return
                 200 or the collection is empty/missing
        """
        # An empty list (or an undecodable body) means "nothing matched";
        # report it like a failed lookup instead of raising.
        if res.status_code == requests.codes.ok and response:
            return response[0][field]
        return None

    def get_nas_server_id(self, nas_server_name):
        """Retrieves the NAS server ID.

        :param nas_server_name: NAS server name
        :return: ID of the NAS server if success, None otherwise
        """
        url = '/nas_server?name=eq.' + nas_server_name
        res, response = self._send_get_request(url)
        return self._first_item_field(res, response, 'id')

    def get_nas_server_interfaces(self, nas_server_id):
        """Retrieves the file interfaces of a NAS server.

        :param nas_server_id: NAS server ID
        :return: list of ``{'ip': ..., 'preferred': bool}`` dicts, one per
                 file interface, if success
        """
        url = '/nas_server/' + nas_server_id + \
            '?select=current_preferred_IPv4_interface_id,' \
            'current_preferred_IPv6_interface_id,' \
            'file_interfaces(id,ip_address)'
        res, response = self._send_get_request(url)
        if res.status_code != requests.codes.ok:
            return None
        preferred_ids = (response['current_preferred_IPv4_interface_id'],
                         response['current_preferred_IPv6_interface_id'])
        return [
            {
                'ip': interface['ip_address'],
                'preferred': interface['id'] in preferred_ids
            }
            for interface in response['file_interfaces']
        ]

    def create_filesystem(self, nas_server_id, name, size):
        """Creates a filesystem.

        :param nas_server_id: ID of the nas_server
        :param name: name of the filesystem
        :param size: size in Byte
        :return: ID of the filesystem if created successfully
        """
        payload = {
            "name": name,
            "size_total": size,
            "nas_server_id": nas_server_id
        }
        url = '/file_system'
        res, response = self._send_post_request(url, payload)
        if res.status_code == requests.codes.created:
            return response["id"]
        return None

    def create_nfs_export(self, filesystem_id, name):
        """Creates an NFS export.

        :param filesystem_id: ID of the filesystem on which
                              the export will be created
        :param name: name of the NFS export
        :return: ID of the export if created successfully
        """
        payload = {
            "file_system_id": filesystem_id,
            "path": "/" + str(name),
            "name": name
        }
        url = '/nfs_export'
        res, response = self._send_post_request(url, payload)
        if res.status_code == requests.codes.created:
            return response["id"]
        return None

    def delete_filesystem(self, filesystem_id):
        """Deletes a filesystem and all associated export.

        :param filesystem_id: ID of the filesystem to delete
        :return: True if deleted successfully
        """
        url = '/file_system/' + filesystem_id
        res, _ = self._send_delete_request(url)
        return res.status_code == requests.codes.no_content

    def get_nfs_export_name(self, export_id):
        """Retrieves NFS Export name.

        :param export_id: ID of the NFS export
        :return: name of the NFS export if success
        """
        url = '/nfs_export/' + export_id + '?select=name'
        res, response = self._send_get_request(url)
        if res.status_code == requests.codes.ok:
            return response["name"]
        return None

    def get_nfs_export_id(self, name):
        """Retrieves NFS Export ID.

        :param name: name of the NFS export
        :return: id of the NFS export if success, None otherwise
        """
        url = '/nfs_export?select=id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_item_field(res, response, 'id')

    def get_filesystem_id(self, name):
        """Retrieves an ID for a filesystem.

        :param name: name of the filesystem
        :return: ID of the filesystem if success, None otherwise
        """
        url = '/file_system?name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_item_field(res, response, 'id')

    def set_export_access(self, export_id, rw_hosts, ro_hosts):
        """Sets the access hosts on the export.

        :param export_id: NFS export ID
        :param rw_hosts: a set of RW hosts
        :param ro_hosts: a set of RO hosts
        :return: True if operation succeeded
        """
        payload = {
            "read_only_hosts": list(ro_hosts),
            "read_write_root_hosts": list(rw_hosts)
        }
        url = '/nfs_export/' + export_id
        res, _ = self._send_patch_request(url, payload)
        return res.status_code == requests.codes.no_content

    def resize_filesystem(self, filesystem_id, new_size):
        """Changes the size of a filesystem to a new size.

        :param filesystem_id: ID of the filesystem
        :param new_size: new size to allocate in bytes
        :return: tuple ``(succeeded, detail)``; ``detail`` is the
                 localized message of the backend when the request is
                 rejected with error code 0xE08010080449, None otherwise
        """
        payload = {
            "size_total": new_size
        }
        url = '/file_system/' + filesystem_id
        res, response = self._send_patch_request(url, payload)
        if res.status_code == requests.codes.unprocessable:
            # The body may be missing or not JSON; only inspect it when
            # it has the expected shape.
            messages = None
            if isinstance(response, dict):
                messages = response.get('messages')
            if messages and messages[0].get('code') == '0xE08010080449':
                return False, messages[0].get('message_l10n')
        return res.status_code == requests.codes.no_content, None

    def get_fsid_from_export_name(self, name):
        """Retrieves the Filesystem ID used by an export.

        :param name: name of the export
        :return: ID of the Filesystem which owns the export, None if the
                 lookup failed or nothing matched
        """
        url = '/nfs_export?select=file_system_id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_item_field(res, response, 'file_system_id')

    def create_snapshot(self, filesystem_id, name):
        """Creates a snapshot of a filesystem.

        :param filesystem_id: ID of the filesystem
        :param name: name of the snapshot
        :return: ID of the snapshot if created successfully
        """
        payload = {
            "name": name
        }
        url = '/file_system/' + filesystem_id + '/snapshot'
        res, response = self._send_post_request(url, payload)
        if res.status_code == requests.codes.created:
            return response["id"]
        return None

    def restore_snapshot(self, snapshot_id):
        """Restore a snapshot of a filesystem.

        :param snapshot_id: ID of the snapshot
        :return: True if operation succeeded
        """
        url = '/file_system/' + snapshot_id + '/restore'
        res, _ = self._send_post_request(url)
        return res.status_code == requests.codes.no_content

    def clone_snapshot(self, snapshot_id, name):
        """Clone a snapshot of a filesystem.

        :param snapshot_id: ID of the snapshot
        :param name: name of the clone
        :return: ID of the clone if created successfully
        """
        payload = {
            "name": name
        }
        url = '/file_system/' + snapshot_id + '/clone'
        res, response = self._send_post_request(url, payload)
        if res.status_code == requests.codes.created:
            return response["id"]
        return None

    def get_cluster_id(self):
        """Get cluster id.

        :return: ID of the first cluster returned, None if the request
                 failed or returned no cluster
        """
        url = '/cluster'
        res, response = self._send_get_request(url)
        return self._first_item_field(res, response, 'id')

    def retreive_cluster_capacity_metrics(self, cluster_id):
        """Retrieve cluster capacity metrics.

        The method name keeps its historical spelling so that existing
        callers continue to work.

        :param cluster_id: ID of the cluster
        :return: total and used capacity in Byte, or ``(None, None)`` when
                 the request failed or returned no metrics
        """
        payload = {
            "entity": "space_metrics_by_cluster",
            "entity_id": cluster_id
        }
        url = '/metrics/generate?order=timestamp'
        # disable logging of the response
        res, response = self._send_post_request(url, payload,
                                                log_response_data=False)
        if res.status_code == requests.codes.ok and response:
            # Results are requested ordered by timestamp, so the last
            # entry is the latest cluster capacity sample.
            latest_metrics = response[-1]
            LOG.debug("Latest cluster capacity: %s", latest_metrics)
            return (latest_metrics["physical_total"],
                    latest_metrics["physical_used"])
        return None, None

    def create_smb_share(self, filesystem_id, name):
        """Creates a SMB share.

        :param filesystem_id: ID of the filesystem on which
                              the share will be created
        :param name: name of the SMB share
        :return: ID of the share if created successfully
        """
        payload = {
            "file_system_id": filesystem_id,
            "path": "/" + str(name),
            "name": name
        }
        url = '/smb_share'
        res, response = self._send_post_request(url, payload)
        if res.status_code == requests.codes.created:
            return response["id"]
        return None

    def get_fsid_from_share_name(self, name):
        """Retrieves the Filesystem ID used by a SMB share.

        :param name: name of the SMB share
        :return: ID of the Filesystem which owns the share, None if the
                 lookup failed or nothing matched
        """
        url = '/smb_share?select=file_system_id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_item_field(res, response, 'file_system_id')

    def get_smb_share_id(self, name):
        """Retrieves SMB share ID.

        :param name: name of the SMB share
        :return: id of the SMB share if success, None otherwise
        """
        url = '/smb_share?select=id&name=eq.' + name
        res, response = self._send_get_request(url)
        return self._first_item_field(res, response, 'id')

    def get_nas_server_smb_netbios(self, nas_server_name):
        """Retrieves the netbios name of a standalone SMB server.

        :param nas_server_name: NAS server name
        :return: Netbios name of the first SMB server when it is
                 standalone; None when the request failed, nothing
                 matched, or the SMB server is not standalone
        """
        url = '/nas_server?select=smb_servers(is_standalone,netbios_name)' \
            '&name=eq.' + nas_server_name
        res, response = self._send_get_request(url)
        if res.status_code != requests.codes.ok or not response:
            return None
        smb_servers = response[0].get('smb_servers')
        if not smb_servers:
            # NAS server exists but has no SMB server configured.
            return None
        smb_server = smb_servers[0]
        if smb_server["is_standalone"]:
            return smb_server["netbios_name"]
        return None

    def set_acl(self, smb_share_id, cifs_rw_users, cifs_ro_users):
        """Set ACL for a SMB share.

        :param smb_share_id: ID of the SMB share
        :param cifs_rw_users: users granted "Change" access
        :param cifs_ro_users: users granted "Read" access
        :return: True if operation succeeded
        """
        # Read-write entries first, then read-only ones.
        aces = [
            {
                "trustee_type": "User",
                "trustee_name": rw_user,
                "access_level": "Change",
                "access_type": "Allow"
            }
            for rw_user in cifs_rw_users
        ]
        aces.extend(
            {
                "trustee_type": "User",
                "trustee_name": ro_user,
                "access_level": "Read",
                "access_type": "Allow"
            }
            for ro_user in cifs_ro_users
        )

        payload = {
            "aces": aces
        }
        url = '/smb_share/' + smb_share_id + '/set_acl'
        res, _ = self._send_post_request(url, payload)
        return res.status_code == requests.codes.no_content