Coverage for manila/share/drivers/nexenta/ns5/jsonrpc.py: 99%

374 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2019 Nexenta by DDN, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import hashlib 

17import json 

18import posixpath 

19import time 

20from urllib import parse as urlparse 

21 

22from oslo_log import log as logging 

23import requests 

24 

25from manila import exception 

26from manila.i18n import _ 

27 

28LOG = logging.getLogger(__name__) 

29 

30 

class NefException(exception.ManilaException):
    """Manila exception carrying NEF error details.

    ``data`` may be a dict (its 'name', 'code', 'source' and 'message'
    members are used unless overridden by keyword arguments) or a string
    (used as the message unless ``message`` is passed explicitly).  Any
    field still missing falls back to a built-in default.  The resulting
    error code is also exposed as ``self.code``.
    """

    def __init__(self, data=None, **kwargs):
        fallback = {
            'name': 'NexentaError',
            'code': 'EBADMSG',
            'source': 'ManilaDriver',
            'message': 'Unknown error'
        }
        if isinstance(data, dict):
            # Explicit keyword arguments win over the dict, which in turn
            # wins over the built-in fallback values.
            for field, value in fallback.items():
                if field not in kwargs:
                    kwargs[field] = data.get(field, value)
        elif isinstance(data, str):
            kwargs.setdefault('message', data)
        # Fill whatever is still missing (e.g. when data is None).
        for field, value in fallback.items():
            kwargs.setdefault(field, value)
        template = _('%(message)s (source: %(source)s, '
                     'name: %(name)s, code: %(code)s)')
        message = template % kwargs
        self.code = kwargs['code']
        # 'message' is passed positionally below, so it must not be
        # repeated among the keyword arguments.
        kwargs.pop('message')
        super(NefException, self).__init__(message, **kwargs)

59 

60 

