Coverage for manila/share/drivers/nexenta/ns5/jsonrpc.py: 99%
374 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2019 Nexenta by DDN, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import hashlib
17import json
18import posixpath
19import time
20from urllib import parse as urlparse
22from oslo_log import log as logging
23import requests
25from manila import exception
26from manila.i18n import _
28LOG = logging.getLogger(__name__)
class NefException(exception.ManilaException):
    """Error raised by the NEF REST client.

    The final message is rendered from four fields: ``message``,
    ``source``, ``name`` and ``code``. Each field is resolved in this
    order: explicit keyword argument, then the matching key of ``data``
    (when ``data`` is a dict), then a built-in default. When ``data`` is
    a string it is used as the ``message`` field unless ``message`` was
    passed as a keyword. The resolved code is exposed as ``self.code``.
    """

    def __init__(self, data=None, **kwargs):
        fallback = {
            'name': 'NexentaError',
            'code': 'EBADMSG',
            'source': 'ManilaDriver',
            'message': 'Unknown error'
        }
        if isinstance(data, dict):
            # Keyword arguments win over values carried by the payload.
            for field in fallback:
                if field not in kwargs and field in data:
                    kwargs[field] = data[field]
        elif isinstance(data, str):
            kwargs.setdefault('message', data)
        # Whatever is still missing falls back to the defaults.
        for field, value in fallback.items():
            kwargs.setdefault(field, value)
        message = (_('%(message)s (source: %(source)s, '
                     'name: %(name)s, code: %(code)s)')
                   % kwargs)
        self.code = kwargs['code']
        # 'message' is passed positionally, so drop it from the keywords.
        kwargs.pop('message')
        super(NefException, self).__init__(message, **kwargs)
class NefRequest(object):
    """A single logical NEF API call bound to a proxy and an HTTP method.

    Instances are callable: ``request(path, payload)`` performs the call
    through the proxy session. A response hook (see :meth:`hook`) is
    attached to every session request issued by this object; it handles
    re-authentication, failover, busy retries, asynchronous job polling
    and pagination, and collects per-status-code statistics.
    """

    def __init__(self, proxy, method):
        """Store the proxy and method and reset the per-call state.

        :param proxy: object providing ``session``, ``timeout``,
                      ``retries``, ``hosts`` and the helper methods used
                      below (``url``, ``delay``, ``update_host``, ...)
        :param method: HTTP method name; validated when called
        """
        self.proxy = proxy
        self.method = method
        self.path = None
        # True while failover() is probing hosts (guards nested failover).
        self.lock = False
        # Sum of the elapsed time of all responses seen by the hook.
        self.time = 0
        # Items accumulated from the 'data' key of paginated responses.
        self.data = []
        self.payload = {}
        # Number of responses seen, keyed by HTTP status code.
        self.stat = {}
        self.hooks = {
            'response': self.hook
        }
        self.kwargs = {
            'hooks': self.hooks,
            'timeout': self.proxy.timeout
        }

    def __call__(self, path, payload=None):
        """Perform the request and return the decoded result.

        :param path: collection path, must be non-empty
        :param payload: optional dict; sent as query parameters for
                        get/delete and as a JSON body for put/post
        :returns: ``None`` for a successful response without content,
                  the accumulated ``data`` items when the JSON content
                  is a dict containing ``data``, otherwise the decoded
                  JSON content
        :raises NefException: on invalid arguments (code ``EINVAL``) or
                              when the final response is not OK
        :raises requests.exceptions.ConnectionError,
                requests.exceptions.Timeout: when the request fails and
                no failover host is found
        """
        LOG.debug('NEF request start: %(method)s %(path)s %(payload)s',
                  {'method': self.method, 'path': path, 'payload': payload})
        if self.method not in ['get', 'delete', 'put', 'post']:
            # The template must be interpolated with '%'; a comma here
            # would build a (template, dict) tuple instead of a string.
            message = (_('NEF API does not support %(method)s method')
                       % {'method': self.method})
            raise NefException(code='EINVAL', message=message)
        if not path:
            message = (_('NEF API call requires collection path'))
            raise NefException(code='EINVAL', message=message)
        self.path = path
        if payload:
            if not isinstance(payload, dict):
                message = (_('NEF API call payload must be a dictionary'))
                raise NefException(code='EINVAL', message=message)
            if self.method in ['get', 'delete']:
                self.payload = {'params': payload}
            else:
                # Method was validated above, so this is put or post.
                self.payload = {'data': json.dumps(payload)}
        try:
            response = self.request(self.method, self.path, **self.payload)
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout) as error:
            LOG.debug('Failed to %(method)s %(path)s %(payload)s: %(error)s',
                      {'method': self.method, 'path': self.path,
                       'payload': self.payload, 'error': error})
            if not self.failover():
                raise
            LOG.debug('Retry initial request after failover: '
                      '%(method)s %(path)s %(payload)s',
                      {'method': self.method,
                       'path': self.path,
                       'payload': self.payload})
            response = self.request(self.method, self.path, **self.payload)
        LOG.debug('NEF request done: %(method)s %(path)s %(payload)s, '
                  'total response time: %(time)s seconds, '
                  'total requests count: %(count)s, '
                  'requests statistics: %(stat)s',
                  {'method': self.method,
                   'path': self.path,
                   'payload': self.payload,
                   'time': self.time,
                   'count': sum(self.stat.values()),
                   'stat': self.stat})
        if response.ok and not response.content:
            return None
        content = json.loads(response.content)
        if not response.ok:
            raise NefException(content)
        if isinstance(content, dict) and 'data' in content:
            # The hook has already gathered every page into self.data.
            return self.data
        return content

    def request(self, method, path, **kwargs):
        """Send one session request to the current proxy host.

        The instance-wide keyword arguments (response hook and timeout)
        are merged over ``kwargs`` before the request is issued.
        """
        url = self.proxy.url(path)
        LOG.debug('Perform session request: %(method)s %(url)s %(body)s',
                  {'method': method, 'url': url, 'body': kwargs})
        kwargs.update(self.kwargs)
        return self.proxy.session.request(method, url, **kwargs)

    def hook(self, response, **kwargs):
        """Response hook: update statistics and follow up on the response.

        Depending on the status code the hook may re-authenticate and
        resend the request (unauthorized), fail over to another host
        and repeat the initial request (not found), wait and repeat the
        initial request (server error with code ``EBUSY``), poll the
        ``monitor`` link (accepted), or collect ``data`` items and
        follow the ``next`` link (ok).

        :param response: the session response being processed
        :param kwargs: send arguments passed by the session; reused
                       when the request is resent after authentication
        :returns: the original response or the response of the
                  follow-up request
        :raises NefException: when error content is missing or is not
                              valid JSON, when a retry limit is
                              exceeded, or on a non-EBUSY server error
        """
        initial_text = (_('initial request %(method)s %(path)s %(body)s')
                        % {'method': self.method,
                           'path': self.path,
                           'body': self.payload})
        request_text = (_('session request %(method)s %(url)s %(body)s')
                        % {'method': response.request.method,
                           'url': response.request.url,
                           'body': response.request.body})
        response_text = (_('session response %(code)s %(content)s')
                         % {'code': response.status_code,
                            'content': response.content})
        text = (_('%(request_text)s and %(response_text)s')
                % {'request_text': request_text,
                   'response_text': response_text})
        LOG.debug('Hook start on %(text)s', {'text': text})

        if response.status_code not in self.stat:
            self.stat[response.status_code] = 0
        self.stat[response.status_code] += 1
        self.time += response.elapsed.total_seconds()

        if response.ok and not response.content:
            LOG.debug('Hook done on %(text)s: '
                      'empty response content',
                      {'text': text})
            return response

        if not response.content:
            message = (_('There is no response content '
                         'is available for %(text)s')
                       % {'text': text})
            raise NefException(code='ENODATA', message=message)

        try:
            content = json.loads(response.content)
        except (TypeError, ValueError) as error:
            message = (_('Failed to decode JSON for %(text)s: %(error)s')
                       % {'text': text, 'error': error})
            raise NefException(code='ENOMSG', message=message)

        # Follow-up requests (monitor / next links) are always GETs.
        method = 'get'
        # pylint: disable=no-member
        if response.status_code == requests.codes.unauthorized:
            if self.stat[response.status_code] > self.proxy.retries:
                raise NefException(content)
            self.auth()
            # Resend the same prepared request with refreshed headers.
            request = response.request.copy()
            request.headers.update(self.proxy.session.headers)
            LOG.debug('Retry last %(text)s after authentication',
                      {'text': request_text})
            return self.proxy.session.send(request, **kwargs)
        elif response.status_code == requests.codes.not_found:
            if self.lock:
                # This response belongs to a failover probe itself.
                LOG.debug('Hook done on %(text)s: '
                          'nested failover is detected',
                          {'text': text})
                return response
            if self.stat[response.status_code] > self.proxy.retries:
                raise NefException(content)
            if not self.failover():
                LOG.debug('Hook done on %(text)s: '
                          'no valid hosts found',
                          {'text': text})
                return response
            LOG.debug('Retry %(text)s after failover',
                      {'text': initial_text})
            return self.request(self.method, self.path, **self.payload)
        elif response.status_code == requests.codes.server_error:
            # Only a busy appliance is worth retrying.
            if not (isinstance(content, dict) and
                    'code' in content and
                    content['code'] == 'EBUSY'):
                raise NefException(content)
            if self.stat[response.status_code] > self.proxy.retries:
                raise NefException(content)
            self.proxy.delay(self.stat[response.status_code])
            LOG.debug('Retry %(text)s after delay',
                      {'text': initial_text})
            return self.request(self.method, self.path, **self.payload)
        elif response.status_code == requests.codes.accepted:
            path = self.getpath(content, 'monitor')
            if not path:
                message = (_('There is no monitor path '
                             'available for %(text)s')
                           % {'text': text})
                raise NefException(code='ENOMSG', message=message)
            self.proxy.delay(self.stat[response.status_code])
            return self.request(method, path)
        elif response.status_code == requests.codes.ok:
            if not (isinstance(content, dict) and 'data' in content):
                LOG.debug('Hook done on %(text)s: there '
                          'is no JSON data available',
                          {'text': text})
                return response
            LOG.debug('Append %(count)s data items to response',
                      {'count': len(content['data'])})
            self.data += content['data']
            path = self.getpath(content, 'next')
            if not path:
                LOG.debug('Hook done on %(text)s: there '
                          'is no next path available',
                          {'text': text})
                return response
            LOG.debug('Perform next session request %(method)s %(path)s',
                      {'method': method, 'path': path})
            return self.request(method, path)
        LOG.debug('Hook done on %(text)s and '
                  'returned original response',
                  {'text': text})
        return response

    def auth(self):
        """Log in with the proxy credentials and store the new token.

        The current bearer header is removed first, then the
        credentials are posted to ``auth/login``.

        :raises NefException: with code ``ENODATA`` when the response
                              content carries no ``token``
        """
        method = 'post'
        path = 'auth/login'
        payload = {'username': self.proxy.username,
                   'password': self.proxy.password}
        data = json.dumps(payload)
        kwargs = {'data': data}
        self.proxy.delete_bearer()
        response = self.request(method, path, **kwargs)
        content = json.loads(response.content)
        if not (isinstance(content, dict) and 'token' in content):
            message = (_('There is no authentication token available '
                         'for authentication request %(method)s %(url)s '
                         '%(body)s and response %(code)s %(content)s')
                       % {'method': response.request.method,
                          'url': response.request.url,
                          'body': response.request.body,
                          'code': response.status_code,
                          'content': response.content})
            raise NefException(code='ENODATA', message=message)
        token = content['token']
        self.proxy.update_token(token)

    def failover(self):
        """Try to switch the proxy to another host serving the root path.

        Every configured host except the current one is selected on the
        proxy and probed with a GET of ``proxy.root``. The first host
        answering with an OK status is kept and the proxy lock is
        refreshed.

        :returns: ``True`` when a suitable host was found, else ``False``
        """
        result = False
        self.lock = True
        method = 'get'
        host = self.proxy.host
        root = self.proxy.root
        for item in self.proxy.hosts:
            if item == host:
                continue
            self.proxy.update_host(item)
            LOG.debug('Try to failover path '
                      '%(root)s to host %(host)s',
                      {'root': root, 'host': item})
            try:
                response = self.request(method, root)
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout) as error:
                LOG.debug('Skip unavailable host %(host)s '
                          'due to error: %(error)s',
                          {'host': item, 'error': error})
                continue
            LOG.debug('Failover result: %(code)s %(content)s',
                      {'code': response.status_code,
                       'content': response.content})
            # pylint: disable=no-member
            if response.status_code == requests.codes.ok:
                LOG.debug('Successful failover path '
                          '%(root)s to host %(host)s',
                          {'root': root, 'host': item})
                self.proxy.update_lock()
                result = True
                break
            else:
                LOG.debug('Skip unsuitable host %(host)s: '
                          'there is no %(root)s path found',
                          {'host': item, 'root': root})
        self.lock = False
        return result

    @staticmethod
    def getpath(content, name):
        """Return the ``href`` of the link whose ``rel`` equals ``name``.

        :param content: decoded JSON content; only a dict with a
                        ``links`` list is inspected
        :param name: relation name to look for
        :returns: the matching ``href`` or ``None`` when nothing matches
        """
        if isinstance(content, dict) and 'links' in content:
            for link in content['links']:
                if not isinstance(link, dict):
                    continue
                if 'rel' in link and 'href' in link:
                    if link['rel'] == name:
                        return link['href']
        return None