class NefRequest(object):
    """One NEF REST API call bound to a proxy and an HTTP method.

    An instance is callable: ``NefRequest(proxy, 'get')(path, payload)``.
    Every session request is sent with :meth:`hook` registered as the
    ``response`` hook; the hook keeps per-status statistics and performs
    re-authentication, failover, busy-retry, job monitoring and
    pagination by issuing follow-up requests.
    """

    def __init__(self, proxy, method):
        self.proxy = proxy
        self.method = method
        self.path = None
        # True while failover() is probing hosts; hook() checks it so a
        # 404 received during probing does not start a nested failover.
        self.lock = False
        # Sum of response.elapsed over all responses seen by hook().
        self.time = 0
        # Items accumulated from the 'data' member of 200 responses.
        self.data = []
        self.payload = {}
        # Number of responses seen, keyed by HTTP status code.
        self.stat = {}
        self.hooks = {
            'response': self.hook
        }
        # Extra keyword arguments added to every session request.
        self.kwargs = {
            'hooks': self.hooks,
            'timeout': self.proxy.timeout
        }

    def __call__(self, path, payload=None):
        """Perform the request and return the decoded result.

        :param path: NEF API path; must be non-empty.
        :param payload: optional dict; sent as query parameters for
                        get/delete and as a JSON body for put/post.
        :returns: None for a successful response without content, the
                  accumulated ``self.data`` list when the decoded content
                  is a dict with a 'data' member, otherwise the decoded
                  content.
        :raises NefException: on an unsupported method, empty path,
                              non-dict payload or an unsuccessful
                              response.
        """
        LOG.debug('NEF request start: %(method)s %(path)s %(payload)s',
                  {'method': self.method, 'path': path, 'payload': payload})
        if self.method not in ['get', 'delete', 'put', 'post']:
            # The message must be %-formatted; pairing the template and
            # the mapping in a tuple would pass a tuple as the message.
            message = (_('NEF API does not support %(method)s method')
                       % {'method': self.method})
            raise NefException(code='EINVAL', message=message)
        if not path:
            message = (_('NEF API call requires collection path'))
            raise NefException(code='EINVAL', message=message)
        self.path = path
        if payload:
            if not isinstance(payload, dict):
                message = (_('NEF API call payload must be a dictionary'))
                raise NefException(code='EINVAL', message=message)
            if self.method in ['get', 'delete']:
                self.payload = {'params': payload}
            else:
                # Only 'put' and 'post' remain after the check above.
                self.payload = {'data': json.dumps(payload)}
        try:
            response = self.request(self.method, self.path, **self.payload)
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout) as error:
            LOG.debug('Failed to %(method)s %(path)s %(payload)s: %(error)s',
                      {'method': self.method, 'path': self.path,
                       'payload': self.payload, 'error': error})
            if not self.failover():
                raise
            LOG.debug('Retry initial request after failover: '
                      '%(method)s %(path)s %(payload)s',
                      {'method': self.method,
                       'path': self.path,
                       'payload': self.payload})
            response = self.request(self.method, self.path, **self.payload)
        LOG.debug('NEF request done: %(method)s %(path)s %(payload)s, '
                  'total response time: %(time)s seconds, '
                  'total requests count: %(count)s, '
                  'requests statistics: %(stat)s',
                  {'method': self.method,
                   'path': self.path,
                   'payload': self.payload,
                   'time': self.time,
                   'count': sum(self.stat.values()),
                   'stat': self.stat})
        if response.ok and not response.content:
            return None
        content = json.loads(response.content)
        if not response.ok:
            raise NefException(content)
        if isinstance(content, dict) and 'data' in content:
            # Return everything hook() collected, not just the last page.
            return self.data
        return content

    def request(self, method, path, **kwargs):
        """Send one session request to ``path`` on the current host.

        The hook and timeout settings from ``self.kwargs`` are merged
        into ``kwargs`` before the call.
        """
        url = self.proxy.url(path)
        LOG.debug('Perform session request: %(method)s %(url)s %(body)s',
                  {'method': method, 'url': url, 'body': kwargs})
        kwargs.update(self.kwargs)
        return self.proxy.session.request(method, url, **kwargs)

    def hook(self, response, **kwargs):
        """Response hook: record statistics and follow up when needed.

        Handling by status code:

        * 401 - re-authenticate and resend the same prepared request;
        * 404 - fail over to another host and retry the initial request,
          unless a failover is already in progress;
        * 500 with code 'EBUSY' - wait and retry the initial request;
        * 202 - wait and GET the 'monitor' link;
        * 200 - collect 'data' items and GET the 'next' link, if any.

        The 401, 404 and 500 retries are limited by ``proxy.retries``.

        :returns: the original response, or the response of the
                  follow-up request.
        :raises NefException: on empty or undecodable error content,
                              exhausted retries, a non-busy server error
                              or a 202 without a monitor link.
        """
        initial_text = (_('initial request %(method)s %(path)s %(body)s')
                        % {'method': self.method,
                           'path': self.path,
                           'body': self.payload})
        request_text = (_('session request %(method)s %(url)s %(body)s')
                        % {'method': response.request.method,
                           'url': response.request.url,
                           'body': response.request.body})
        response_text = (_('session response %(code)s %(content)s')
                         % {'code': response.status_code,
                            'content': response.content})
        text = (_('%(request_text)s and %(response_text)s')
                % {'request_text': request_text,
                   'response_text': response_text})
        LOG.debug('Hook start on %(text)s', {'text': text})

        if response.status_code not in self.stat:
            self.stat[response.status_code] = 0
        self.stat[response.status_code] += 1
        self.time += response.elapsed.total_seconds()

        if response.ok and not response.content:
            LOG.debug('Hook done on %(text)s: '
                      'empty response content',
                      {'text': text})
            return response

        if not response.content:
            message = (_('There is no response content '
                         'is available for %(text)s')
                       % {'text': text})
            raise NefException(code='ENODATA', message=message)

        try:
            content = json.loads(response.content)
        except (TypeError, ValueError) as error:
            message = (_('Failed to decode JSON for %(text)s: %(error)s')
                       % {'text': text, 'error': error})
            raise NefException(code='ENOMSG', message=message)

        # Monitor and pagination links are always fetched with GET.
        method = 'get'
        # pylint: disable=no-member
        if response.status_code == requests.codes.unauthorized:
            if self.stat[response.status_code] > self.proxy.retries:
                raise NefException(content)
            self.auth()
            # Resend a copy of the failed request carrying the session
            # headers as updated by auth().
            request = response.request.copy()
            request.headers.update(self.proxy.session.headers)
            LOG.debug('Retry last %(text)s after authentication',
                      {'text': request_text})
            return self.proxy.session.send(request, **kwargs)
        elif response.status_code == requests.codes.not_found:
            if self.lock:
                LOG.debug('Hook done on %(text)s: '
                          'nested failover is detected',
                          {'text': text})
                return response
            if self.stat[response.status_code] > self.proxy.retries:
                raise NefException(content)
            if not self.failover():
                LOG.debug('Hook done on %(text)s: '
                          'no valid hosts found',
                          {'text': text})
                return response
            LOG.debug('Retry %(text)s after failover',
                      {'text': initial_text})
            return self.request(self.method, self.path, **self.payload)
        elif response.status_code == requests.codes.server_error:
            # Only a busy backend is worth retrying.
            if not (isinstance(content, dict) and
                    'code' in content and
                    content['code'] == 'EBUSY'):
                raise NefException(content)
            if self.stat[response.status_code] > self.proxy.retries:
                raise NefException(content)
            self.proxy.delay(self.stat[response.status_code])
            LOG.debug('Retry %(text)s after delay',
                      {'text': initial_text})
            return self.request(self.method, self.path, **self.payload)
        elif response.status_code == requests.codes.accepted:
            path = self.getpath(content, 'monitor')
            if not path:
                message = (_('There is no monitor path '
                             'available for %(text)s')
                           % {'text': text})
                raise NefException(code='ENOMSG', message=message)
            self.proxy.delay(self.stat[response.status_code])
            return self.request(method, path)
        elif response.status_code == requests.codes.ok:
            if not (isinstance(content, dict) and 'data' in content):
                LOG.debug('Hook done on %(text)s: there '
                          'is no JSON data available',
                          {'text': text})
                return response
            LOG.debug('Append %(count)s data items to response',
                      {'count': len(content['data'])})
            self.data += content['data']
            path = self.getpath(content, 'next')
            if not path:
                LOG.debug('Hook done on %(text)s: there '
                          'is no next path available',
                          {'text': text})
                return response
            LOG.debug('Perform next session request %(method)s %(path)s',
                      {'method': method, 'path': path})
            return self.request(method, path)
        LOG.debug('Hook done on %(text)s and '
                  'returned original response',
                  {'text': text})
        return response

    def auth(self):
        """Log in with the proxy credentials and store the new token.

        :raises NefException: when the login response is not a dict
                              containing 'token'.
        """
        method = 'post'
        path = 'auth/login'
        payload = {'username': self.proxy.username,
                   'password': self.proxy.password}
        data = json.dumps(payload)
        kwargs = {'data': data}
        # Drop the current Authorization header before logging in.
        self.proxy.delete_bearer()
        response = self.request(method, path, **kwargs)
        content = json.loads(response.content)
        if not (isinstance(content, dict) and 'token' in content):
            message = (_('There is no authentication token available '
                         'for authentication request %(method)s %(url)s '
                         '%(body)s and response %(code)s %(content)s')
                       % {'method': response.request.method,
                          'url': response.request.url,
                          'body': response.request.body,
                          'code': response.status_code,
                          'content': response.content})
            raise NefException(code='ENODATA', message=message)
        token = content['token']
        self.proxy.update_token(token)

    def failover(self):
        """Switch the proxy to another host that serves ``proxy.root``.

        Every configured host except the current one is probed with a
        GET of ``proxy.root``; the first host answering 200 is kept and
        the proxy lock is refreshed.

        :returns: True when a suitable host was found, False otherwise.
        """
        result = False
        self.lock = True
        method = 'get'
        host = self.proxy.host
        root = self.proxy.root
        for item in self.proxy.hosts:
            if item == host:
                continue
            self.proxy.update_host(item)
            LOG.debug('Try to failover path '
                      '%(root)s to host %(host)s',
                      {'root': root, 'host': item})
            try:
                response = self.request(method, root)
            except (requests.exceptions.ConnectionError,
                    requests.exceptions.Timeout) as error:
                LOG.debug('Skip unavailable host %(host)s '
                          'due to error: %(error)s',
                          {'host': item, 'error': error})
                continue
            LOG.debug('Failover result: %(code)s %(content)s',
                      {'code': response.status_code,
                       'content': response.content})
            # pylint: disable=no-member
            if response.status_code == requests.codes.ok:
                LOG.debug('Successful failover path '
                          '%(root)s to host %(host)s',
                          {'root': root, 'host': item})
                self.proxy.update_lock()
                result = True
                break
            else:
                LOG.debug('Skip unsuitable host %(host)s: '
                          'there is no %(root)s path found',
                          {'host': item, 'root': root})
        self.lock = False
        return result

    @staticmethod
    def getpath(content, name):
        """Return the 'href' of the first link whose 'rel' is ``name``.

        :param content: decoded response content; anything that is not a
                        dict with a 'links' member yields None.
        :param name: relation name to look for, e.g. 'next' or 'monitor'.
        :returns: the link target, or None when there is no such link.
        """
        if isinstance(content, dict) and 'links' in content:
            for link in content['links']:
                if not isinstance(link, dict):
                    continue
                if 'rel' in link and 'href' in link:
                    if link['rel'] == name:
                        return link['href']
        return None