class NefCollections(object):
    """Generic accessor for a NEF REST collection.

    Subclasses override ``subj`` (name used in log messages) and
    ``root`` (collection path). All requests go through ``proxy``.
    """

    subj = 'collection'
    root = '/collections'

    def __init__(self, proxy):
        self.proxy = proxy

    def path(self, name):
        """Return the collection path of item ``name`` (URL-quoted)."""
        return posixpath.join(self.root, urlparse.quote_plus(name))

    def get(self, name, payload=None):
        """GET the item ``name`` and return the proxy result."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Get properties of %(subj)s %(name)s: %(payload)s',
                  log_args)
        return self.proxy.get(self.path(name), payload)

    def set(self, name, payload=None):
        """PUT ``payload`` to the item ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Modify properties of %(subj)s %(name)s: %(payload)s',
                  log_args)
        return self.proxy.put(self.path(name), payload)

    def list(self, payload=None):
        """GET the collection root and return the proxy result."""
        log_args = {'subj': self.subj, 'payload': payload}
        LOG.debug('List of %(subj)ss: %(payload)s', log_args)
        return self.proxy.get(self.root, payload)

    def create(self, payload=None):
        """POST ``payload`` to the collection root.

        A ``NefException`` with code ``EEXIST`` is swallowed and
        ``None`` is returned; any other ``NefException`` propagates.
        """
        log_args = {'subj': self.subj, 'payload': payload}
        LOG.debug('Create %(subj)s: %(payload)s', log_args)
        try:
            return self.proxy.post(self.root, payload)
        except NefException as error:
            if error.code == 'EEXIST':
                return None
            raise

    def delete(self, name, payload=None):
        """DELETE the item ``name``.

        A ``NefException`` with code ``ENOENT`` is swallowed and
        ``None`` is returned; any other ``NefException`` propagates.
        """
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Delete %(subj)s %(name)s: %(payload)s', log_args)
        item_path = self.path(name)
        try:
            return self.proxy.delete(item_path, payload)
        except NefException as error:
            if error.code == 'ENOENT':
                return None
            raise
class NefSettings(NefCollections):
    """Accessor for the ``/settings/properties`` collection.

    Creating and deleting are not provided for this collection: both
    methods return ``NotImplemented`` without issuing a request.
    """

    subj = 'setting'
    root = '/settings/properties'

    def create(self, payload=None):
        """Return ``NotImplemented``; no request is sent."""
        return NotImplemented

    def delete(self, name, payload=None):
        """Return ``NotImplemented``; no request is sent."""
        return NotImplemented
class NefDatasets(NefCollections):
    """Accessor for the ``/storage/datasets`` collection."""

    subj = 'dataset'
    root = '/storage/datasets'

    def rename(self, name, payload=None):
        """POST ``payload`` to the ``rename`` action of item ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Rename %(subj)s %(name)s: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'rename')
        return self.proxy.post(action_path, payload)
class NefSnapshots(NefDatasets, NefCollections):
    """Accessor for the ``/storage/snapshots`` collection."""

    subj = 'snapshot'
    root = '/storage/snapshots'

    def clone(self, name, payload=None):
        """POST ``payload`` to the ``clone`` action of item ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Clone %(subj)s %(name)s: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'clone')
        return self.proxy.post(action_path, payload)
class NefFilesystems(NefDatasets, NefCollections):
    """Accessor for the ``/storage/filesystems`` collection.

    Each action method logs the call and POSTs ``payload`` to the
    sub-path of the item named after the action.
    """

    subj = 'filesystem'
    root = '/storage/filesystems'

    def rollback(self, name, payload=None):
        """POST ``payload`` to the ``rollback`` action of ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Rollback %(subj)s %(name)s: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'rollback')
        return self.proxy.post(action_path, payload)

    def mount(self, name, payload=None):
        """POST ``payload`` to the ``mount`` action of ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Mount %(subj)s %(name)s: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'mount')
        return self.proxy.post(action_path, payload)

    def unmount(self, name, payload=None):
        """POST ``payload`` to the ``unmount`` action of ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Unmount %(subj)s %(name)s: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'unmount')
        return self.proxy.post(action_path, payload)

    def acl(self, name, payload=None):
        """POST ``payload`` to the ``acl`` action of ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Set %(subj)s %(name)s ACL: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'acl')
        return self.proxy.post(action_path, payload)

    def promote(self, name, payload=None):
        """POST ``payload`` to the ``promote`` action of ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Promote %(subj)s %(name)s: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'promote')
        return self.proxy.post(action_path, payload)
class NefHpr(NefCollections):
    """Accessor for the ``/hpr`` collection."""

    subj = 'HPR service'
    root = '/hpr'

    def activate(self, payload=None):
        """POST ``payload`` to the ``activate`` path under the root."""
        LOG.debug('Activate %(payload)s', {'payload': payload})
        action_path = posixpath.join(self.root, 'activate')
        return self.proxy.post(action_path, payload)

    def start(self, name, payload=None):
        """POST ``payload`` to the ``start`` action of item ``name``."""
        log_args = {'subj': self.subj, 'name': name, 'payload': payload}
        LOG.debug('Start %(subj)s %(name)s: %(payload)s', log_args)
        action_path = posixpath.join(self.path(name), 'start')
        return self.proxy.post(action_path, payload)
class NefServices(NefCollections):
    """Accessor for the ``/services`` collection."""

    subj = 'service'
    root = '/services'
class NefNfs(NefCollections):
    """Accessor for the ``/nas/nfs`` collection."""

    subj = 'NFS'
    root = '/nas/nfs'
class NefNetAddresses(NefCollections):
    """Accessor for the ``/network/addresses`` collection."""

    subj = 'network address'
    root = '/network/addresses'
class NefProxy(object):
    """Entry point of the NEF REST client.

    Holds the HTTP session, the list of candidate hosts, per-host
    authentication tokens and one accessor object per collection.
    Any other public attribute (for example ``proxy.get``) resolves to
    a new ``NefRequest`` bound to this proxy with the attribute name as
    the HTTP method.
    """

    def __init__(self, proto, path, conf):
        """Configure the session from ``conf`` and compute the lock.

        :param proto: stored as ``self.proto``
        :param path: filesystem path; used for ``self.root`` and for
                     the lock value
        :param conf: configuration object providing the ``nexenta_*``
                     options read below
        """
        self.session = requests.Session()
        self.settings = NefSettings(self)
        self.filesystems = NefFilesystems(self)
        self.snapshots = NefSnapshots(self)
        self.services = NefServices(self)
        self.hpr = NefHpr(self)
        self.nfs = NefNfs(self)
        self.netaddrs = NefNetAddresses(self)
        self.proto = proto
        self.path = path
        self.lock = None
        # Authentication tokens keyed by host.
        self.tokens = {}
        self.headers = {
            'Content-Type': 'application/json',
            'X-XSS-Protection': '1'
        }
        if conf.nexenta_use_https:
            self.scheme = 'https'
        else:
            self.scheme = 'http'
        self.username = conf.nexenta_user
        self.password = conf.nexenta_password
        self.hosts = []
        if conf.nexenta_rest_addresses:
            for host in conf.nexenta_rest_addresses:
                self.hosts.append(host.strip())
        self.root = self.filesystems.path(path)
        if not self.hosts:
            # No REST addresses configured: fall back to the NAS host.
            self.hosts.append(conf.nexenta_nas_host)
        self.host = self.hosts[0]
        if conf.nexenta_rest_port:
            self.port = conf.nexenta_rest_port
        else:
            if conf.nexenta_use_https:
                self.port = 8443
            else:
                self.port = 8080
        self.backoff_factor = conf.nexenta_rest_backoff_factor
        self.retries = len(self.hosts) * conf.nexenta_rest_retry_count
        # (connect timeout, read timeout) tuple passed to requests.
        self.timeout = (
            conf.nexenta_rest_connect_timeout, conf.nexenta_rest_read_timeout)
        # pylint: disable=no-member
        max_retries = requests.packages.urllib3.util.retry.Retry(
            total=conf.nexenta_rest_retry_count,
            backoff_factor=conf.nexenta_rest_backoff_factor)
        adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
        self.session.verify = conf.nexenta_ssl_cert_verify
        self.session.headers.update(self.headers)
        self.session.mount('%s://' % self.scheme, adapter)
        if not conf.nexenta_ssl_cert_verify:
            requests.packages.urllib3.disable_warnings()
        self.update_lock()

    def __getattr__(self, name):
        """Return a ``NefRequest`` for ``name`` used as the HTTP method.

        Only called for attributes that are not found normally. Dunder
        names are rejected with ``AttributeError`` so that protocol
        probes such as ``hasattr(obj, '__setstate__')`` (issued by
        ``copy`` and ``pickle`` on instances created without
        ``__init__``) fail cleanly instead of building a request, which
        would read ``self.timeout`` and recurse back into this method.
        """
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return NefRequest(self, name)

    def delete_bearer(self):
        """Remove the ``Authorization`` session header if present."""
        if 'Authorization' in self.session.headers:
            del self.session.headers['Authorization']

    def update_bearer(self, token):
        """Set the ``Authorization`` session header to ``token``."""
        bearer = 'Bearer %s' % token
        self.session.headers['Authorization'] = bearer

    def update_token(self, token):
        """Remember ``token`` for the current host and start using it."""
        self.tokens[self.host] = token
        self.update_bearer(token)

    def update_host(self, host):
        """Switch to ``host``, reusing its stored token when known."""
        self.host = host
        if host in self.tokens:
            token = self.tokens[host]
            self.update_bearer(token)

    def update_lock(self):
        """Recompute ``self.lock`` from the system GUID and the path.

        The lock is the MD5 hex digest of ``'<guid>:<path>'`` where the
        GUID is the ``value`` of the ``system.guid`` setting.
        """
        prop = self.settings.get('system.guid')
        guid = prop.get('value')
        path = '%s:%s' % (guid, self.path)
        # '%'-formatting always yields str here, so encode directly.
        self.lock = hashlib.md5(path.encode('utf-8')).hexdigest()  # nosec B324

    def url(self, path):
        """Return the full URL of ``path`` on the current host."""
        netloc = '%s:%d' % (self.host, int(self.port))
        components = (self.scheme, netloc, str(path), None, None)
        url = urlparse.urlunsplit(components)
        return url

    def delay(self, attempt):
        """Sleep for ``int(backoff_factor * 2 ** (attempt - 1))`` seconds.

        :param attempt: 1-based attempt counter
        """
        interval = int(self.backoff_factor * (2 ** (attempt - 1)))
        LOG.debug('Waiting for %(interval)s seconds',
                  {'interval': interval})
        time.sleep(interval)