322 

323 

class NefCollections(object):
    """Base accessor for a NEF REST collection rooted at ``root``.

    Subclasses override ``subj`` (the name used in log messages) and
    ``root`` (the collection path).  All requests go through ``proxy``.
    """

    subj = 'collection'
    root = '/collections'

    def __init__(self, proxy):
        self.proxy = proxy

    def path(self, name):
        """Return the path of item ``name`` with the name URL-quoted."""
        return posixpath.join(self.root, urlparse.quote_plus(name))

    def get(self, name, payload=None):
        """GET the item ``name``."""
        LOG.debug('Get properties of %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        target = self.path(name)
        return self.proxy.get(target, payload)

    def set(self, name, payload=None):
        """PUT ``payload`` to the item ``name``."""
        LOG.debug('Modify properties of %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        target = self.path(name)
        return self.proxy.put(target, payload)

    def list(self, payload=None):
        """GET the collection root."""
        LOG.debug('List of %(subj)ss: %(payload)s',
                  {'subj': self.subj, 'payload': payload})
        return self.proxy.get(self.root, payload)

    def create(self, payload=None):
        """POST ``payload`` to the collection root.

        A NefException with code 'EEXIST' is suppressed and None is
        returned; any other NefException propagates.
        """
        LOG.debug('Create %(subj)s: %(payload)s',
                  {'subj': self.subj, 'payload': payload})
        try:
            return self.proxy.post(self.root, payload)
        except NefException as error:
            if error.code == 'EEXIST':
                return None
            raise

    def delete(self, name, payload=None):
        """DELETE the item ``name``.

        A NefException with code 'ENOENT' is suppressed and None is
        returned; any other NefException propagates.
        """
        LOG.debug('Delete %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        target = self.path(name)
        try:
            return self.proxy.delete(target, payload)
        except NefException as error:
            if error.code == 'ENOENT':
                return None
            raise

370 

371 

class NefSettings(NefCollections):
    """Accessor for the '/settings/properties' collection.

    ``create`` and ``delete`` are overridden to return the
    ``NotImplemented`` singleton without sending any request.
    """

    root = '/settings/properties'
    subj = 'setting'

    def create(self, payload=None):
        """Return ``NotImplemented``; no request is sent."""
        return NotImplemented

    def delete(self, name, payload=None):
        """Return ``NotImplemented``; no request is sent."""
        return NotImplemented

381 

382 

class NefDatasets(NefCollections):
    """Accessor for the '/storage/datasets' collection."""

    root = '/storage/datasets'
    subj = 'dataset'

    def rename(self, name, payload=None):
        """POST ``payload`` to the 'rename' sub-path of item ``name``."""
        LOG.debug('Rename %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        target = posixpath.join(self.path(name), 'rename')
        return self.proxy.post(target, payload)

392 

393 

class NefSnapshots(NefDatasets, NefCollections):
    """Accessor for the '/storage/snapshots' collection."""

    root = '/storage/snapshots'
    subj = 'snapshot'

    def clone(self, name, payload=None):
        """POST ``payload`` to the 'clone' sub-path of item ``name``."""
        LOG.debug('Clone %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        target = posixpath.join(self.path(name), 'clone')
        return self.proxy.post(target, payload)

403 

404 

class NefFilesystems(NefDatasets, NefCollections):
    """Accessor for the '/storage/filesystems' collection.

    Each action method logs the call and then POSTs ``payload`` to the
    sub-path of the named item that carries the action's name.
    """

    root = '/storage/filesystems'
    subj = 'filesystem'

    def _post_action(self, name, action, payload):
        # POST payload to <item path>/<action>.
        target = posixpath.join(self.path(name), action)
        return self.proxy.post(target, payload)

    def rollback(self, name, payload=None):
        """POST ``payload`` to the 'rollback' sub-path of ``name``."""
        LOG.debug('Rollback %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        return self._post_action(name, 'rollback', payload)

    def mount(self, name, payload=None):
        """POST ``payload`` to the 'mount' sub-path of ``name``."""
        LOG.debug('Mount %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        return self._post_action(name, 'mount', payload)

    def unmount(self, name, payload=None):
        """POST ``payload`` to the 'unmount' sub-path of ``name``."""
        LOG.debug('Unmount %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        return self._post_action(name, 'unmount', payload)

    def acl(self, name, payload=None):
        """POST ``payload`` to the 'acl' sub-path of ``name``."""
        LOG.debug('Set %(subj)s %(name)s ACL: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        return self._post_action(name, 'acl', payload)

    def promote(self, name, payload=None):
        """POST ``payload`` to the 'promote' sub-path of ``name``."""
        LOG.debug('Promote %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        return self._post_action(name, 'promote', payload)

438 

439 

class NefHpr(NefCollections):
    """Accessor for the '/hpr' collection."""

    root = '/hpr'
    subj = 'HPR service'

    def activate(self, payload=None):
        """POST ``payload`` to the 'activate' path under the root."""
        LOG.debug('Activate %(payload)s',
                  {'payload': payload})
        target = posixpath.join(self.root, 'activate')
        return self.proxy.post(target, payload)

    def start(self, name, payload=None):
        """POST ``payload`` to the 'start' sub-path of item ``name``."""
        LOG.debug('Start %(subj)s %(name)s: %(payload)s',
                  {'subj': self.subj, 'name': name, 'payload': payload})
        target = posixpath.join(self.path(name), 'start')
        return self.proxy.post(target, payload)

455 

456 

class NefServices(NefCollections):
    """Accessor for the '/services' collection."""

    root = '/services'
    subj = 'service'

460 

461 

class NefNfs(NefCollections):
    """Accessor for the '/nas/nfs' collection."""

    root = '/nas/nfs'
    subj = 'NFS'

465 

466 

class NefNetAddresses(NefCollections):
    """Accessor for the '/network/addresses' collection."""

    root = '/network/addresses'
    subj = 'network address'

470 

471 

class NefProxy(object):
    """Session holder and entry point for NEF REST API calls.

    The proxy owns the ``requests`` session, the list of candidate hosts
    and the per-host authentication tokens.  Any attribute that is not
    found normally resolves to a :class:`NefRequest` bound to that name,
    so ``proxy.get(path)``, ``proxy.post(path, payload)`` and so on each
    build a fresh request object.
    """

    def __init__(self, proto, path, conf):
        self.session = requests.Session()
        self.settings = NefSettings(self)
        self.filesystems = NefFilesystems(self)
        self.snapshots = NefSnapshots(self)
        self.services = NefServices(self)
        self.hpr = NefHpr(self)
        self.nfs = NefNfs(self)
        self.netaddrs = NefNetAddresses(self)
        self.proto = proto
        self.path = path
        self.lock = None
        # Authentication tokens keyed by host.
        self.tokens = {}
        self.headers = {
            'Content-Type': 'application/json',
            'X-XSS-Protection': '1'
        }
        self.scheme = 'https' if conf.nexenta_use_https else 'http'
        self.username = conf.nexenta_user
        self.password = conf.nexenta_password
        self.hosts = []
        if conf.nexenta_rest_addresses:
            self.hosts.extend(
                address.strip() for address in conf.nexenta_rest_addresses)
        self.root = self.filesystems.path(path)
        if not self.hosts:
            # No REST addresses configured: use the NAS host instead.
            self.hosts.append(conf.nexenta_nas_host)
        self.host = self.hosts[0]
        if conf.nexenta_rest_port:
            self.port = conf.nexenta_rest_port
        elif conf.nexenta_use_https:
            self.port = 8443
        else:
            self.port = 8080
        self.backoff_factor = conf.nexenta_rest_backoff_factor
        self.retries = len(self.hosts) * conf.nexenta_rest_retry_count
        self.timeout = (conf.nexenta_rest_connect_timeout,
                        conf.nexenta_rest_read_timeout)
        # pylint: disable=no-member
        retry_policy = requests.packages.urllib3.util.retry.Retry(
            total=conf.nexenta_rest_retry_count,
            backoff_factor=conf.nexenta_rest_backoff_factor)
        adapter = requests.adapters.HTTPAdapter(max_retries=retry_policy)
        self.session.verify = conf.nexenta_ssl_cert_verify
        self.session.headers.update(self.headers)
        self.session.mount('%s://' % self.scheme, adapter)
        if not conf.nexenta_ssl_cert_verify:
            requests.packages.urllib3.disable_warnings()
        self.update_lock()

    def __getattr__(self, name):
        # Only reached for attributes not found by normal lookup.
        return NefRequest(self, name)

    def delete_bearer(self):
        """Remove the Authorization header from the session, if set."""
        self.session.headers.pop('Authorization', None)

    def update_bearer(self, token):
        """Set the session Authorization header to a bearer ``token``."""
        self.session.headers['Authorization'] = 'Bearer %s' % token

    def update_token(self, token):
        """Remember ``token`` for the current host and start using it."""
        self.tokens[self.host] = token
        self.update_bearer(token)

    def update_host(self, host):
        """Make ``host`` current, reusing its stored token if any."""
        self.host = host
        if host not in self.tokens:
            return
        self.update_bearer(self.tokens[host])

    def update_lock(self):
        """Recompute ``self.lock`` from the system GUID and the path.

        The lock is the MD5 hex digest of '<guid>:<path>', where the
        GUID is the 'value' of the 'system.guid' setting.
        """
        guid = self.settings.get('system.guid').get('value')
        key = ('%s:%s' % (guid, self.path)).encode('utf-8')
        self.lock = hashlib.md5(key).hexdigest()  # nosec B324

    def url(self, path):
        """Return the full URL of ``path`` on the current host."""
        location = '%s:%d' % (self.host, int(self.port))
        return urlparse.urlunsplit(
            (self.scheme, location, str(path), None, None))

    def delay(self, attempt):
        """Sleep for ``backoff_factor * 2 ** (attempt - 1)`` seconds.

        The interval is truncated to an integer.
        """
        pause = int(self.backoff_factor * (2 ** (attempt - 1)))
        LOG.debug('Waiting for %(interval)s seconds',
                  {'interval': pause})
        time.sleep(pause)