Coverage for manila/share/api.py: 90%

1880 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2010 United States Government as represented by the 

2# Administrator of the National Aeronautics and Space Administration. 

3# All Rights Reserved. 

4# Copyright (c) 2015 Tom Barron. All rights reserved. 

5# Copyright (c) 2015 Mirantis Inc. 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); you may 

8# not use this file except in compliance with the License. You may obtain 

9# a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

16# License for the specific language governing permissions and limitations 

17# under the License. 

18 

19""" 

20Handles all requests relating to shares. 

21""" 

22import functools 

23import json 

24import re 

25 

26from oslo_config import cfg 

27from oslo_log import log 

28from oslo_utils import excutils 

29from oslo_utils import strutils 

30from oslo_utils import timeutils 

31from oslo_utils import uuidutils 

32from webob import exc 

33 

34from manila.api import common as api_common 

35from manila.common import constants 

36from manila import context as manila_context 

37from manila import coordination 

38from manila.data import rpcapi as data_rpcapi 

39from manila.db import base 

40from manila import exception 

41from manila.i18n import _ 

42from manila import policy 

43from manila import quota 

44from manila.scheduler import rpcapi as scheduler_rpcapi 

45from manila.share import access 

46from manila.share import rpcapi as share_rpcapi 

47from manila.share import share_types 

48from manila.share import utils as share_utils 

49from manila import utils 

50 

51share_api_opts = [ 

52 cfg.BoolOpt('use_scheduler_creating_share_from_snapshot', 

53 default=False, 

54 help='If set to False, then share creation from snapshot will ' 

55 'be performed on the same host. ' 

56 'If set to True, then scheduler will be used.' 

57 'When enabling this option make sure that filter ' 

58 'CreateFromSnapshotFilter is enabled and to have hosts ' 

59 'reporting replication_domain option.' 

60 ), 

61 cfg.StrOpt('default_mount_point_prefix', 

62 default='{project_id}_', 

63 help='Default prefix that will be used if none is provided' 

64 'through share_type extra specs. Prefix will only be' 

65 'used if share_type support mount_point_name.'), 

66 cfg.BoolOpt('is_deferred_deletion_enabled', 

67 default=False, 

68 help='Whether to delete shares and share snapshots in a ' 

69 'deferred manner. Setting this option to True will cause ' 

70 'quotas to be released immediately if a deletion request ' 

71 'is accepted. Deletions may eventually fail, and ' 

72 'rectifying them will require manual intervention.'), 

73] 

74 

75CONF = cfg.CONF 

76CONF.register_opts(share_api_opts) 

77 

78LOG = log.getLogger(__name__) 

79GB = 1048576 * 1024 

80QUOTAS = quota.QUOTAS 

81 

82AFFINITY_HINT = 'same_host' 

83ANTI_AFFINITY_HINT = 'different_host' 

84AFFINITY_KEY = "__affinity_same_host" 

85ANTI_AFFINITY_KEY = "__affinity_different_host" 

86 

87 

88def locked_security_service_update_operation(operation): 

89 """Lock decorator for security service operation. 

90 

91 Takes a named lock prior to executing the operation. The lock is named with 

92 the ids of the security services. 

93 """ 

94 

95 def wrapped(*args, **kwargs): 

96 new_id = kwargs.get('new_security_service_id', '') 

97 current_id = kwargs.get('current_security_service_id', '') 

98 

99 @coordination.synchronized( 

100 'locked-security-service-update-operation-%(new)s-%(curr)s' % { 

101 'new': new_id, 

102 'curr': current_id, 

103 }) 

104 def locked_security_service_operation(*_args, **_kwargs): 

105 return operation(*_args, **_kwargs) 

106 return locked_security_service_operation(*args, **kwargs) 

107 

108 return wrapped 

109 

110 

111def locked_share_server_update_allocations_operation(operation): 

112 """Lock decorator for share server update allocations operation. 

113 

114 Takes a named lock prior to executing the operation. The lock is named with 

115 the ids of the share network and the region to be updated. 

116 """ 

117 

118 def wrapped(*args, **kwargs): 

119 az_id = kwargs.get('availability_zone_id') 

120 share_net_id = kwargs.get('share_network_id') 

121 

122 @coordination.synchronized( 

123 'locked-share-server-update-allocations-operation-%(net)s-%(az)s' 

124 % { 

125 'net': share_net_id, 

126 'az': az_id, 

127 }) 

128 def locked_share_server_allocations_operation(*_args, **_kwargs): 

129 return operation(*_args, **_kwargs) 

130 return locked_share_server_allocations_operation(*args, **kwargs) 

131 

132 return wrapped 

133 

134 

135class API(base.Base): 

136 """API for interacting with the share manager.""" 

137 

138 def __init__(self, db_driver=None): 

139 super(API, self).__init__(db_driver) 

140 self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI() 

141 self.share_rpcapi = share_rpcapi.ShareAPI() 

142 self.access_helper = access.ShareInstanceAccess(self.db, None) 

143 coordination.LOCK_COORDINATOR.start() 

144 

145 # pylint: disable = no-self-argument 

146 def prevent_locked_action_on_share(arg): 

147 """Decorator for preventing a locked method from executing on a share. 

148 

149 Add this decorator to any API method which takes a RequestContext 

150 object as a first parameter and a share object as the second 

151 parameter. 

152 

153 Can be used in any of the following forms 

154 @prevent_locked_action_on_share 

155 @prevent_locked_action_on_share('my_action_name') 

156 

157 :param arg: Can either be the function being decorated or a str 

158 containing the 'action' that we need to check resource locks for. 

159 If no action name is provided, the function name is assumed to be 

160 the action name. 

161 """ 

162 action_name = None 

163 

164 def check_for_locks(f): 

165 @functools.wraps(f) 

166 def wrapper(self, context, share, *args, **kwargs): 

167 action = action_name or f.__name__ 

168 resource_locks, __ = ( 

169 self.db.resource_lock_get_all( 

170 context.elevated(), 

171 filters={'resource_id': share['id'], 

172 'resource_action': action, 

173 'all_projects': True}, 

174 ) 

175 ) 

176 if resource_locks: 

177 msg_payload = { 

178 'locks': ', '.join( 

179 [lock['id'] for lock in resource_locks] 

180 ), 

181 'action': action, 

182 } 

183 msg = (f"Resource lock/s [{msg_payload['locks']}] " 

184 f"prevent {action} action.") 

185 raise exception.InvalidShare(msg) 

186 return f(self, context, share, *args, **kwargs) 

187 return wrapper 

188 

189 if callable(arg): 

190 return check_for_locks(arg) 

191 else: 

192 action_name = arg 

193 return check_for_locks 

194 

195 def _get_all_availability_zones_with_subnets(self, context, 

196 share_network_id): 

197 compatible_azs_name = [] 

198 compatible_azs_multiple = {} 

199 for az in self.db.availability_zone_get_all(context): 

200 subnets = ( 

201 self.db.share_network_subnets_get_all_by_availability_zone_id( 

202 context, share_network_id=share_network_id, 

203 availability_zone_id=az['id'])) 

204 if subnets: 

205 compatible_azs_multiple[az['id']] = len(subnets) > 1 

206 compatible_azs_name.append(az['name']) 

207 return compatible_azs_name, compatible_azs_multiple 

208 

209 @staticmethod 

210 def check_if_encryption_keys_quotas_exceeded(context, quota_exception): 

211 overs = quota_exception.kwargs['overs'] 

212 usages = quota_exception.kwargs['usages'] 

213 quotas = quota_exception.kwargs['quotas'] 

214 

215 def _consumed(name): 

216 return (usages[name]['reserved'] + usages[name]['in_use']) 

217 

218 if 'encryption_keys' in overs: 

219 LOG.warning("Encryption keys quota exceeded for %(s_pid)s " 

220 "(%(d_consumed)d of %(d_quota)d keys already " 

221 "consumed).", { 

222 's_pid': context.project_id, 

223 'd_consumed': _consumed('encryption_keys'), 

224 'd_quota': quotas['encryption_keys']}) 

225 raise exception.EncryptionKeysLimitExceeded() 

226 

227 @staticmethod 

228 def check_if_share_quotas_exceeded(context, quota_exception, 

229 share_size, operation='create'): 

230 overs = quota_exception.kwargs['overs'] 

231 usages = quota_exception.kwargs['usages'] 

232 quotas = quota_exception.kwargs['quotas'] 

233 

234 def _consumed(name): 

235 return (usages[name]['reserved'] + usages[name]['in_use']) 

236 

237 if 'gigabytes' in overs: 

238 LOG.warning("Quota exceeded for %(s_pid)s, " 

239 "tried to %(operation)s " 

240 "%(s_size)sG share (%(d_consumed)dG of " 

241 "%(d_quota)dG already consumed).", { 

242 's_pid': context.project_id, 

243 's_size': share_size, 

244 'd_consumed': _consumed('gigabytes'), 

245 'd_quota': quotas['gigabytes'], 

246 'operation': operation}) 

247 raise exception.ShareSizeExceedsAvailableQuota() 

248 elif 'shares' in overs: 

249 LOG.warning("Quota exceeded for %(s_pid)s, " 

250 "tried to %(operation)s " 

251 "share (%(d_consumed)d shares " 

252 "already consumed).", { 

253 's_pid': context.project_id, 

254 'd_consumed': _consumed('shares'), 

255 'operation': operation}) 

256 raise exception.ShareLimitExceeded(allowed=quotas['shares']) 

257 

258 @staticmethod 

259 def check_if_replica_quotas_exceeded(context, quota_exception, 

260 replica_size, 

261 resource_type='share_replica'): 

262 overs = quota_exception.kwargs['overs'] 

263 usages = quota_exception.kwargs['usages'] 

264 quotas = quota_exception.kwargs['quotas'] 

265 

266 def _consumed(name): 

267 return (usages[name]['reserved'] + usages[name]['in_use']) 

268 

269 if 'share_replicas' in overs: 

270 LOG.warning("Quota exceeded for %(s_pid)s, " 

271 "unable to create share-replica (%(d_consumed)d " 

272 "of %(d_quota)d already consumed).", { 

273 's_pid': context.project_id, 

274 'd_consumed': _consumed('share_replicas'), 

275 'd_quota': quotas['share_replicas']}) 

276 exception_kwargs = {} 

277 if resource_type != 'share_replica': 

278 msg = _("Failed while creating a share with replication " 

279 "support. Maximum number of allowed share-replicas " 

280 "is exceeded.") 

281 exception_kwargs['message'] = msg 

282 raise exception.ShareReplicasLimitExceeded(**exception_kwargs) 

283 elif 'replica_gigabytes' in overs: 283 ↛ exitline 283 didn't return from function 'check_if_replica_quotas_exceeded' because the condition on line 283 was always true

284 LOG.warning("Quota exceeded for %(s_pid)s, " 

285 "unable to create a share replica size of " 

286 "%(s_size)sG (%(d_consumed)dG of " 

287 "%(d_quota)dG already consumed).", { 

288 's_pid': context.project_id, 

289 's_size': replica_size, 

290 'd_consumed': _consumed('replica_gigabytes'), 

291 'd_quota': quotas['replica_gigabytes']}) 

292 exception_kwargs = {} 

293 if resource_type != 'share_replica': 

294 msg = _("Failed while creating a share with replication " 

295 "support. Requested share replica exceeds allowed " 

296 "project/user or share type gigabytes quota.") 

297 exception_kwargs['message'] = msg 

298 raise exception.ShareReplicaSizeExceedsAvailableQuota( 

299 **exception_kwargs) 

300 

301 def create(self, context, share_proto, size, name, description, 

302 snapshot_id=None, availability_zone=None, metadata=None, 

303 share_network_id=None, share_type=None, is_public=False, 

304 share_group_id=None, share_group_snapshot_member=None, 

305 availability_zones=None, scheduler_hints=None, 

306 az_request_multiple_subnet_support_map=None, 

307 mount_point_name=None, encryption_key_ref=None): 

308 """Create new share.""" 

309 

310 api_common.check_metadata_properties(metadata) 

311 

312 if snapshot_id is not None: 

313 snapshot = self.get_snapshot(context, snapshot_id) 

314 if snapshot['aggregate_status'] != constants.STATUS_AVAILABLE: 

315 msg = _("status must be '%s'") % constants.STATUS_AVAILABLE 

316 raise exception.InvalidShareSnapshot(reason=msg) 

317 if not size: 

318 size = snapshot['size'] 

319 else: 

320 snapshot = None 

321 

322 if not strutils.is_int_like(size) or int(size) <= 0: 

323 msg = (_("Share size '%s' must be an integer and greater than 0") 

324 % size) 

325 raise exception.InvalidInput(reason=msg) 

326 

327 # make sure size has been convert to int. 

328 size = int(size) 

329 if snapshot and size < snapshot['size']: 

330 msg = (_("Share size '%s' must be equal or greater " 

331 "than snapshot size") % size) 

332 raise exception.InvalidInput(reason=msg) 

333 

334 # ensure we pass the share_type provisioning filter on size 

335 share_types.provision_filter_on_size(context, share_type, size) 

336 

337 if snapshot is None: 

338 share_type_id = share_type['id'] if share_type else None 

339 else: 

340 source_share = self.db.share_get(context, snapshot['share_id']) 

341 source_share_az = source_share['instance']['availability_zone'] 

342 if availability_zone is None: 

343 availability_zone = source_share_az 

344 elif (availability_zone != source_share_az 

345 and not CONF.use_scheduler_creating_share_from_snapshot): 

346 LOG.error("The specified availability zone must be the same " 

347 "as parent share when you have the configuration " 

348 "option 'use_scheduler_creating_share_from_snapshot'" 

349 " set to False.") 

350 msg = _("The specified availability zone must be the same " 

351 "as the parent share when creating from snapshot.") 

352 raise exception.InvalidInput(reason=msg) 

353 if share_type is None: 

354 # Grab the source share's share_type if no new share type 

355 # has been provided. 

356 share_type_id = source_share['instance']['share_type_id'] 

357 share_type = share_types.get_share_type(context, share_type_id) 

358 else: 

359 share_type_id = share_type['id'] 

360 if share_type_id != source_share['instance']['share_type_id']: 360 ↛ 369line 360 didn't jump to line 369 because the condition on line 360 was always true

361 msg = _("Invalid share type specified: the requested " 

362 "share type must match the type of the source " 

363 "share. If a share type is not specified when " 

364 "requesting a new share from a snapshot, the " 

365 "share type of the source share will be applied " 

366 "to the new share.") 

367 raise exception.InvalidInput(reason=msg) 

368 

369 supported_share_protocols = ( 

370 proto.upper() for proto in CONF.enabled_share_protocols) 

371 if not (share_proto and 

372 share_proto.upper() in supported_share_protocols): 

373 msg = (_("Invalid share protocol provided: %(provided)s. " 

374 "It is either disabled or unsupported. Available " 

375 "protocols: %(supported)s") % dict( 

376 provided=share_proto, 

377 supported=CONF.enabled_share_protocols)) 

378 raise exception.InvalidInput(reason=msg) 

379 

380 self.check_is_share_size_within_per_share_quota_limit(context, size) 

381 

382 deltas = {'shares': 1, 'gigabytes': size} 

383 share_type_attributes = self.get_share_attributes_from_share_type( 

384 share_type) 

385 

386 mount_point_name_support = share_type_attributes.get( 

387 constants.ExtraSpecs.MOUNT_POINT_NAME_SUPPORT, None) 

388 if mount_point_name is not None: 

389 if not mount_point_name_support: 389 ↛ 393line 389 didn't jump to line 393 because the condition on line 389 was always true

390 msg = _("Setting a mount point name is not supported" 

391 " by the share type used: %s." % share_type_id) 

392 raise exception.InvalidInput(reason=msg) 

393 mount_point_name = self._prefix_mount_point_name( 

394 share_type, context, mount_point_name 

395 ) 

396 

397 share_type_supports_replication = share_type_attributes.get( 

398 'replication_type', None) 

399 if share_type_supports_replication: 

400 deltas.update( 

401 {'share_replicas': 1, 'replica_gigabytes': size}) 

402 

403 if encryption_key_ref: 

404 # Make sure encryption_key_ref is valid UUID 

405 if not uuidutils.is_uuid_like(encryption_key_ref): 405 ↛ 406line 405 didn't jump to line 406 because the condition on line 405 was never true

406 msg = _('Encryption key ref is not valid UUID') 

407 raise exception.InvalidInput(reason=msg) 

408 

409 # Check if its new encryption_key_ref 

410 filters = { 

411 'encryption_key_ref': encryption_key_ref, 

412 'project_id': context.project_id, 

413 } 

414 is_existing_key = self.db.encryption_keys_get_all( 

415 context, filters=filters) 

416 if not is_existing_key: 416 ↛ 419line 416 didn't jump to line 419 because the condition on line 416 was always true

417 deltas.update({'encryption_keys': 1}) 

418 

419 try: 

420 reservations = QUOTAS.reserve( 

421 context, share_type_id=share_type_id, **deltas) 

422 except exception.OverQuota as e: 

423 self.check_if_share_quotas_exceeded(context, e, size) 

424 self.check_if_encryption_keys_quotas_exceeded(context, e) 

425 if share_type_supports_replication: 425 ↛ 429line 425 didn't jump to line 429 because the condition on line 425 was always true

426 self.check_if_replica_quotas_exceeded(context, e, size, 

427 resource_type='share') 

428 

429 share_group = None 

430 if share_group_id: 

431 try: 

432 share_group = self.db.share_group_get(context, share_group_id) 

433 except exception.NotFound as e: 

434 raise exception.InvalidParameterValue(e.message) 

435 

436 if (not share_group_snapshot_member and 436 ↛ 438line 436 didn't jump to line 438 because the condition on line 436 was never true

437 not (share_group['status'] == constants.STATUS_AVAILABLE)): 

438 params = { 

439 'avail': constants.STATUS_AVAILABLE, 

440 'status': share_group['status'], 

441 } 

442 msg = _("Share group status must be %(avail)s, got " 

443 "%(status)s.") % params 

444 raise exception.InvalidShareGroup(message=msg) 

445 

446 if share_type_id: 446 ↛ 460line 446 didn't jump to line 460 because the condition on line 446 was always true

447 share_group_st_ids = [ 

448 st['share_type_id'] 

449 for st in share_group.get('share_types', [])] 

450 if share_type_id not in share_group_st_ids: 450 ↛ 451line 450 didn't jump to line 451 because the condition on line 450 was never true

451 params = { 

452 'type': share_type_id, 

453 'group': share_group_id, 

454 } 

455 msg = _("The specified share type (%(type)s) is not " 

456 "supported by the specified share group " 

457 "(%(group)s).") % params 

458 raise exception.InvalidParameterValue(msg) 

459 

460 if not share_group.get('share_network_id') == share_network_id: 460 ↛ 461line 460 didn't jump to line 461 because the condition on line 460 was never true

461 params = { 

462 'net': share_network_id, 

463 'group': share_group_id 

464 } 

465 msg = _("The specified share network (%(net)s) is not " 

466 "supported by the specified share group " 

467 "(%(group)s).") % params 

468 raise exception.InvalidParameterValue(msg) 

469 

470 if share_type: 

471 metadata = self.update_metadata_from_share_type_extra_specs( 

472 context, share_type, metadata) 

473 

474 options = { 

475 'size': size, 

476 'user_id': context.user_id, 

477 'project_id': context.project_id, 

478 'snapshot_id': snapshot_id, 

479 'metadata': metadata, 

480 'display_name': name, 

481 'display_description': description, 

482 'share_proto': share_proto, 

483 'is_public': is_public, 

484 'share_group_id': share_group_id, 

485 } 

486 options.update(share_type_attributes) 

487 

488 if share_group_snapshot_member: 

489 options['source_share_group_snapshot_member_id'] = ( 

490 share_group_snapshot_member['id']) 

491 

492 # NOTE(dviroel): If a target availability zone was not provided, the 

493 # scheduler will receive a list with all availability zones that 

494 # contains a subnet within the selected share network. 

495 if share_network_id and not availability_zone: 

496 compatible_azs_name, compatible_azs_multiple = ( 

497 self._get_all_availability_zones_with_subnets( 

498 context, share_network_id)) 

499 if not availability_zones: 

500 availability_zones = compatible_azs_name 

501 else: 

502 availability_zones = ( 

503 [az for az in availability_zones 

504 if az in compatible_azs_name]) 

505 if not availability_zones: 

506 msg = _( 

507 "The share network is not supported within any requested " 

508 "availability zone. Check the share type's " 

509 "'availability_zones' extra-spec and the availability " 

510 "zones of the share network subnets") 

511 raise exception.InvalidInput(message=msg) 

512 if az_request_multiple_subnet_support_map: 

513 az_request_multiple_subnet_support_map.update( 

514 compatible_azs_multiple) 

515 else: 

516 az_request_multiple_subnet_support_map = ( 

517 compatible_azs_multiple) 

518 

519 share = None 

520 try: 

521 share = self.db.share_create(context, options, 

522 create_share_instance=False) 

523 QUOTAS.commit(context, reservations, share_type_id=share_type_id) 

524 except Exception: 

525 with excutils.save_and_reraise_exception(): 

526 try: 

527 if share: 527 ↛ 530line 527 didn't jump to line 530 because the condition on line 527 was always true

528 self.db.share_delete(context, share['id']) 

529 finally: 

530 QUOTAS.rollback( 

531 context, reservations, share_type_id=share_type_id) 

532 

533 self.save_scheduler_hints(context, share, scheduler_hints) 

534 

535 host = None 

536 snapshot_host = None 

537 if snapshot: 

538 snapshot_host = snapshot['share']['instance']['host'] 

539 if not CONF.use_scheduler_creating_share_from_snapshot: 

540 # Shares from snapshots with restriction - source host only. 

541 # It is common situation for different types of backends. 

542 host = snapshot['share']['instance']['host'] 

543 

544 if share_group and host is None: 

545 host = share_group['host'] 

546 

547 self.create_instance( 

548 context, share, share_network_id=share_network_id, host=host, 

549 availability_zone=availability_zone, share_group=share_group, 

550 share_group_snapshot_member=share_group_snapshot_member, 

551 share_type_id=share_type_id, availability_zones=availability_zones, 

552 snapshot_host=snapshot_host, scheduler_hints=scheduler_hints, 

553 az_request_multiple_subnet_support_map=( 

554 az_request_multiple_subnet_support_map), 

555 mount_point_name=mount_point_name, 

556 encryption_key_ref=encryption_key_ref, 

557 ) 

558 

559 # Retrieve the share with instance details 

560 share = self.db.share_get(context, share['id']) 

561 

562 return share 

563 

564 def update_metadata_from_share_type_extra_specs(self, context, share_type, 

565 user_metadata): 

566 extra_specs = share_type.get('extra_specs', {}) 

567 if not extra_specs: 567 ↛ 568line 567 didn't jump to line 568 because the condition on line 567 was never true

568 return user_metadata 

569 

570 driver_keys = getattr(CONF, 'driver_updatable_metadata', []) 

571 if not driver_keys: 

572 return user_metadata 

573 

574 metadata_from_share_type = {} 

575 for k, v in extra_specs.items(): 

576 try: 

577 prefix, metadata_key = k.split(':') 

578 except Exception: 

579 continue 

580 

581 # consider prefix only with valid storage driver 

582 if prefix.lower() == 'provisioning': 

583 continue 

584 

585 if metadata_key in driver_keys: 

586 metadata_from_share_type.update({metadata_key: v}) 

587 

588 metadata_from_share_type.update(user_metadata) 

589 return metadata_from_share_type 

590 

591 def update_share_from_metadata(self, context, share_id, metadata): 

592 driver_keys = getattr(CONF, 'driver_updatable_metadata', []) 

593 if not driver_keys: 593 ↛ 594line 593 didn't jump to line 594 because the condition on line 593 was never true

594 return 

595 

596 driver_metadata = {} 

597 for k, v in metadata.items(): 

598 if k in driver_keys: 

599 driver_metadata.update({k: v}) 

600 

601 if driver_metadata: 601 ↛ exitline 601 didn't return from function 'update_share_from_metadata' because the condition on line 601 was always true

602 share = self.get(context, share_id) 

603 self.share_rpcapi.update_share_from_metadata(context, share, 

604 driver_metadata) 

605 

606 def update_share_network_subnet_from_metadata(self, context, 

607 share_network_id, 

608 share_network_subnet_id, 

609 metadata): 

610 driver_keys = getattr(CONF, 'driver_updatable_subnet_metadata', []) 

611 if not driver_keys: 611 ↛ 612line 611 didn't jump to line 612 because the condition on line 611 was never true

612 return 

613 

614 driver_metadata = {} 

615 for k, v in metadata.items(): 

616 if k in driver_keys: 

617 driver_metadata.update({k: v}) 

618 

619 if driver_metadata: 619 ↛ exitline 619 didn't return from function 'update_share_network_subnet_from_metadata' because the condition on line 619 was always true

620 share_servers = ( 

621 self.db.share_server_get_all_by_host_and_or_share_subnet( 

622 context, 

623 host=None, 

624 share_subnet_id=share_network_subnet_id)) 

625 for share_server in share_servers: 

626 self.share_rpcapi.update_share_network_subnet_from_metadata( 

627 context, 

628 share_network_id, 

629 share_network_subnet_id, 

630 share_server, 

631 driver_metadata 

632 ) 

633 

634 def get_share_attributes_from_share_type(self, share_type): 

635 """Determine share attributes from the share type. 

636 

637 The share type can change any time after shares of that type are 

638 created, so we copy some share type attributes to the share to 

639 consistently govern the behavior of that share over its lifespan. 

640 """ 

641 

642 inferred_map = constants.ExtraSpecs.INFERRED_OPTIONAL_MAP 

643 snapshot_support_key = constants.ExtraSpecs.SNAPSHOT_SUPPORT 

644 create_share_from_snapshot_key = ( 

645 constants.ExtraSpecs.CREATE_SHARE_FROM_SNAPSHOT_SUPPORT) 

646 revert_to_snapshot_key = ( 

647 constants.ExtraSpecs.REVERT_TO_SNAPSHOT_SUPPORT) 

648 mount_snapshot_support_key = ( 

649 constants.ExtraSpecs.MOUNT_SNAPSHOT_SUPPORT) 

650 mount_point_name_support_key = ( 

651 constants.ExtraSpecs.MOUNT_POINT_NAME_SUPPORT 

652 ) 

653 

654 snapshot_support_default = inferred_map.get(snapshot_support_key) 

655 create_share_from_snapshot_support_default = inferred_map.get( 

656 create_share_from_snapshot_key) 

657 revert_to_snapshot_support_default = inferred_map.get( 

658 revert_to_snapshot_key) 

659 mount_snapshot_support_default = inferred_map.get( 

660 constants.ExtraSpecs.MOUNT_SNAPSHOT_SUPPORT) 

661 mount_point_name_support_default = False 

662 

663 if share_type: 

664 snapshot_support = share_types.parse_boolean_extra_spec( 

665 snapshot_support_key, 

666 share_type.get('extra_specs', {}).get( 

667 snapshot_support_key, snapshot_support_default)) 

668 create_share_from_snapshot_support = ( 

669 share_types.parse_boolean_extra_spec( 

670 create_share_from_snapshot_key, 

671 share_type.get('extra_specs', {}).get( 

672 create_share_from_snapshot_key, 

673 create_share_from_snapshot_support_default))) 

674 revert_to_snapshot_support = ( 

675 share_types.parse_boolean_extra_spec( 

676 revert_to_snapshot_key, 

677 share_type.get('extra_specs', {}).get( 

678 revert_to_snapshot_key, 

679 revert_to_snapshot_support_default))) 

680 mount_snapshot_support = share_types.parse_boolean_extra_spec( 

681 mount_snapshot_support_key, share_type.get( 

682 'extra_specs', {}).get( 

683 mount_snapshot_support_key, 

684 mount_snapshot_support_default)) 

685 mount_point_name_support = share_types.parse_boolean_extra_spec( 

686 mount_point_name_support_key, share_type.get( 

687 'extra_specs', {}).get( 

688 mount_point_name_support_key, 

689 mount_point_name_support_default)) 

690 replication_type = share_type.get('extra_specs', {}).get( 

691 'replication_type') 

692 else: 

693 snapshot_support = snapshot_support_default 

694 create_share_from_snapshot_support = ( 

695 create_share_from_snapshot_support_default) 

696 revert_to_snapshot_support = revert_to_snapshot_support_default 

697 mount_snapshot_support = mount_snapshot_support_default 

698 mount_point_name_support = mount_point_name_support_default 

699 replication_type = None 

700 

701 return { 

702 'snapshot_support': snapshot_support, 

703 'create_share_from_snapshot_support': 

704 create_share_from_snapshot_support, 

705 'revert_to_snapshot_support': revert_to_snapshot_support, 

706 'replication_type': replication_type, 

707 'mount_snapshot_support': mount_snapshot_support, 

708 'mount_point_name_support': mount_point_name_support, 

709 } 

710 

711 def create_instance(self, context, share, share_network_id=None, 

712 host=None, availability_zone=None, 

713 share_group=None, share_group_snapshot_member=None, 

714 share_type_id=None, availability_zones=None, 

715 snapshot_host=None, scheduler_hints=None, 

716 az_request_multiple_subnet_support_map=None, 

717 mount_point_name=None, encryption_key_ref=None): 

718 request_spec, share_instance = ( 

719 self.create_share_instance_and_get_request_spec( 

720 context, share, availability_zone=availability_zone, 

721 share_group=share_group, host=host, 

722 share_network_id=share_network_id, 

723 share_type_id=share_type_id, 

724 availability_zones=availability_zones, 

725 snapshot_host=snapshot_host, 

726 az_request_multiple_subnet_support_map=( 

727 az_request_multiple_subnet_support_map), 

728 mount_point_name=mount_point_name, 

729 encryption_key_ref=encryption_key_ref)) 

730 

731 if share_group_snapshot_member: 

732 # Inherit properties from the share_group_snapshot_member 

733 member_share_instance = share_group_snapshot_member[ 

734 'share_instance'] 

735 updates = { 

736 'host': member_share_instance['host'], 

737 'share_network_id': member_share_instance['share_network_id'], 

738 'share_server_id': member_share_instance['share_server_id'], 

739 } 

740 share = self.db.share_instance_update(context, 

741 share_instance['id'], 

742 updates) 

743 # NOTE(ameade): Do not cast to driver if creating from share group 

744 # snapshot 

745 return 

746 

747 if host: 

748 self.share_rpcapi.create_share_instance( 

749 context, 

750 share_instance, 

751 host, 

752 request_spec=request_spec, 

753 filter_properties={'scheduler_hints': scheduler_hints}, 

754 snapshot_id=share['snapshot_id'], 

755 ) 

756 else: 

757 # Create share instance from scratch or from snapshot could happen 

758 # on hosts other than the source host. 

759 self.scheduler_rpcapi.create_share_instance( 

760 context, 

761 request_spec=request_spec, 

762 filter_properties={'scheduler_hints': scheduler_hints}, 

763 ) 

764 

765 return share_instance 

766 

767 def create_share_instance_and_get_request_spec( 

768 self, context, share, availability_zone=None, 

769 share_group=None, host=None, share_network_id=None, 

770 share_type_id=None, cast_rules_to_readonly=False, 

771 availability_zones=None, snapshot_host=None, 

772 az_request_multiple_subnet_support_map=None, 

773 mount_point_name=None, encryption_key_ref=None): 

774 

775 availability_zone_id = None 

776 if availability_zone: 

777 availability_zone_id = self.db.availability_zone_get( 

778 context, availability_zone).id 

779 

780 # TODO(u_glide): Add here validation that provided share network 

781 # doesn't conflict with provided availability_zone when Neutron 

782 # will have AZ support. 

783 share_instance = self.db.share_instance_create( 

784 context, share['id'], 

785 { 

786 'share_network_id': share_network_id, 

787 'status': constants.STATUS_CREATING, 

788 'scheduled_at': timeutils.utcnow(), 

789 'host': host if host else '', 

790 'availability_zone_id': availability_zone_id, 

791 'share_type_id': share_type_id, 

792 'cast_rules_to_readonly': cast_rules_to_readonly, 

793 'mount_point_name': mount_point_name, 

794 'encryption_key_ref': encryption_key_ref, 

795 } 

796 ) 

797 

798 share_properties = { 

799 'id': share['id'], 

800 'size': share['size'], 

801 'user_id': share['user_id'], 

802 'project_id': share['project_id'], 

803 'metadata': self.db.share_metadata_get(context, share['id']), 

804 'share_server_id': share_instance['share_server_id'], 

805 'snapshot_support': share['snapshot_support'], 

806 'create_share_from_snapshot_support': 

807 share['create_share_from_snapshot_support'], 

808 'revert_to_snapshot_support': share['revert_to_snapshot_support'], 

809 'mount_snapshot_support': share['mount_snapshot_support'], 

810 'share_proto': share['share_proto'], 

811 'share_type_id': share_type_id, 

812 'is_public': share['is_public'], 

813 'share_group_id': share['share_group_id'], 

814 'source_share_group_snapshot_member_id': share[ 

815 'source_share_group_snapshot_member_id'], 

816 'snapshot_id': share['snapshot_id'], 

817 'replication_type': share['replication_type'], 

818 } 

819 share_instance_properties = { 

820 'id': share_instance['id'], 

821 'availability_zone_id': share_instance['availability_zone_id'], 

822 'share_network_id': share_instance['share_network_id'], 

823 'share_server_id': share_instance['share_server_id'], 

824 'share_id': share_instance['share_id'], 

825 'host': share_instance['host'], 

826 'status': share_instance['status'], 

827 'replica_state': share_instance['replica_state'], 

828 'share_type_id': share_instance['share_type_id'], 

829 'encryption_key_ref': share_instance['encryption_key_ref'], 

830 } 

831 

832 share_type = None 

833 if share_instance['share_type_id']: 

834 share_type = self.db.share_type_get( 

835 context, share_instance['share_type_id']) 

836 

837 request_spec = { 

838 'share_properties': share_properties, 

839 'share_instance_properties': share_instance_properties, 

840 'share_proto': share['share_proto'], 

841 'share_id': share['id'], 

842 'snapshot_id': share['snapshot_id'], 

843 'snapshot_host': snapshot_host, 

844 'share_type': share_type, 

845 'share_group': share_group, 

846 'availability_zone_id': availability_zone_id, 

847 'availability_zones': availability_zones, 

848 'az_request_multiple_subnet_support_map': ( 

849 az_request_multiple_subnet_support_map) 

850 } 

851 return request_spec, share_instance 

852 

853 def create_share_replica(self, context, share, availability_zone=None, 

854 share_network_id=None, scheduler_hints=None): 

855 

856 parent_share_network_id = share.get('share_network_id') 

857 if (parent_share_network_id and share_network_id and 857 ↛ 859line 857 didn't jump to line 859 because the condition on line 857 was never true

858 parent_share_network_id != share_network_id): 

859 parent_security_services = ( 

860 self.db.security_service_get_all_by_share_network( 

861 context, parent_share_network_id)) 

862 security_services = ( 

863 self.db.security_service_get_all_by_share_network( 

864 context, share_network_id)) 

865 parent_ss = set([s['id'] for s in parent_security_services]) 

866 ss = set([s['id'] for s in security_services]) 

867 if ss != parent_ss: 

868 msg = _("Share and its replica can't be in " 

869 "different authentication domains.") 

870 raise exception.InvalidInput(reason=msg) 

871 

872 if not share.get('replication_type'): 

873 msg = _("Replication not supported for share %s.") 

874 raise exception.InvalidShare(message=msg % share['id']) 

875 

876 if share.get('share_group_id'): 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true

877 msg = _("Replication not supported for shares in a group.") 

878 raise exception.InvalidShare(message=msg) 

879 

880 if scheduler_hints: 880 ↛ 881line 880 didn't jump to line 881 because the condition on line 880 was never true

881 if ('only_host' not in scheduler_hints.keys() or len( 

882 scheduler_hints) > 1): 

883 msg = _("Arg 'scheduler_hints' supports only 'only_host' key.") 

884 raise exception.InvalidInput(reason=msg) 

885 

886 self._check_is_share_busy(share) 

887 

888 active_replica = self.db.share_replicas_get_available_active_replica( 

889 context, share['id']) 

890 

891 if not active_replica: 

892 msg = _("Share %s does not have any active replica in available " 

893 "state.") 

894 raise exception.ReplicationException(reason=msg % share['id']) 

895 

896 share_type = share_types.get_share_type( 

897 context, share.instance['share_type_id']) 

898 type_azs = share_type['extra_specs'].get('availability_zones', '') 

899 type_azs = [t for t in type_azs.split(',') if type_azs] 

900 if (availability_zone and type_azs and 

901 availability_zone not in type_azs): 

902 msg = _("Share replica cannot be created since the share type " 

903 "%(type)s is not supported within the availability zone " 

904 "chosen %(az)s.") 

905 type_name = '%s' % (share_type['name'] or '') 

906 type_id = '(ID: %s)' % share_type['id'] 

907 payload = {'type': '%s%s' % (type_name, type_id), 

908 'az': availability_zone} 

909 raise exception.InvalidShare(message=msg % payload) 

910 

911 try: 

912 reservations = QUOTAS.reserve( 

913 context, share_replicas=1, replica_gigabytes=share['size'], 

914 share_type_id=share_type['id'] 

915 ) 

916 except exception.OverQuota as e: 

917 self.check_if_replica_quotas_exceeded(context, e, share['size']) 

918 

919 az_request_multiple_subnet_support_map = {} 

920 if share_network_id: 

921 if availability_zone: 

922 try: 

923 az = self.db.availability_zone_get(context, 

924 availability_zone) 

925 except exception.AvailabilityZoneNotFound: 

926 msg = _("Share replica cannot be created because the " 

927 "specified availability zone does not exist.") 

928 raise exception.InvalidInput(message=msg) 

929 az_id = az.get('id') 

930 subnets = ( 

931 self.db. 

932 share_network_subnets_get_all_by_availability_zone_id( 

933 context, share_network_id, az_id)) 

934 if not subnets: 

935 msg = _("Share replica cannot be created because the " 

936 "share network is not available within the " 

937 "specified availability zone.") 

938 raise exception.InvalidShare(message=msg) 

939 az_request_multiple_subnet_support_map[az_id] = ( 

940 len(subnets) > 1) 

941 else: 

942 # NOTE(dviroel): If a target availability zone was not 

943 # provided, the scheduler will receive a list with all 

944 # availability zones that contains subnets within the 

945 # selected share network. 

946 compatible_azs_name, compatible_azs_multiple = ( 

947 self._get_all_availability_zones_with_subnets( 

948 context, share_network_id)) 

949 if not type_azs: 

950 type_azs = compatible_azs_name 

951 else: 

952 type_azs = ( 

953 [az for az in type_azs if az in compatible_azs_name]) 

954 if not type_azs: 

955 msg = _( 

956 "The share network is not supported within any " 

957 "requested availability zone. Check the share type's " 

958 "'availability_zones' extra-spec and the availability " 

959 "zones of the share network subnets") 

960 raise exception.InvalidInput(message=msg) 

961 az_request_multiple_subnet_support_map.update( 

962 compatible_azs_multiple) 

963 

964 if share['replication_type'] == constants.REPLICATION_TYPE_READABLE: 

965 cast_rules_to_readonly = True 

966 else: 

967 cast_rules_to_readonly = False 

968 

969 share_replica = None 

970 try: 

971 request_spec, share_replica = ( 

972 self.create_share_instance_and_get_request_spec( 

973 context, share, availability_zone=availability_zone, 

974 share_network_id=share_network_id, 

975 share_type_id=share['instance']['share_type_id'], 

976 cast_rules_to_readonly=cast_rules_to_readonly, 

977 availability_zones=type_azs, 

978 az_request_multiple_subnet_support_map=( 

979 az_request_multiple_subnet_support_map)) 

980 ) 

981 QUOTAS.commit( 

982 context, reservations, project_id=share['project_id'], 

983 share_type_id=share_type['id'], 

984 ) 

985 except Exception: 

986 with excutils.save_and_reraise_exception(): 

987 try: 

988 if share_replica: 988 ↛ 993line 988 didn't jump to line 993 because the condition on line 988 was always true

989 self.db.share_replica_delete( 

990 context, share_replica['id'], 

991 need_to_update_usages=False) 

992 finally: 

993 QUOTAS.rollback( 

994 context, reservations, share_type_id=share_type['id']) 

995 

996 all_replicas = self.db.share_replicas_get_all_by_share( 

997 context, share['id']) 

998 all_hosts = [r['host'] for r in all_replicas] 

999 

1000 request_spec['active_replica_host'] = active_replica['host'] 

1001 request_spec['all_replica_hosts'] = ','.join(all_hosts) 

1002 

1003 self.db.share_replica_update( 

1004 context, share_replica['id'], 

1005 {'replica_state': constants.REPLICA_STATE_OUT_OF_SYNC}) 

1006 

1007 existing_snapshots = ( 

1008 self.db.share_snapshot_get_all_for_share( 

1009 context, share_replica['share_id']) 

1010 ) 

1011 snapshot_instance = { 

1012 'status': constants.STATUS_CREATING, 

1013 'progress': '0%', 

1014 'share_instance_id': share_replica['id'], 

1015 } 

1016 for snapshot in existing_snapshots: 

1017 self.db.share_snapshot_instance_create( 

1018 context, snapshot['id'], snapshot_instance) 

1019 

1020 self.scheduler_rpcapi.create_share_replica( 

1021 context, request_spec=request_spec, 

1022 filter_properties={'scheduler_hints': scheduler_hints}) 

1023 

1024 return share_replica 

1025 

1026 def delete_share_replica(self, context, share_replica, force=False): 

1027 # Disallow deletion of ONLY active replica, *even* when this 

1028 # operation is forced. 

1029 replicas = self.db.share_replicas_get_all_by_share( 

1030 context, share_replica['share_id']) 

1031 active_replicas = list(filter( 

1032 lambda x: x['replica_state'] == constants.REPLICA_STATE_ACTIVE, 

1033 replicas)) 

1034 if (share_replica.get('replica_state') == 

1035 constants.REPLICA_STATE_ACTIVE and len(active_replicas) == 1): 

1036 msg = _("Cannot delete last active replica.") 

1037 raise exception.ReplicationException(reason=msg) 

1038 

1039 LOG.info("Deleting replica %s.", share_replica['id']) 

1040 

1041 self.db.share_replica_update( 

1042 context, share_replica['id'], 

1043 { 

1044 'status': constants.STATUS_DELETING, 

1045 'terminated_at': timeutils.utcnow(), 

1046 } 

1047 ) 

1048 

1049 if not share_replica['host']: 

1050 # Delete any snapshot instances created on the database 

1051 replica_snapshots = ( 

1052 self.db.share_snapshot_instance_get_all_with_filters( 

1053 context, {'share_instance_ids': share_replica['id']}) 

1054 ) 

1055 for snapshot in replica_snapshots: 

1056 self.db.share_snapshot_instance_delete(context, snapshot['id']) 

1057 

1058 # Delete the replica from the database 

1059 self.db.share_replica_delete(context, share_replica['id']) 

1060 else: 

1061 

1062 self.share_rpcapi.delete_share_replica(context, 

1063 share_replica, 

1064 force=force) 

1065 

1066 def promote_share_replica(self, context, share_replica, 

1067 quiesce_wait_time=None): 

1068 

1069 if share_replica.get('status') != constants.STATUS_AVAILABLE: 

1070 msg = _("Replica %(replica_id)s must be in %(status)s state to be " 

1071 "promoted.") 

1072 raise exception.ReplicationException( 

1073 reason=msg % {'replica_id': share_replica['id'], 

1074 'status': constants.STATUS_AVAILABLE}) 

1075 

1076 replica_state = share_replica['replica_state'] 

1077 

1078 if (replica_state in (constants.REPLICA_STATE_OUT_OF_SYNC, 

1079 constants.STATUS_ERROR) 

1080 and not context.is_admin): 

1081 msg = _("Promoting a replica with 'replica_state': %s requires " 

1082 "administrator privileges.") 

1083 raise exception.AdminRequired( 

1084 message=msg % replica_state) 

1085 

1086 self.db.share_replica_update( 

1087 context, share_replica['id'], 

1088 {'status': constants.STATUS_REPLICATION_CHANGE}) 

1089 

1090 self.share_rpcapi.promote_share_replica( 

1091 context, share_replica, 

1092 quiesce_wait_time=quiesce_wait_time) 

1093 

1094 return self.db.share_replica_get(context, share_replica['id']) 

1095 

1096 def update_share_replica(self, context, share_replica): 

1097 

1098 if not share_replica['host']: 

1099 msg = _("Share replica does not have a valid host.") 

1100 raise exception.InvalidHost(reason=msg) 

1101 

1102 self.share_rpcapi.update_share_replica(context, share_replica) 

1103 

1104 def manage(self, context, share_data, driver_options): 

1105 

1106 # Check whether there's a share already with the provided options: 

1107 filters = { 

1108 'export_location_path': share_data['export_location_path'], 

1109 'host': share_data['host'], 

1110 } 

1111 share_server_id = share_data.get('share_server_id') 

1112 if share_server_id: 

1113 filters['share_server_id'] = share_data['share_server_id'] 

1114 

1115 already_managed = self.db.share_instance_get_all( 

1116 context, filters=filters) 

1117 

1118 if already_managed: 

1119 LOG.error("Found an existing share with export location %s!", 

1120 share_data['export_location_path']) 

1121 msg = _("A share already exists with the export path specified.") 

1122 raise exception.InvalidShare(reason=msg) 

1123 

1124 share_type_id = share_data['share_type_id'] 

1125 share_type = share_types.get_share_type(context, share_type_id) 

1126 

1127 dhss = share_types.parse_boolean_extra_spec( 

1128 'driver_handles_share_servers', 

1129 share_type['extra_specs']['driver_handles_share_servers']) 

1130 

1131 if dhss and not share_server_id: 

1132 msg = _("Share Server ID parameter is required when managing a " 

1133 "share using a share type with " 

1134 "driver_handles_share_servers extra-spec set to True.") 

1135 raise exception.InvalidInput(reason=msg) 

1136 if not dhss and share_server_id: 

1137 msg = _("Share Server ID parameter is not expected when managing a" 

1138 " share using a share type with " 

1139 "driver_handles_share_servers extra-spec set to False.") 

1140 raise exception.InvalidInput(reason=msg) 

1141 

1142 if share_server_id: 

1143 try: 

1144 share_server = self.db.share_server_get( 

1145 context, share_data['share_server_id']) 

1146 except exception.ShareServerNotFound: 

1147 msg = _("Share Server specified was not found.") 

1148 raise exception.InvalidInput(reason=msg) 

1149 

1150 if share_server['status'] != constants.STATUS_ACTIVE: 

1151 msg = _("The provided share server is not active.") 

1152 raise exception.InvalidShareServer(reason=msg) 

1153 share_data['share_network_id'] = ( 

1154 share_server['share_network_id']) 

1155 

1156 try: 

1157 share_network = self.db.share_network_get( 

1158 context, share_data['share_network_id']) 

1159 except exception.ShareNetworkNotFound: 

1160 msg = _("Share network %s was not found." 

1161 ) % share_data['share_network_id'] 

1162 raise exception.InvalidInput(reason=msg) 

1163 # Check if share network is active, otherwise raise a BadRequest 

1164 api_common.check_share_network_is_active(share_network) 

1165 

1166 share_data.update({ 

1167 'user_id': context.user_id, 

1168 'project_id': context.project_id, 

1169 'status': constants.STATUS_MANAGING, 

1170 'scheduled_at': timeutils.utcnow(), 

1171 }) 

1172 share_data.update( 

1173 self.get_share_attributes_from_share_type(share_type)) 

1174 

1175 share = self.db.share_create(context, share_data) 

1176 

1177 export_location_path = share_data.pop('export_location_path') 

1178 self.db.export_locations_update( 

1179 context, share.instance['id'], export_location_path) 

1180 

1181 request_spec = self._get_request_spec_dict( 

1182 context, share, share_type, size=0, 

1183 share_proto=share_data['share_proto'], 

1184 host=share_data['host']) 

1185 

1186 # NOTE(ganso): Scheduler is called to validate if share type 

1187 # provided can fit in host provided. It will invoke manage upon 

1188 # successful validation. 

1189 self.scheduler_rpcapi.manage_share(context, share['id'], 

1190 driver_options, request_spec) 

1191 

1192 return self.db.share_get(context, share['id']) 

1193 

1194 def _get_request_spec_dict(self, context, share, share_type, **kwargs): 

1195 

1196 if share is None: 1196 ↛ 1197line 1196 didn't jump to line 1197 because the condition on line 1196 was never true

1197 share = {'instance': {}} 

1198 

1199 # NOTE(dviroel): The share object can be a share instance object with 

1200 # share data. 

1201 share_instance = share.get('instance', share) 

1202 

1203 share_properties = { 

1204 'size': kwargs.get('size', share.get('size')), 

1205 'user_id': kwargs.get('user_id', share.get('user_id')), 

1206 'project_id': kwargs.get('project_id', share.get('project_id')), 

1207 'metadata': self.db.share_metadata_get( 

1208 context, share_instance.get('share_id')), 

1209 'snapshot_support': kwargs.get( 

1210 'snapshot_support', 

1211 share_type.get('extra_specs', {}).get('snapshot_support') 

1212 ), 

1213 'create_share_from_snapshot_support': kwargs.get( 

1214 'create_share_from_snapshot_support', 

1215 share_type.get('extra_specs', {}).get( 

1216 'create_share_from_snapshot_support') 

1217 ), 

1218 'revert_to_snapshot_support': kwargs.get( 

1219 'revert_to_snapshot_support', 

1220 share_type.get('extra_specs', {}).get( 

1221 'revert_to_snapshot_support') 

1222 ), 

1223 'mount_snapshot_support': kwargs.get( 

1224 'mount_snapshot_support', 

1225 share_type.get('extra_specs', {}).get( 

1226 'mount_snapshot_support') 

1227 ), 

1228 'mount_point_name_support': kwargs.get( 

1229 'mount_point_name_support', 

1230 share_type.get('extra_specs', {}).get( 

1231 'mount_point_name_support') 

1232 ), 

1233 'share_proto': kwargs.get('share_proto', share.get('share_proto')), 

1234 'share_type_id': share_type['id'], 

1235 'is_public': kwargs.get('is_public', share.get('is_public')), 

1236 'share_group_id': kwargs.get( 

1237 'share_group_id', share.get('share_group_id')), 

1238 'source_share_group_snapshot_member_id': kwargs.get( 

1239 'source_share_group_snapshot_member_id', 

1240 share.get('source_share_group_snapshot_member_id')), 

1241 'snapshot_id': kwargs.get('snapshot_id', share.get('snapshot_id')), 

1242 } 

1243 share_instance_properties = { 

1244 'availability_zone_id': kwargs.get( 

1245 'availability_zone_id', 

1246 share_instance.get('availability_zone_id')), 

1247 'share_network_id': kwargs.get( 

1248 'share_network_id', share_instance.get('share_network_id')), 

1249 'share_server_id': kwargs.get( 

1250 'share_server_id', share_instance.get('share_server_id')), 

1251 'share_id': kwargs.get('share_id', share_instance.get('share_id')), 

1252 'host': kwargs.get('host', share_instance.get('host')), 

1253 'status': kwargs.get('status', share_instance.get('status')), 

1254 } 

1255 

1256 request_spec = { 

1257 'share_properties': share_properties, 

1258 'share_instance_properties': share_instance_properties, 

1259 'share_type': share_type, 

1260 'share_id': share.get('id'), 

1261 } 

1262 return request_spec 

1263 

1264 def _prefix_mount_point_name(self, share_type, context, 

1265 mount_point_name=None): 

1266 prefix = share_type.get('extra_specs').get( 

1267 constants.ExtraSpecs.PROVISIONING_MOUNT_POINT_PREFIX) 

1268 if prefix is None: 

1269 prefix = CONF.default_mount_point_prefix 

1270 

1271 mount_point_name_template = f"{prefix}{mount_point_name}" 

1272 mount_point_name = mount_point_name_template.format( 

1273 **context.to_dict()) 

1274 

1275 if mount_point_name and ( 

1276 not re.match( 

1277 r'^[a-zA-Z0-9_-]*$', mount_point_name) 

1278 or len(mount_point_name) > 255 

1279 ): 

1280 msg = _("Invalid mount_point_name: %s") 

1281 LOG.error(msg, mount_point_name) 

1282 raise exception.InvalidInput(msg % mount_point_name) 

1283 

1284 return mount_point_name 

1285 

1286 @prevent_locked_action_on_share('delete') 

1287 def unmanage(self, context, share): 

1288 policy.check_policy(context, 'share', 'unmanage') 

1289 

1290 self._check_is_share_busy(share) 

1291 

1292 if share['status'] == constants.STATUS_MANAGE_ERROR: 1292 ↛ 1293line 1292 didn't jump to line 1293 because the condition on line 1292 was never true

1293 update_status = constants.STATUS_MANAGE_ERROR_UNMANAGING 

1294 else: 

1295 update_status = constants.STATUS_UNMANAGING 

1296 

1297 update_data = {'status': update_status, 

1298 'terminated_at': timeutils.utcnow()} 

1299 share_ref = self.db.share_update(context, share['id'], update_data) 

1300 

1301 self.delete_scheduler_hints(context, share) 

1302 self.share_rpcapi.unmanage_share(context, share_ref) 

1303 

1304 # NOTE(u_glide): We should update 'updated_at' timestamp of 

1305 # share server here, when manage/unmanage operations will be supported 

1306 # for driver_handles_share_servers=True mode 

1307 

1308 def manage_snapshot(self, context, snapshot_data, driver_options, 

1309 share=None): 

1310 if not share: 1310 ↛ 1317line 1310 didn't jump to line 1317 because the condition on line 1310 was always true

1311 try: 

1312 share = self.db.share_get(context, snapshot_data['share_id']) 

1313 except exception.NotFound: 

1314 raise exception.ShareNotFound( 

1315 share_id=snapshot_data['share_id']) 

1316 

1317 if share['has_replicas']: 

1318 msg = (_("Share %s has replicas. Snapshots of this share cannot " 

1319 "currently be managed until all replicas are removed.") 

1320 % share['id']) 

1321 raise exception.InvalidShare(reason=msg) 

1322 

1323 existing_snapshots = self.db.share_snapshot_get_all_for_share( 

1324 context, snapshot_data['share_id']) 

1325 

1326 for existing_snap in existing_snapshots: 

1327 for inst in existing_snap.get('instances'): 

1328 if (snapshot_data['provider_location'] == 

1329 inst['provider_location']): 

1330 msg = _("A share snapshot %(share_snapshot_id)s is " 

1331 "already managed for provider location " 

1332 "%(provider_location)s.") % { 

1333 'share_snapshot_id': existing_snap['id'], 

1334 'provider_location': 

1335 snapshot_data['provider_location'], 

1336 } 

1337 raise exception.ManageInvalidShareSnapshot( 

1338 reason=msg) 

1339 

1340 snapshot_data.update({ 

1341 'user_id': context.user_id, 

1342 'project_id': context.project_id, 

1343 'status': constants.STATUS_MANAGING, 

1344 'share_size': share['size'], 

1345 'progress': '0%', 

1346 'share_proto': share['share_proto'] 

1347 }) 

1348 

1349 snapshot = self.db.share_snapshot_create(context, snapshot_data) 

1350 

1351 self.share_rpcapi.manage_snapshot(context, snapshot, share['host'], 

1352 driver_options) 

1353 return snapshot 

1354 

1355 def unmanage_snapshot(self, context, snapshot, host): 

1356 update_data = {'status': constants.STATUS_UNMANAGING, 

1357 'terminated_at': timeutils.utcnow()} 

1358 snapshot_ref = self.db.share_snapshot_update(context, 

1359 snapshot['id'], 

1360 update_data) 

1361 

1362 self.share_rpcapi.unmanage_snapshot(context, snapshot_ref, host) 

1363 

1364 def revert_to_snapshot(self, context, share, snapshot): 

1365 """Revert a share to a snapshot.""" 

1366 

1367 reservations = self._handle_revert_to_snapshot_quotas( 

1368 context, share, snapshot) 

1369 

1370 try: 

1371 if share.get('has_replicas'): 

1372 self._revert_to_replicated_snapshot( 

1373 context, share, snapshot, reservations) 

1374 else: 

1375 self._revert_to_snapshot( 

1376 context, share, snapshot, reservations) 

1377 except Exception: 

1378 with excutils.save_and_reraise_exception(): 

1379 if reservations: 

1380 QUOTAS.rollback( 

1381 context, reservations, 

1382 share_type_id=share['instance']['share_type_id']) 

1383 

1384 def _handle_revert_to_snapshot_quotas(self, context, share, snapshot): 

1385 """Reserve extra quota if a revert will result in a larger share.""" 

1386 

1387 # Note(cknight): This value may be positive or negative. 

1388 size_increase = snapshot['size'] - share['size'] 

1389 if not size_increase: 

1390 return None 

1391 

1392 try: 

1393 return QUOTAS.reserve( 

1394 context, 

1395 project_id=share['project_id'], 

1396 gigabytes=size_increase, 

1397 user_id=share['user_id'], 

1398 share_type_id=share['instance']['share_type_id']) 

1399 except exception.OverQuota as exc: 

1400 usages = exc.kwargs['usages'] 

1401 quotas = exc.kwargs['quotas'] 

1402 consumed_gb = (usages['gigabytes']['reserved'] + 

1403 usages['gigabytes']['in_use']) 

1404 

1405 msg = _("Quota exceeded for %(s_pid)s. Reverting share " 

1406 "%(s_sid)s to snapshot %(s_ssid)s will increase the " 

1407 "share's size by %(s_size)sG, " 

1408 "(%(d_consumed)dG of %(d_quota)dG already consumed).") 

1409 msg_args = { 

1410 's_pid': context.project_id, 

1411 's_sid': share['id'], 

1412 's_ssid': snapshot['id'], 

1413 's_size': size_increase, 

1414 'd_consumed': consumed_gb, 

1415 'd_quota': quotas['gigabytes'], 

1416 } 

1417 message = msg % msg_args 

1418 LOG.error(message) 

1419 raise exception.ShareSizeExceedsAvailableQuota(message=message) 

1420 

1421 def _revert_to_snapshot(self, context, share, snapshot, reservations): 

1422 """Revert a non-replicated share to a snapshot.""" 

1423 

1424 # Set status of share to 'reverting' 

1425 self.db.share_update( 

1426 context, snapshot['share_id'], 

1427 {'status': constants.STATUS_REVERTING}) 

1428 

1429 # Set status of snapshot to 'restoring' 

1430 self.db.share_snapshot_update( 

1431 context, snapshot['id'], 

1432 {'status': constants.STATUS_RESTORING}) 

1433 

1434 # Send revert API to share host 

1435 self.share_rpcapi.revert_to_snapshot( 

1436 context, share, snapshot, share['instance']['host'], reservations) 

1437 

1438 def _revert_to_replicated_snapshot(self, context, share, snapshot, 

1439 reservations): 

1440 """Revert a replicated share to a snapshot.""" 

1441 

1442 # Get active replica 

1443 active_replica = self.db.share_replicas_get_available_active_replica( 

1444 context, share['id']) 

1445 

1446 if not active_replica: 

1447 msg = _('Share %s has no active replica in available state.') 

1448 raise exception.ReplicationException(reason=msg % share['id']) 

1449 

1450 # Get snapshot instance on active replica 

1451 snapshot_instance_filters = { 

1452 'share_instance_ids': active_replica['id'], 

1453 'snapshot_ids': snapshot['id'], 

1454 } 

1455 snapshot_instances = ( 

1456 self.db.share_snapshot_instance_get_all_with_filters( 

1457 context, snapshot_instance_filters)) 

1458 active_snapshot_instance = ( 

1459 snapshot_instances[0] if snapshot_instances else None) 

1460 

1461 if not active_snapshot_instance: 

1462 msg = _('Share %(share)s has no snapshot %(snap)s associated with ' 

1463 'its active replica.') 

1464 msg_args = {'share': share['id'], 'snap': snapshot['id']} 

1465 raise exception.ReplicationException(reason=msg % msg_args) 

1466 

1467 # Set active replica to 'reverting' 

1468 self.db.share_replica_update( 

1469 context, active_replica['id'], 

1470 {'status': constants.STATUS_REVERTING}) 

1471 

1472 # Set snapshot instance on active replica to 'restoring' 

1473 self.db.share_snapshot_instance_update( 

1474 context, active_snapshot_instance['id'], 

1475 {'status': constants.STATUS_RESTORING}) 

1476 

1477 # Send revert API to active replica host 

1478 self.share_rpcapi.revert_to_snapshot( 

1479 context, share, snapshot, active_replica['host'], reservations) 

1480 

1481 @prevent_locked_action_on_share('delete') 

1482 def soft_delete(self, context, share): 

1483 """Soft delete share.""" 

1484 share_id = share['id'] 

1485 

1486 if share['is_soft_deleted']: 

1487 msg = _("The share has been soft deleted already") 

1488 raise exception.InvalidShare(reason=msg) 

1489 

1490 statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR, 

1491 constants.STATUS_INACTIVE) 

1492 if share['status'] not in statuses: 

1493 msg = _("Share status must be one of %(statuses)s") % { 

1494 "statuses": statuses} 

1495 raise exception.InvalidShare(reason=msg) 

1496 

1497 # If the share has more than one replica, 

1498 # it can't be soft deleted until the additional replicas are removed. 

1499 if share.has_replicas: 

1500 msg = _("Share %s has replicas. Remove the replicas before " 

1501 "soft deleting the share.") % share_id 

1502 raise exception.Conflict(err=msg) 

1503 

1504 snapshots = self.db.share_snapshot_get_all_for_share(context, share_id) 

1505 if len(snapshots): 

1506 msg = _("Share still has %d dependent snapshots.") % len(snapshots) 

1507 raise exception.InvalidShare(reason=msg) 

1508 

1509 filters = dict(share_id=share_id) 

1510 backups = self.db.share_backups_get_all(context, filters=filters) 

1511 if len(backups): 1511 ↛ 1512line 1511 didn't jump to line 1512 because the condition on line 1511 was never true

1512 msg = _("Share still has %d dependent backups.") % len(backups) 

1513 raise exception.InvalidShare(reason=msg) 

1514 

1515 share_group_snapshot_members_count = ( 

1516 self.db.count_share_group_snapshot_members_in_share( 

1517 context, share_id, include_deferred_deleting=False)) 

1518 if share_group_snapshot_members_count: 

1519 msg = ( 

1520 _("Share still has %d dependent share group snapshot " 

1521 "members.") % share_group_snapshot_members_count) 

1522 raise exception.InvalidShare(reason=msg) 

1523 

1524 self._check_is_share_busy(share) 

1525 self.db.share_soft_delete(context, share_id) 

1526 

1527 def restore(self, context, share): 

1528 """Restore share.""" 

1529 share_id = share['id'] 

1530 self.db.share_restore(context, share_id) 

1531 

1532 @policy.wrap_check_policy('share') 

1533 @prevent_locked_action_on_share 

1534 def delete(self, context, share, force=False): 

1535 """Delete share.""" 

1536 share = self.db.share_get(context, share['id']) 

1537 share_id = share['id'] 

1538 statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR, 

1539 constants.STATUS_INACTIVE) 

1540 if not (force or share['status'] in statuses): 

1541 msg = _("Share status must be one of %(statuses)s") % { 

1542 "statuses": statuses} 

1543 raise exception.InvalidShare(reason=msg) 

1544 

1545 # NOTE(gouthamr): If the share has more than one replica, 

1546 # it can't be deleted until the additional replicas are removed. 

1547 if share.has_replicas: 

1548 msg = _("Share %s has replicas. Remove the replicas before " 

1549 "deleting the share.") % share_id 

1550 raise exception.Conflict(err=msg) 

1551 

1552 snapshots = self.db.share_snapshot_get_all_for_share(context, share_id) 

1553 if len(snapshots): 

1554 msg = _("Share still has %d dependent snapshots.") % len(snapshots) 

1555 raise exception.InvalidShare(reason=msg) 

1556 

1557 filters = dict(share_id=share_id) 

1558 backups = self.db.share_backups_get_all(context, filters=filters) 

1559 if len(backups): 1559 ↛ 1560line 1559 didn't jump to line 1560 because the condition on line 1559 was never true

1560 msg = _("Share still has %d dependent backups.") % len(backups) 

1561 raise exception.InvalidShare(reason=msg) 

1562 

1563 share_group_snapshot_members_count = ( 

1564 self.db.count_share_group_snapshot_members_in_share( 

1565 context, share_id)) 

1566 if share_group_snapshot_members_count: 1566 ↛ 1567line 1566 didn't jump to line 1567 because the condition on line 1566 was never true

1567 msg = ( 

1568 _("Share still has %d dependent share group snapshot " 

1569 "members.") % share_group_snapshot_members_count) 

1570 raise exception.InvalidShare(reason=msg) 

1571 self._check_is_share_busy(share) 

1572 self.delete_scheduler_hints(context, share) 

1573 

1574 for share_instance in share.instances: 

1575 if share_instance['host']: 

1576 self.delete_instance(context, share_instance, force=force) 

1577 else: 

1578 self.db.share_instance_delete( 

1579 context, share_instance['id'], need_to_update_usages=True) 

1580 

1581 def delete_instance(self, context, share_instance, force=False): 

1582 policy.check_policy(context, 'share', 'delete') 

1583 

1584 statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR, 

1585 constants.STATUS_INACTIVE) 

1586 if not (force or share_instance['status'] in statuses): 

1587 msg = _("Share instance status must be one of %(statuses)s") % { 

1588 "statuses": statuses} 

1589 raise exception.InvalidShareInstance(reason=msg) 

1590 

1591 deferred_delete = CONF.is_deferred_deletion_enabled 

1592 if force and deferred_delete: 

1593 deferred_delete = False 

1594 

1595 current_status = share_instance['status'] 

1596 if current_status not in (constants.STATUS_DEFERRED_DELETING, 

1597 constants.STATUS_ERROR_DEFERRED_DELETING): 

1598 new_status = constants.STATUS_DELETING 

1599 if deferred_delete: 

1600 new_status = constants.STATUS_DEFERRED_DELETING 

1601 share_instance = self.db.share_instance_update( 

1602 context, share_instance['id'], 

1603 {'status': new_status, 'terminated_at': timeutils.utcnow()} 

1604 ) 

1605 

1606 self.share_rpcapi.delete_share_instance( 

1607 context, share_instance, 

1608 force=force, 

1609 deferred_delete=deferred_delete) 

1610 

1611 # NOTE(u_glide): 'updated_at' timestamp is used to track last usage of 

1612 # share server. This is required for automatic share servers cleanup 

1613 # because we should track somehow period of time when share server 

1614 # doesn't have shares (unused). We do this update only on share 

1615 # deletion because share server with shares cannot be deleted, so no 

1616 # need to do this update on share creation or any other share operation 

1617 if share_instance['share_server_id']: 

1618 self.db.share_server_update( 

1619 context, 

1620 share_instance['share_server_id'], 

1621 {'updated_at': timeutils.utcnow()}) 

1622 

1623 def delete_share_server(self, context, server): 

1624 """Delete share server.""" 

1625 policy.check_policy(context, 'share_server', 'delete', server) 

1626 shares = self.db.share_instance_get_all_by_share_server( 

1627 context, server['id']) 

1628 

1629 if shares: 

1630 raise exception.ShareServerInUse(share_server_id=server['id']) 

1631 

1632 share_groups = self.db.share_group_get_all_by_share_server( 

1633 context, server['id']) 

1634 if share_groups: 

1635 LOG.error("share server '%(ssid)s' in use by share groups.", 

1636 {'ssid': server['id']}) 

1637 raise exception.ShareServerInUse(share_server_id=server['id']) 

1638 

1639 # NOTE(vponomaryov): There is no share_server status update here, 

1640 # it is intentional. 

1641 # Status will be changed in manila.share.manager after verification 

1642 # for race condition between share creation on server 

1643 # and server deletion. 

1644 self.share_rpcapi.delete_share_server(context, server) 

1645 

1646 def manage_share_server( 

1647 self, context, identifier, host, share_net_subnet, driver_opts): 

1648 """Manage a share server.""" 

1649 

1650 try: 

1651 matched_servers = self.db.share_server_search_by_identifier( 

1652 context, identifier) 

1653 except exception.ShareServerNotFound: 

1654 pass 

1655 else: 

1656 msg = _("Identifier %(identifier)s specified matches existing " 

1657 "share servers: %(servers)s.") % { 

1658 'identifier': identifier, 

1659 'servers': ', '.join(s['identifier'] for s in matched_servers) 

1660 } 

1661 raise exception.InvalidInput(reason=msg) 

1662 

1663 values = { 

1664 'host': host, 

1665 'share_network_subnets': [share_net_subnet], 

1666 'status': constants.STATUS_MANAGING, 

1667 'is_auto_deletable': False, 

1668 'identifier': identifier, 

1669 } 

1670 

1671 server = self.db.share_server_create(context, values) 

1672 

1673 self.share_rpcapi.manage_share_server( 

1674 context, server, identifier, driver_opts) 

1675 

1676 return self.db.share_server_get(context, server['id']) 

1677 

1678 def unmanage_share_server(self, context, share_server, force=False): 

1679 """Unmanage a share server.""" 

1680 

1681 shares = self.db.share_instance_get_all_by_share_server( 

1682 context, share_server['id']) 

1683 

1684 if shares: 

1685 raise exception.ShareServerInUse( 

1686 share_server_id=share_server['id']) 

1687 

1688 share_groups = self.db.share_group_get_all_by_share_server( 

1689 context, share_server['id']) 

1690 if share_groups: 

1691 LOG.error("share server '%(ssid)s' in use by share groups.", 

1692 {'ssid': share_server['id']}) 

1693 raise exception.ShareServerInUse( 

1694 share_server_id=share_server['id']) 

1695 

1696 update_data = {'status': constants.STATUS_UNMANAGING, 

1697 'terminated_at': timeutils.utcnow()} 

1698 

1699 share_server = self.db.share_server_update( 

1700 context, share_server['id'], update_data) 

1701 

1702 self.share_rpcapi.unmanage_share_server( 

1703 context, share_server, force=force) 

1704 

1705 def transfer_accept(self, context, share, new_user, 

1706 new_project, clear_rules=False): 

1707 self.share_rpcapi.transfer_accept(context, share, 

1708 new_user, new_project, 

1709 clear_rules=clear_rules) 

1710 

1711 def create_snapshot(self, context, share, name, description, 

1712 force=False, metadata=None): 

1713 policy.check_policy(context, 'share', 'create_snapshot', share) 

1714 if metadata: 1714 ↛ 1715line 1714 didn't jump to line 1715 because the condition on line 1714 was never true

1715 api_common.check_metadata_properties(metadata) 

1716 

1717 if ((not force) and (share['status'] != constants.STATUS_AVAILABLE)): 

1718 msg = _("Source share status must be " 

1719 "%s") % constants.STATUS_AVAILABLE 

1720 raise exception.InvalidShare(reason=msg) 

1721 

1722 size = share['size'] 

1723 

1724 self._check_is_share_busy(share) 

1725 

1726 try: 

1727 reservations = QUOTAS.reserve( 

1728 context, snapshots=1, snapshot_gigabytes=size, 

1729 share_type_id=share['instance']['share_type_id']) 

1730 except exception.OverQuota as e: 

1731 overs = e.kwargs['overs'] 

1732 usages = e.kwargs['usages'] 

1733 quotas = e.kwargs['quotas'] 

1734 

1735 def _consumed(name): 

1736 return (usages[name]['reserved'] + usages[name]['in_use']) 

1737 

1738 if 'snapshot_gigabytes' in overs: 

1739 msg = ("Quota exceeded for %(s_pid)s, tried to create " 

1740 "%(s_size)sG snapshot (%(d_consumed)dG of " 

1741 "%(d_quota)dG already consumed).") 

1742 LOG.warning(msg, { 

1743 's_pid': context.project_id, 

1744 's_size': size, 

1745 'd_consumed': _consumed('snapshot_gigabytes'), 

1746 'd_quota': quotas['snapshot_gigabytes']}) 

1747 raise exception.SnapshotSizeExceedsAvailableQuota() 

1748 elif 'snapshots' in overs: 1748 ↛ 1756line 1748 didn't jump to line 1756 because the condition on line 1748 was always true

1749 msg = ("Quota exceeded for %(s_pid)s, tried to create " 

1750 "snapshot (%(d_consumed)d snapshots " 

1751 "already consumed).") 

1752 LOG.warning(msg, {'s_pid': context.project_id, 

1753 'd_consumed': _consumed('snapshots')}) 

1754 raise exception.SnapshotLimitExceeded( 

1755 allowed=quotas['snapshots']) 

1756 options = {'share_id': share['id'], 

1757 'size': share['size'], 

1758 'user_id': context.user_id, 

1759 'project_id': context.project_id, 

1760 'status': constants.STATUS_CREATING, 

1761 'progress': '0%', 

1762 'share_size': share['size'], 

1763 'display_name': name, 

1764 'display_description': description, 

1765 'share_proto': share['share_proto']} 

1766 if metadata: 1766 ↛ 1767line 1766 didn't jump to line 1767 because the condition on line 1766 was never true

1767 options.update({"metadata": metadata}) 

1768 

1769 snapshot = None 

1770 try: 

1771 snapshot = self.db.share_snapshot_create(context, options) 

1772 QUOTAS.commit( 

1773 context, reservations, 

1774 share_type_id=share['instance']['share_type_id']) 

1775 except Exception: 

1776 with excutils.save_and_reraise_exception(): 

1777 try: 

1778 if snapshot and snapshot['instance']: 

1779 self.db.share_snapshot_instance_delete( 

1780 context, snapshot['instance']['id']) 

1781 finally: 

1782 QUOTAS.rollback( 

1783 context, reservations, 

1784 share_type_id=share['instance']['share_type_id']) 

1785 

1786 # If replicated share, create snapshot instances for each replica 

1787 if share.get('has_replicas'): 

1788 snapshot = self.db.share_snapshot_get(context, snapshot['id']) 

1789 share_instance_id = snapshot['instance']['share_instance_id'] 

1790 replicas = self.db.share_replicas_get_all_by_share( 

1791 context, share['id']) 

1792 replicas = [r for r in replicas if r['id'] != share_instance_id] 

1793 snapshot_instance = { 

1794 'status': constants.STATUS_CREATING, 

1795 'progress': '0%', 

1796 } 

1797 for replica in replicas: 

1798 snapshot_instance.update({'share_instance_id': replica['id']}) 

1799 self.db.share_snapshot_instance_create( 

1800 context, snapshot['id'], snapshot_instance) 

1801 self.share_rpcapi.create_replicated_snapshot( 

1802 context, share, snapshot) 

1803 

1804 else: 

1805 self.share_rpcapi.create_snapshot(context, share, snapshot) 

1806 

1807 return snapshot 

1808 

1809 def _modify_quotas_for_share_migration(self, context, share, 

1810 new_share_type): 

1811 """Consume quotas for share migration. 

1812 

1813 If a share migration was requested and a new share type was provided, 

1814 quotas must be consumed from this share type. If no quotas are 

1815 available for shares, gigabytes, share replicas or replica gigabytes, 

1816 an error will be thrown. 

1817 """ 

1818 

1819 new_share_type_id = new_share_type['id'] 

1820 

1821 if new_share_type_id == share['share_type_id']: 

1822 return 

1823 

1824 new_type_extra_specs = self.get_share_attributes_from_share_type( 

1825 new_share_type) 

1826 new_type_replication_type = new_type_extra_specs.get( 

1827 'replication_type', None) 

1828 

1829 deltas = {} 

1830 

1831 # NOTE(carloss): If a new share type with a replication type was 

1832 # specified, there is need to allocate quotas in the new share type. 

1833 # We won't remove the current consumed quotas, since both share 

1834 # instances will co-exist until the migration gets completed, 

1835 # cancelled or it fails. 

1836 if new_type_replication_type: 

1837 deltas['share_replicas'] = 1 

1838 deltas['replica_gigabytes'] = share['size'] 

1839 

1840 deltas.update({ 

1841 'share_type_id': new_share_type_id, 

1842 'shares': 1, 

1843 'gigabytes': share['size'] 

1844 }) 

1845 

1846 try: 

1847 reservations = QUOTAS.reserve( 

1848 context, project_id=share['project_id'], 

1849 user_id=share['user_id'], **deltas) 

1850 QUOTAS.commit( 

1851 context, reservations, project_id=share['project_id'], 

1852 user_id=share['user_id'], share_type_id=new_share_type_id) 

1853 except exception.OverQuota as e: 

1854 overs = e.kwargs['overs'] 

1855 usages = e.kwargs['usages'] 

1856 quotas = e.kwargs['quotas'] 

1857 

1858 def _consumed(name): 

1859 return (usages[name]['reserved'] + usages[name]['in_use']) 

1860 

1861 if 'replica_gigabytes' in overs: 

1862 LOG.warning("Replica gigabytes quota exceeded " 

1863 "for %(s_pid)s, tried to migrate " 

1864 "%(s_size)sG share (%(d_consumed)dG of " 

1865 "%(d_quota)dG already consumed).", { 

1866 's_pid': context.project_id, 

1867 's_size': share['size'], 

1868 'd_consumed': _consumed( 

1869 'replica_gigabytes'), 

1870 'd_quota': quotas['replica_gigabytes']}) 

1871 msg = _("Failed while migrating a share with replication " 

1872 "support. Maximum number of allowed " 

1873 "replica gigabytes is exceeded.") 

1874 raise exception.ShareReplicaSizeExceedsAvailableQuota( 

1875 message=msg) 

1876 

1877 if 'share_replicas' in overs: 

1878 LOG.warning("Quota exceeded for %(s_pid)s, " 

1879 "unable to migrate share-replica (%(d_consumed)d " 

1880 "of %(d_quota)d already consumed).", { 

1881 's_pid': context.project_id, 

1882 'd_consumed': _consumed('share_replicas'), 

1883 'd_quota': quotas['share_replicas']}) 

1884 msg = _( 

1885 "Failed while migrating a share with replication " 

1886 "support. Maximum number of allowed share-replicas " 

1887 "is exceeded.") 

1888 raise exception.ShareReplicasLimitExceeded(msg) 

1889 

1890 if 'gigabytes' in overs: 1890 ↛ 1900line 1890 didn't jump to line 1900 because the condition on line 1890 was always true

1891 LOG.warning("Quota exceeded for %(s_pid)s, " 

1892 "tried to migrate " 

1893 "%(s_size)sG share (%(d_consumed)dG of " 

1894 "%(d_quota)dG already consumed).", { 

1895 's_pid': context.project_id, 

1896 's_size': share['size'], 

1897 'd_consumed': _consumed('gigabytes'), 

1898 'd_quota': quotas['gigabytes']}) 

1899 raise exception.ShareSizeExceedsAvailableQuota() 

1900 if 'shares' in overs: 

1901 LOG.warning("Quota exceeded for %(s_pid)s, " 

1902 "tried to migrate " 

1903 "share (%(d_consumed)d shares " 

1904 "already consumed).", { 

1905 's_pid': context.project_id, 

1906 'd_consumed': _consumed('shares')}) 

1907 raise exception.ShareLimitExceeded(allowed=quotas['shares']) 

1908 

1909 def migration_start( 

1910 self, context, share, dest_host, force_host_assisted_migration, 

1911 preserve_metadata, writable, nondisruptive, preserve_snapshots, 

1912 new_share_network=None, new_share_type=None): 

1913 """Migrates share to a new host.""" 

1914 

1915 if force_host_assisted_migration and ( 

1916 preserve_metadata or writable or nondisruptive or 

1917 preserve_snapshots): 

1918 msg = _('Invalid parameter combination. Cannot set parameters ' 

1919 '"nondisruptive", "writable", "preserve_snapshots" or ' 

1920 '"preserve_metadata" to True when enabling the ' 

1921 '"force_host_assisted_migration" option.') 

1922 LOG.error(msg) 

1923 raise exception.InvalidInput(reason=msg) 

1924 

1925 share_instance = share.instance 

1926 

1927 # NOTE(gouthamr): Ensure share does not have replicas. 

1928 # Currently share migrations are disallowed for replicated shares. 

1929 if share.has_replicas: 

1930 msg = _('Share %s has replicas. Remove the replicas before ' 

1931 'attempting to migrate the share.') % share['id'] 

1932 LOG.error(msg) 

1933 raise exception.Conflict(err=msg) 

1934 

1935 # TODO(ganso): We do not support migrating shares in or out of groups 

1936 # for now. 

1937 if share.get('share_group_id'): 

1938 msg = _('Share %s is a member of a group. This operation is not ' 

1939 'currently supported for shares that are members of ' 

1940 'groups.') % share['id'] 

1941 LOG.error(msg) 

1942 raise exception.InvalidShare(reason=msg) 

1943 

1944 # We only handle "available" share for now 

1945 if share_instance['status'] != constants.STATUS_AVAILABLE: 

1946 msg = _('Share instance %(instance_id)s status must be available, ' 

1947 'but current status is: %(instance_status)s.') % { 

1948 'instance_id': share_instance['id'], 

1949 'instance_status': share_instance['status']} 

1950 raise exception.InvalidShare(reason=msg) 

1951 

1952 # Access rules status must not be error 

1953 if share_instance['access_rules_status'] == constants.STATUS_ERROR: 

1954 msg = _('Share instance %(instance_id)s access rules status must ' 

1955 'not be in %(error)s when attempting to start a ' 

1956 'migration.') % { 

1957 'instance_id': share_instance['id'], 

1958 'error': constants.STATUS_ERROR} 

1959 raise exception.InvalidShare(reason=msg) 

1960 

1961 self._check_is_share_busy(share) 

1962 

1963 if force_host_assisted_migration: 

1964 # We only handle shares without snapshots for 

1965 # host-assisted migration 

1966 snaps = self.db.share_snapshot_get_all_for_share(context, 

1967 share['id']) 

1968 if snaps: 1968 ↛ 1973line 1968 didn't jump to line 1973 because the condition on line 1968 was always true

1969 msg = _("Share %s must not have snapshots when using " 

1970 "host-assisted migration.") % share['id'] 

1971 raise exception.Conflict(err=msg) 

1972 

1973 dest_host_host = share_utils.extract_host(dest_host) 

1974 

1975 # Make sure the host is in the list of available hosts 

1976 utils.validate_service_host(context, dest_host_host) 

1977 

1978 if new_share_type: 

1979 share_type = new_share_type 

1980 # ensure pass the size limitations in the share type 

1981 size = share['size'] 

1982 share_types.provision_filter_on_size(context, share_type, size) 

1983 new_share_type_id = new_share_type['id'] 

1984 dhss = share_type['extra_specs']['driver_handles_share_servers'] 

1985 dhss = strutils.bool_from_string(dhss, strict=True) 

1986 if (dhss and not new_share_network and 

1987 not share_instance['share_network_id']): 

1988 msg = _( 

1989 "New share network must be provided when share type of" 

1990 " given share %s has extra_spec " 

1991 "'driver_handles_share_servers' as True.") % share['id'] 

1992 raise exception.InvalidInput(reason=msg) 

1993 self._modify_quotas_for_share_migration(context, share, 

1994 new_share_type) 

1995 else: 

1996 share_type = {} 

1997 share_type_id = share_instance['share_type_id'] 

1998 if share_type_id: 1998 ↛ 2000line 1998 didn't jump to line 2000 because the condition on line 1998 was always true

1999 share_type = share_types.get_share_type(context, share_type_id) 

2000 new_share_type_id = share_instance['share_type_id'] 

2001 

2002 dhss = share_type['extra_specs']['driver_handles_share_servers'] 

2003 dhss = strutils.bool_from_string(dhss, strict=True) 

2004 

2005 if dhss: 

2006 if new_share_network: 

2007 new_share_network_id = new_share_network['id'] 

2008 else: 

2009 new_share_network_id = share_instance['share_network_id'] 

2010 else: 

2011 if new_share_network: 

2012 msg = _( 

2013 "New share network must not be provided when share type of" 

2014 " given share %s has extra_spec " 

2015 "'driver_handles_share_servers' as False.") % share['id'] 

2016 raise exception.InvalidInput(reason=msg) 

2017 

2018 new_share_network_id = None 

2019 

2020 # Make sure the destination is different than the source 

2021 if (new_share_network_id == share_instance['share_network_id'] and 

2022 new_share_type_id == share_instance['share_type_id'] and 

2023 dest_host == share_instance['host']): 

2024 msg = ("Destination host (%(dest_host)s), share network " 

2025 "(%(dest_sn)s) or share type (%(dest_st)s) are the same " 

2026 "as the current host's '%(src_host)s', '%(src_sn)s' and " 

2027 "'%(src_st)s' respectively. Nothing to be done.") % { 

2028 'dest_host': dest_host, 

2029 'dest_sn': new_share_network_id, 

2030 'dest_st': new_share_type_id, 

2031 'src_host': share_instance['host'], 

2032 'src_sn': share_instance['share_network_id'], 

2033 'src_st': share_instance['share_type_id'], 

2034 } 

2035 LOG.info(msg) 

2036 self.db.share_update( 

2037 context, share['id'], 

2038 {'task_state': constants.TASK_STATE_MIGRATION_SUCCESS}) 

2039 return 200 

2040 

2041 service = self.db.service_get_by_args( 

2042 context, dest_host_host, 'manila-share') 

2043 

2044 type_azs = share_type['extra_specs'].get('availability_zones', '') 

2045 type_azs = [t for t in type_azs.split(',') if type_azs] 

2046 if type_azs and service['availability_zone']['name'] not in type_azs: 

2047 msg = _("Share %(shr)s cannot be migrated to host %(dest)s " 

2048 "because share type %(type)s is not supported within the " 

2049 "availability zone (%(az)s) that the host is in.") 

2050 type_name = '%s' % (share_type['name'] or '') 

2051 type_id = '(ID: %s)' % share_type['id'] 

2052 payload = {'type': '%s%s' % (type_name, type_id), 

2053 'az': service['availability_zone']['name'], 

2054 'shr': share['id'], 

2055 'dest': dest_host} 

2056 raise exception.InvalidShare(reason=msg % payload) 

2057 

2058 request_spec = self._get_request_spec_dict( 

2059 context, 

2060 share, 

2061 share_type, 

2062 availability_zone_id=service['availability_zone_id'], 

2063 share_network_id=new_share_network_id) 

2064 

2065 self.db.share_update( 

2066 context, share['id'], 

2067 {'task_state': constants.TASK_STATE_MIGRATION_STARTING}) 

2068 

2069 self.db.share_instance_update(context, share_instance['id'], 

2070 {'status': constants.STATUS_MIGRATING}) 

2071 

2072 self.scheduler_rpcapi.migrate_share_to_host( 

2073 context, share['id'], dest_host, force_host_assisted_migration, 

2074 preserve_metadata, writable, nondisruptive, preserve_snapshots, 

2075 new_share_network_id, new_share_type_id, request_spec) 

2076 

2077 return 202 

2078 

2079 def migration_complete(self, context, share): 

2080 

2081 if share['task_state'] not in ( 

2082 constants.TASK_STATE_DATA_COPYING_COMPLETED, 

2083 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE): 

2084 msg = self._migration_validate_error_message(share) 

2085 if msg is None: 

2086 msg = _("First migration phase of share %s not completed" 

2087 " yet.") % share['id'] 

2088 LOG.error(msg) 

2089 raise exception.InvalidShare(reason=msg) 

2090 

2091 share_instance_id, new_share_instance_id = ( 

2092 self.get_migrating_instances(share)) 

2093 

2094 share_instance_ref = self.db.share_instance_get( 

2095 context, share_instance_id, with_share_data=True) 

2096 

2097 self.share_rpcapi.migration_complete(context, share_instance_ref, 

2098 new_share_instance_id) 

2099 

2100 def get_migrating_instances(self, share): 

2101 

2102 share_instance_id = None 

2103 new_share_instance_id = None 

2104 

2105 for instance in share.instances: 

2106 if instance['status'] == constants.STATUS_MIGRATING: 

2107 share_instance_id = instance['id'] 

2108 if instance['status'] == constants.STATUS_MIGRATING_TO: 

2109 new_share_instance_id = instance['id'] 

2110 

2111 if None in (share_instance_id, new_share_instance_id): 

2112 msg = _("Share instances %(instance_id)s and " 

2113 "%(new_instance_id)s in inconsistent states, cannot" 

2114 " continue share migration for share %(share_id)s" 

2115 ".") % {'instance_id': share_instance_id, 

2116 'new_instance_id': new_share_instance_id, 

2117 'share_id': share['id']} 

2118 raise exception.ShareMigrationFailed(reason=msg) 

2119 

2120 return share_instance_id, new_share_instance_id 

2121 

2122 def migration_get_progress(self, context, share): 

2123 

2124 if share['task_state'] == ( 

2125 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS): 

2126 

2127 share_instance_id, migrating_instance_id = ( 

2128 self.get_migrating_instances(share)) 

2129 

2130 share_instance_ref = self.db.share_instance_get( 

2131 context, share_instance_id, with_share_data=True) 

2132 

2133 service_host = share_utils.extract_host(share_instance_ref['host']) 

2134 

2135 service = self.db.service_get_by_args( 

2136 context, service_host, 'manila-share') 

2137 

2138 if utils.service_is_up(service): 

2139 try: 

2140 result = self.share_rpcapi.migration_get_progress( 

2141 context, share_instance_ref, migrating_instance_id) 

2142 except exception.InvalidShare: 

2143 # reload to get the latest task_state 

2144 share = self.db.share_get(context, share['id']) 

2145 result = self._migration_get_progress_state(share) 

2146 except Exception: 

2147 msg = _("Failed to obtain migration progress of share " 

2148 "%s.") % share['id'] 

2149 LOG.exception(msg) 

2150 raise exception.ShareMigrationError(reason=msg) 

2151 else: 

2152 result = None 

2153 

2154 elif share['task_state'] == ( 

2155 constants.TASK_STATE_DATA_COPYING_IN_PROGRESS): 

2156 data_rpc = data_rpcapi.DataAPI() 

2157 LOG.info("Sending request to get share migration information" 

2158 " of share %s.", share['id']) 

2159 

2160 services = self.db.service_get_all_by_topic(context, 'manila-data') 

2161 

2162 if len(services) > 0 and utils.service_is_up(services[0]): 

2163 

2164 try: 

2165 result = data_rpc.data_copy_get_progress( 

2166 context, share['id']) 

2167 except Exception: 

2168 msg = _("Failed to obtain migration progress of share " 

2169 "%s.") % share['id'] 

2170 LOG.exception(msg) 

2171 raise exception.ShareMigrationError(reason=msg) 

2172 else: 

2173 result = None 

2174 else: 

2175 result = self._migration_get_progress_state(share) 

2176 

2177 if not (result and result.get('total_progress') is not None): 

2178 msg = self._migration_validate_error_message(share) 

2179 if msg is None: 

2180 msg = _("Migration progress of share %s cannot be obtained at " 

2181 "this moment.") % share['id'] 

2182 LOG.error(msg) 

2183 raise exception.InvalidShare(reason=msg) 

2184 

2185 return result 

2186 

2187 def _migration_get_progress_state(self, share): 

2188 

2189 task_state = share['task_state'] 

2190 if task_state in (constants.TASK_STATE_MIGRATION_SUCCESS, 

2191 constants.TASK_STATE_DATA_COPYING_ERROR, 

2192 constants.TASK_STATE_MIGRATION_CANCELLED, 

2193 constants.TASK_STATE_MIGRATION_CANCEL_IN_PROGRESS, 

2194 constants.TASK_STATE_MIGRATION_COMPLETING, 

2195 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE, 

2196 constants.TASK_STATE_DATA_COPYING_COMPLETED, 

2197 constants.TASK_STATE_DATA_COPYING_COMPLETING, 

2198 constants.TASK_STATE_DATA_COPYING_CANCELLED, 

2199 constants.TASK_STATE_MIGRATION_ERROR): 

2200 return {'total_progress': 100} 

2201 elif task_state in (constants.TASK_STATE_MIGRATION_STARTING, 

2202 constants.TASK_STATE_MIGRATION_DRIVER_STARTING, 

2203 constants.TASK_STATE_DATA_COPYING_STARTING, 

2204 constants.TASK_STATE_MIGRATION_IN_PROGRESS): 

2205 return {'total_progress': 0} 

2206 else: 

2207 return None 

2208 

2209 def _migration_validate_error_message(self, resource, 

2210 resource_type='share'): 

2211 task_state = resource['task_state'] 

2212 if task_state == constants.TASK_STATE_MIGRATION_SUCCESS: 

2213 msg = _("Migration of %(resource_type)s %(resource_id)s has " 

2214 "already completed.") % { 

2215 'resource_id': resource['id'], 

2216 'resource_type': resource_type} 

2217 elif task_state in (None, constants.TASK_STATE_MIGRATION_ERROR): 

2218 msg = _("There is no migration being performed for " 

2219 "%(resource_type)s %(resource_id)s at this moment.") % { 

2220 'resource_id': resource['id'], 

2221 'resource_type': resource_type} 

2222 elif task_state == constants.TASK_STATE_MIGRATION_CANCELLED: 

2223 msg = _("Migration of %(resource_type)s %(resource_id)s was " 

2224 "already cancelled.") % { 

2225 'resource_id': resource['id'], 

2226 'resource_type': resource_type} 

2227 elif task_state in (constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE, 

2228 constants.TASK_STATE_DATA_COPYING_COMPLETED): 

2229 msg = _("Migration of %(resource_type)s %(resource_id)s has " 

2230 "already completed first phase.") % { 

2231 'resource_id': resource['id'], 

2232 'resource_type': resource_type} 

2233 else: 

2234 return None 

2235 return msg 

2236 

2237 def migration_cancel(self, context, share): 

2238 

2239 migrating = True 

2240 if share['task_state'] in ( 

2241 constants.TASK_STATE_DATA_COPYING_COMPLETED, 

2242 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE, 

2243 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS): 

2244 

2245 share_instance_id, migrating_instance_id = ( 

2246 self.get_migrating_instances(share)) 

2247 

2248 share_instance_ref = self.db.share_instance_get( 

2249 context, share_instance_id, with_share_data=True) 

2250 

2251 service_host = share_utils.extract_host(share_instance_ref['host']) 

2252 

2253 service = self.db.service_get_by_args( 

2254 context, service_host, 'manila-share') 

2255 

2256 if utils.service_is_up(service): 

2257 self.share_rpcapi.migration_cancel( 

2258 context, share_instance_ref, migrating_instance_id) 

2259 else: 

2260 migrating = False 

2261 

2262 elif share['task_state'] == ( 

2263 constants.TASK_STATE_DATA_COPYING_IN_PROGRESS): 

2264 

2265 data_rpc = data_rpcapi.DataAPI() 

2266 LOG.info("Sending request to cancel migration of " 

2267 "share %s.", share['id']) 

2268 

2269 services = self.db.service_get_all_by_topic(context, 'manila-data') 

2270 

2271 if len(services) > 0 and utils.service_is_up(services[0]): 

2272 try: 

2273 data_rpc.data_copy_cancel(context, share['id']) 

2274 except Exception: 

2275 msg = _("Failed to cancel migration of share " 

2276 "%s.") % share['id'] 

2277 LOG.exception(msg) 

2278 raise exception.ShareMigrationError(reason=msg) 

2279 else: 

2280 migrating = False 

2281 

2282 else: 

2283 migrating = False 

2284 

2285 if not migrating: 

2286 msg = self._migration_validate_error_message(share) 

2287 if msg is None: 

2288 msg = _("Migration of share %s cannot be cancelled at this " 

2289 "moment.") % share['id'] 

2290 LOG.error(msg) 

2291 raise exception.InvalidShare(reason=msg) 

2292 

2293 @policy.wrap_check_policy('share') 

2294 def delete_snapshot(self, context, snapshot, force=False): 

2295 statuses = (constants.STATUS_AVAILABLE, constants.STATUS_ERROR) 

2296 if not (force or snapshot['aggregate_status'] in statuses): 

2297 msg = _("Share Snapshot status must be one of %(statuses)s.") % { 

2298 "statuses": statuses} 

2299 raise exception.InvalidShareSnapshot(reason=msg) 

2300 

2301 share = self.db.share_get(context, snapshot['share_id']) 

2302 

2303 snapshot_instances = ( 

2304 self.db.share_snapshot_instance_get_all_with_filters( 

2305 context, {'snapshot_ids': snapshot['id']}) 

2306 ) 

2307 

2308 deferred_delete = CONF.is_deferred_deletion_enabled 

2309 if force and deferred_delete: 

2310 deferred_delete = False 

2311 

2312 current_status = snapshot['aggregate_status'] 

2313 if current_status not in (constants.STATUS_DEFERRED_DELETING, 2313 ↛ 2322line 2313 didn't jump to line 2322 because the condition on line 2313 was always true

2314 constants.STATUS_ERROR_DEFERRED_DELETING): 

2315 new_status = constants.STATUS_DELETING 

2316 if deferred_delete: 

2317 new_status = constants.STATUS_DEFERRED_DELETING 

2318 for snapshot_instance in snapshot_instances: 

2319 self.db.share_snapshot_instance_update( 

2320 context, snapshot_instance['id'], {'status': new_status}) 

2321 

2322 if share['has_replicas']: 

2323 self.share_rpcapi.delete_replicated_snapshot( 

2324 context, snapshot, share['instance']['host'], 

2325 share_id=share['id'], force=force) 

2326 else: 

2327 self.share_rpcapi.delete_snapshot( 

2328 context, snapshot, share['instance']['host'], 

2329 force=force, deferred_delete=deferred_delete) 

2330 

2331 @policy.wrap_check_policy('share') 

2332 def update(self, context, share, fields): 

2333 return self.db.share_update(context, share['id'], fields) 

2334 

2335 @policy.wrap_check_policy('share') 

2336 def snapshot_update(self, context, snapshot, fields): 

2337 return self.db.share_snapshot_update(context, snapshot['id'], fields) 

2338 

2339 def get(self, context, share_id): 

2340 share = self.db.share_get(context, share_id) 

2341 if not share['is_public']: 

2342 authorized = policy.check_policy( 

2343 context, 'share', 'get', share, do_raise=False) 

2344 if not authorized: 

2345 raise exception.NotFound() 

2346 if share['status'] in ( 

2347 constants.STATUS_DEFERRED_DELETING, 

2348 constants.STATUS_ERROR_DEFERRED_DELETING): 

2349 policy_str = "list_shares_in_deferred_deletion_states" 

2350 authorized = policy.check_policy( 

2351 context, 'share', policy_str, share, do_raise=False) 

2352 if not authorized: 

2353 raise exception.NotFound() 

2354 return share 

2355 

2356 def get_all(self, context, search_opts=None, sort_key='created_at', 

2357 sort_dir='desc'): 

2358 return self._get_all(context, search_opts=search_opts, 

2359 sort_key=sort_key, sort_dir=sort_dir) 

2360 

2361 def get_all_with_count(self, context, search_opts=None, 

2362 sort_key='created_at', sort_dir='desc'): 

2363 return self._get_all(context, search_opts=search_opts, 

2364 sort_key=sort_key, sort_dir=sort_dir, 

2365 show_count=True) 

2366 

2367 def _get_all(self, context, search_opts=None, sort_key='created_at', 

2368 sort_dir='desc', show_count=False): 

2369 if search_opts is None: 

2370 search_opts = {} 

2371 

2372 LOG.debug("Searching for shares by: %s", search_opts) 

2373 

2374 # Prepare filters 

2375 filters = {} 

2376 

2377 filter_keys = [ 

2378 'display_name', 'share_group_id', 'display_name~', 

2379 'display_description', 'display_description~', 'snapshot_id', 

2380 'status', 'share_type_id', 'project_id', 'export_location_id', 

2381 'export_location_path', 'limit', 'offset', 'host', 

2382 'share_network_id', 'is_soft_deleted', 'mount_point_name', 

2383 'encryption_key_ref'] 

2384 

2385 for key in filter_keys: 

2386 if key in search_opts: 

2387 filters[key] = search_opts.pop(key) 

2388 

2389 if 'metadata' in search_opts: 

2390 filters['metadata'] = search_opts.pop('metadata') 

2391 if not isinstance(filters['metadata'], dict): 

2392 msg = _("Wrong metadata filter provided: " 

2393 "%s.") % filters['metadata'] 

2394 raise exception.InvalidInput(reason=msg) 

2395 if 'extra_specs' in search_opts: 

2396 # Verify policy for extra-specs access 

2397 policy.check_policy(context, 'share_types_extra_spec', 'index') 

2398 filters['extra_specs'] = search_opts.pop('extra_specs') 

2399 if not isinstance(filters['extra_specs'], dict): 

2400 msg = _("Wrong extra specs filter provided: " 

2401 "%s.") % filters['extra_specs'] 

2402 raise exception.InvalidInput(reason=msg) 

2403 

2404 if not (isinstance(sort_key, str) and sort_key): 

2405 msg = _("Wrong sort_key filter provided: " 

2406 "'%s'.") % sort_key 

2407 raise exception.InvalidInput(reason=msg) 

2408 if not (isinstance(sort_dir, str) and sort_dir): 

2409 msg = _("Wrong sort_dir filter provided: " 

2410 "'%s'.") % sort_dir 

2411 raise exception.InvalidInput(reason=msg) 

2412 

2413 is_public = search_opts.pop('is_public', False) 

2414 is_public = strutils.bool_from_string(is_public, strict=True) 

2415 

2416 get_methods = { 

2417 'get_by_share_server': ( 

2418 self.db.share_get_all_by_share_server_with_count 

2419 if show_count else self.db.share_get_all_by_share_server), 

2420 'get_all': ( 

2421 self.db.share_get_all_with_count 

2422 if show_count else self.db.share_get_all), 

2423 'get_all_by_project': ( 

2424 self.db.share_get_all_by_project_with_count 

2425 if show_count else self.db.share_get_all_by_project)} 

2426 

2427 # check if user is querying with deferred states and forbid 

2428 # users that aren't authorized to query shares in these states 

2429 policy_str = "list_shares_in_deferred_deletion_states" 

2430 do_raise = ('status' in filters and 'deferred' in filters['status']) 

2431 show_deferred_deleted = policy.check_policy( 

2432 context, 'share', policy_str, do_raise=do_raise) 

2433 if show_deferred_deleted: 

2434 filters['list_deferred_delete'] = True 

2435 

2436 list_all_projects = False 

2437 all_tenants = utils.is_all_tenants(search_opts) 

2438 if all_tenants: 

2439 list_all_projects = policy.check_policy( 

2440 context, 'share', 'list_all_projects', do_raise=False) 

2441 

2442 # Get filtered list of shares 

2443 if 'host' in filters: 

2444 policy.check_policy(context, 'share', 'list_by_host') 

2445 if 'share_server_id' in search_opts: 

2446 # NOTE(vponomaryov): this is project_id independent 

2447 policy.check_policy(context, 'share', 'list_by_share_server_id') 

2448 result = get_methods['get_by_share_server']( 

2449 context, search_opts.pop('share_server_id'), filters=filters, 

2450 sort_key=sort_key, sort_dir=sort_dir) 

2451 elif list_all_projects: 

2452 result = get_methods['get_all']( 

2453 context, filters=filters, sort_key=sort_key, sort_dir=sort_dir) 

2454 else: 

2455 result = get_methods['get_all_by_project']( 

2456 context, project_id=context.project_id, filters=filters, 

2457 is_public=is_public, sort_key=sort_key, sort_dir=sort_dir) 

2458 

2459 if show_count: 2459 ↛ 2460line 2459 didn't jump to line 2460 because the condition on line 2459 was never true

2460 count = result[0] 

2461 shares = result[1] 

2462 else: 

2463 shares = result 

2464 

2465 result = (count, shares) if show_count else shares 

2466 

2467 return result 

2468 

2469 def get_snapshot(self, context, snapshot_id): 

2470 snapshot = self.db.share_snapshot_get(context, snapshot_id) 

2471 authorized = policy.check_policy(context, 'share_snapshot', 

2472 'get_snapshot', snapshot, 

2473 do_raise=False) 

2474 if not authorized: 

2475 raise exception.NotFound() 

2476 if snapshot.get('status') in ( 

2477 constants.STATUS_DEFERRED_DELETING, 

2478 constants.STATUS_ERROR_DEFERRED_DELETING): 

2479 policy_str = "list_snapshots_in_deferred_deletion_states" 

2480 authorized = policy.check_policy( 

2481 context, 'share_snapshot', policy_str, 

2482 snapshot, do_raise=False) 

2483 if not authorized: 2483 ↛ 2485line 2483 didn't jump to line 2485 because the condition on line 2483 was always true

2484 raise exception.NotFound() 

2485 return snapshot 

2486 

2487 def get_all_snapshots(self, context, search_opts=None, limit=None, 

2488 offset=None, sort_key='share_id', sort_dir='desc'): 

2489 return self._get_all_snapshots(context, search_opts=search_opts, 

2490 limit=limit, offset=offset, 

2491 sort_key=sort_key, sort_dir=sort_dir) 

2492 

2493 def get_all_snapshots_with_count(self, context, search_opts=None, 

2494 limit=None, offset=None, 

2495 sort_key='share_id', sort_dir='desc'): 

2496 return self._get_all_snapshots(context, search_opts=search_opts, 

2497 limit=limit, offset=offset, 

2498 sort_key=sort_key, sort_dir=sort_dir, 

2499 show_count=True) 

2500 

2501 def _get_all_snapshots(self, context, search_opts=None, limit=None, 

2502 offset=None, sort_key='share_id', sort_dir='desc', 

2503 show_count=False): 

2504 policy.check_policy(context, 'share_snapshot', 'get_all_snapshots') 

2505 

2506 search_opts = search_opts or {} 

2507 LOG.debug("Searching for snapshots by: %s", search_opts) 

2508 

2509 # Read and remove key 'all_tenants' if was provided 

2510 list_all_projects = False 

2511 all_tenants = utils.is_all_tenants(search_opts) 

2512 if all_tenants: 

2513 search_opts.pop('all_tenants', None) 

2514 list_all_projects = policy.check_policy( 

2515 context, 'share_snapshot', 'list_all_projects', do_raise=False) 

2516 

2517 string_args = {'sort_key': sort_key, 'sort_dir': sort_dir} 

2518 string_args.update(search_opts) 

2519 for k, v in string_args.items(): 

2520 if not (isinstance(v, str) and v) and k != 'metadata': 

2521 msg = _("Wrong '%(k)s' filter provided: " 

2522 "'%(v)s'.") % {'k': k, 'v': string_args[k]} 

2523 raise exception.InvalidInput(reason=msg) 

2524 

2525 # check if user is querying with deferred states and forbid 

2526 # users that aren't authorized to query shares in these states 

2527 policy_str = "list_snapshots_in_deferred_deletion_states" 

2528 do_raise = ('status' in search_opts and 

2529 'deferred' in search_opts['status']) 

2530 show_deferred_deleted = policy.check_policy( 

2531 context, 'share_snapshot', policy_str, do_raise=do_raise) 

2532 if show_deferred_deleted: 

2533 search_opts['list_deferred_delete'] = True 

2534 

2535 get_methods = { 

2536 'get_all': ( 

2537 self.db.share_snapshot_get_all_with_count 

2538 if show_count else self.db.share_snapshot_get_all), 

2539 'get_all_by_project': ( 

2540 self.db.share_snapshot_get_all_by_project_with_count 

2541 if show_count else self.db.share_snapshot_get_all_by_project)} 

2542 

2543 if list_all_projects: 

2544 result = get_methods['get_all']( 

2545 context, filters=search_opts, limit=limit, offset=offset, 

2546 sort_key=sort_key, sort_dir=sort_dir) 

2547 else: 

2548 result = get_methods['get_all_by_project']( 

2549 context, context.project_id, filters=search_opts, 

2550 limit=limit, offset=offset, sort_key=sort_key, 

2551 sort_dir=sort_dir) 

2552 

2553 if show_count: 2553 ↛ 2554line 2553 didn't jump to line 2554 because the condition on line 2553 was never true

2554 count = result[0] 

2555 snapshots = result[1] 

2556 else: 

2557 snapshots = result 

2558 

2559 result = (count, snapshots) if show_count else snapshots 

2560 return result 

2561 

2562 def get_latest_snapshot_for_share(self, context, share_id): 

2563 """Get the newest snapshot of a share.""" 

2564 return self.db.share_snapshot_get_latest_for_share(context, share_id) 

2565 

2566 @staticmethod 

2567 def _any_invalid_share_instance(share, allow_on_error_state=False): 

2568 invalid_states = ( 

2569 constants.INVALID_SHARE_INSTANCE_STATUSES_FOR_ACCESS_RULE_UPDATES) 

2570 if not allow_on_error_state: 2570 ↛ 2573line 2570 didn't jump to line 2573 because the condition on line 2570 was always true

2571 invalid_states += (constants.STATUS_ERROR,) 

2572 

2573 for instance in share.instances: 

2574 if (not instance['host'] or instance['status'] in invalid_states): 

2575 return True 

2576 return False 

2577 

2578 def allow_access(self, ctx, share, access_type, access_to, 

2579 access_level=None, metadata=None, 

2580 allow_on_error_state=False): 

2581 """Allow access to share.""" 

2582 

2583 # Access rule validation: 

2584 if access_level not in constants.ACCESS_LEVELS + (None, ): 

2585 msg = _("Invalid share access level: %s.") % access_level 

2586 raise exception.InvalidShareAccess(reason=msg) 

2587 

2588 api_common.check_metadata_properties(metadata) 

2589 access_exists = self.db.share_access_check_for_existing_access( 

2590 ctx, share['id'], access_type, access_to) 

2591 

2592 if access_exists: 

2593 raise exception.ShareAccessExists(access_type=access_type, 

2594 access=access_to) 

2595 

2596 if self._any_invalid_share_instance(share, allow_on_error_state): 

2597 msg = _("New access rules cannot be applied while the share or " 

2598 "any of its replicas or migration copies lacks a valid " 

2599 "host or is in an invalid state.") 

2600 raise exception.InvalidShare(message=msg) 

2601 

2602 values = { 

2603 'share_id': share['id'], 

2604 'access_type': access_type, 

2605 'access_to': access_to, 

2606 'access_level': access_level, 

2607 'metadata': metadata, 

2608 } 

2609 

2610 access = self.db.share_access_create(ctx, values) 

2611 

2612 for share_instance in share.instances: 

2613 self.allow_access_to_instance(ctx, share_instance) 

2614 

2615 return access 

2616 

2617 def allow_access_to_instance(self, context, share_instance): 

2618 self._conditionally_transition_share_instance_access_rules_status( 

2619 context, share_instance) 

2620 self.share_rpcapi.update_access(context, share_instance) 

2621 

2622 def _conditionally_transition_share_instance_access_rules_status( 

2623 self, context, share_instance): 

2624 conditionally_change = { 

2625 constants.STATUS_ACTIVE: constants.SHARE_INSTANCE_RULES_SYNCING, 

2626 } 

2627 self.access_helper.get_and_update_share_instance_access_rules_status( 

2628 context, conditionally_change=conditionally_change, 

2629 share_instance_id=share_instance['id']) 

2630 

2631 def update_access(self, ctx, share, access, values): 

2632 

2633 if self._any_invalid_share_instance(share, allow_on_error_state=True): 

2634 msg = _("Access rules cannot be updated while the share, " 

2635 "any of its replicas or migration copies lacks a valid " 

2636 "host or is in an invalid state.") 

2637 raise exception.InvalidShare(message=msg) 

2638 

2639 access = self.db.share_access_update(ctx, access['id'], values) 

2640 for share_instance in share.instances: 

2641 self.update_access_to_instance(ctx, share_instance, access) 

2642 

2643 return access 

2644 

2645 def update_access_to_instance(self, context, share_instance, access): 

2646 self._conditionally_transition_share_instance_access_rules_status( 

2647 context, share_instance) 

2648 updates = {'state': constants.ACCESS_STATE_QUEUED_TO_UPDATE} 

2649 self.access_helper.get_and_update_share_instance_access_rule( 

2650 context, access['id'], updates=updates, 

2651 share_instance_id=share_instance['id']) 

2652 

2653 self.share_rpcapi.update_access(context, share_instance) 

2654 

2655 def deny_access(self, ctx, share, access, allow_on_error_state=False): 

2656 """Deny access to share.""" 

2657 

2658 if self._any_invalid_share_instance(share, allow_on_error_state): 

2659 msg = _("Access rules cannot be denied while the share, " 

2660 "any of its replicas or migration copies lacks a valid " 

2661 "host or is in an invalid state.") 

2662 raise exception.InvalidShare(message=msg) 

2663 

2664 for share_instance in share.instances: 

2665 self.deny_access_to_instance(ctx, share_instance, access) 

2666 

2667 def deny_access_to_instance(self, context, share_instance, access): 

2668 self._conditionally_transition_share_instance_access_rules_status( 

2669 context, share_instance) 

2670 updates = {'state': constants.ACCESS_STATE_QUEUED_TO_DENY} 

2671 self.access_helper.get_and_update_share_instance_access_rule( 

2672 context, access['id'], updates=updates, 

2673 share_instance_id=share_instance['id']) 

2674 

2675 self.share_rpcapi.update_access(context, share_instance) 

2676 

2677 def access_get_all(self, context, share, filters=None): 

2678 """Returns all access rules for share.""" 

2679 policy.check_policy(context, 'share', 'access_get_all') 

2680 rules = self.db.share_access_get_all_for_share( 

2681 context, share['id'], filters=filters) 

2682 return rules 

2683 

2684 def access_get(self, context, access_id): 

2685 """Returns access rule with the id.""" 

2686 policy.check_policy(context, 'share', 'access_get') 

2687 rule = self.db.share_access_get(context, access_id) 

2688 # NOTE(gouthamr): Check if the caller has access to the share that 

2689 # the rule belongs to: 

2690 self.get(context, rule['share_id']) 

2691 

2692 return rule 

2693 

2694 def _validate_scheduler_hints(self, context, share, share_uuids): 

2695 for uuid in share_uuids: 

2696 if not uuidutils.is_uuid_like(uuid): 

2697 raise exception.InvalidUUID(uuid=uuid) 

2698 try: 

2699 self.get(context, uuid) 

2700 except (exception.NotFound, exception.PolicyNotAuthorized): 

2701 raise exception.ShareNotFound(share_id=uuid) 

2702 

2703 def _save_scheduler_hints(self, context, share, share_uuids, key): 

2704 share_uuids = share_uuids.split(",") 

2705 

2706 self._validate_scheduler_hints(context, share, share_uuids) 

2707 val_uuids = None 

2708 for uuid in share_uuids: 

2709 try: 

2710 result = self.db.share_metadata_get_item(context, uuid, key) 

2711 except exception.MetadataItemNotFound: 

2712 item = {key: share['id']} 

2713 else: 

2714 existing_uuids = result.get(key, "") 

2715 item = {key: 

2716 ','.join(existing_uuids.split(',') + [share['id']])} 

2717 self.db.share_metadata_update_item(context, uuid, item) 

2718 if not val_uuids: 

2719 val_uuids = uuid 

2720 else: 

2721 val_uuids = val_uuids + "," + uuid 

2722 

2723 if val_uuids: 

2724 item = {key: val_uuids} 

2725 self.db.share_metadata_update_item(context, share['id'], item) 

2726 

2727 def save_scheduler_hints(self, context, share, scheduler_hints=None): 

2728 if scheduler_hints is None: 2728 ↛ 2731line 2728 didn't jump to line 2731 because the condition on line 2728 was always true

2729 return 

2730 

2731 same_host_uuids = scheduler_hints.get(AFFINITY_HINT, None) 

2732 different_host_uuids = scheduler_hints.get(ANTI_AFFINITY_HINT, None) 

2733 

2734 if same_host_uuids: 

2735 self._save_scheduler_hints(context, share, same_host_uuids, 

2736 AFFINITY_KEY) 

2737 if different_host_uuids: 

2738 self._save_scheduler_hints(context, share, different_host_uuids, 

2739 ANTI_AFFINITY_KEY) 

2740 

2741 def _delete_scheduler_hints(self, context, share, key): 

2742 try: 

2743 result = self.db.share_metadata_get_item(context, share['id'], 

2744 key) 

2745 except exception.MetadataItemNotFound: 

2746 return 

2747 

2748 share_uuids = result.get(key, "").split(",") 

2749 for uuid in share_uuids: 

2750 try: 

2751 result = self.db.share_metadata_get_item(context, uuid, key) 

2752 except exception.MetadataItemNotFound: 

2753 continue 

2754 

2755 new_val_uuids = [val_uuid for val_uuid 

2756 in result.get(key, "").split(",") 

2757 if val_uuid != share['id']] 

2758 if not new_val_uuids: 

2759 self.db.share_metadata_delete(context, uuid, key) 

2760 else: 

2761 item = {key: ','.join(new_val_uuids)} 

2762 self.db.share_metadata_update_item(context, uuid, item) 

2763 self.db.share_metadata_delete(context, share['id'], key) 

2764 

2765 def delete_scheduler_hints(self, context, share): 

2766 self._delete_scheduler_hints(context, share, AFFINITY_KEY) 

2767 self._delete_scheduler_hints(context, share, ANTI_AFFINITY_KEY) 

2768 

2769 def _check_is_share_busy(self, share): 

2770 """Raises an exception if share is busy with an active task.""" 

2771 if share.is_busy: 

2772 msg = _("Share %(share_id)s is busy as part of an active " 

2773 "task: %(task)s.") % { 

2774 'share_id': share['id'], 

2775 'task': share['task_state'] 

2776 } 

2777 raise exception.ShareBusyException(reason=msg) 

2778 

2779 @staticmethod 

2780 def check_is_share_size_within_per_share_quota_limit(context, size): 

2781 """Raises an exception if share size above per share quota limit.""" 

2782 try: 

2783 values = {'per_share_gigabytes': size} 

2784 QUOTAS.limit_check(context, project_id=context.project_id, 

2785 **values) 

2786 except exception.OverQuota as e: 

2787 quotas = e.kwargs['quotas'] 

2788 raise exception.ShareSizeExceedsLimit( 

2789 size=size, limit=quotas['per_share_gigabytes']) 

2790 

2791 def update_share_access_metadata(self, context, access_id, metadata): 

2792 """Updates share access metadata.""" 

2793 try: 

2794 api_common.check_metadata_properties(metadata) 

2795 except exception.InvalidMetadata: 

2796 raise exception.InvalidMetadata() 

2797 except exception.InvalidMetadataSize: 

2798 raise exception.InvalidMetadataSize() 

2799 return self.db.share_access_metadata_update( 

2800 context, access_id, metadata) 

2801 

2802 def get_share_network(self, context, share_net_id): 

2803 return self.db.share_network_get(context, share_net_id) 

2804 

2805 def extend(self, context, share, new_size, force=False): 

2806 if force: 2806 ↛ 2807line 2806 didn't jump to line 2807 because the condition on line 2806 was never true

2807 policy.check_policy(context, 'share', 'force_extend', share) 

2808 else: 

2809 policy.check_policy(context, 'share', 'extend', share) 

2810 

2811 if share['status'] != constants.STATUS_AVAILABLE: 

2812 msg_params = { 

2813 'valid_status': constants.STATUS_AVAILABLE, 

2814 'share_id': share['id'], 

2815 'status': share['status'], 

2816 } 

2817 msg = _("Share %(share_id)s status must be '%(valid_status)s' " 

2818 "to extend, but current status is: " 

2819 "%(status)s.") % msg_params 

2820 raise exception.InvalidShare(reason=msg) 

2821 

2822 self._check_is_share_busy(share) 

2823 

2824 size_increase = int(new_size) - share['size'] 

2825 if size_increase <= 0: 

2826 msg = (_("New size for extend must be greater " 

2827 "than current size. (current: %(size)s, " 

2828 "extended: %(new_size)s).") % {'new_size': new_size, 

2829 'size': share['size']}) 

2830 raise exception.InvalidInput(reason=msg) 

2831 

2832 self.check_is_share_size_within_per_share_quota_limit(context, 

2833 new_size) 

2834 

2835 # ensure we pass the share_type provisioning filter on size 

2836 try: 

2837 share_type = share_types.get_share_type( 

2838 context, share['instance']['share_type_id']) 

2839 except (exception.InvalidShareType, exception.ShareTypeNotFound): 

2840 share_type = None 

2841 

2842 allowed_to_extend_past_max_share_size = policy.check_policy( 

2843 context, 'share', constants.POLICY_EXTEND_BEYOND_MAX_SHARE_SIZE, 

2844 target_obj=share, do_raise=False) 

2845 if allowed_to_extend_past_max_share_size: 

2846 share_types.provision_filter_on_size(context, share_type, 

2847 new_size, 

2848 operation='admin-extend') 

2849 else: 

2850 share_types.provision_filter_on_size(context, share_type, 

2851 new_size, operation='extend') 

2852 

2853 replicas = self.db.share_replicas_get_all_by_share( 

2854 context, share['id']) 

2855 supports_replication = len(replicas) > 0 

2856 

2857 deltas = { 

2858 'project_id': share['project_id'], 

2859 'gigabytes': size_increase, 

2860 'user_id': share['user_id'], 

2861 'share_type_id': share['instance']['share_type_id'] 

2862 } 

2863 

2864 # NOTE(carloss): If the share type supports replication, we must get 

2865 # all the replicas that pertain to the share and calculate the final 

2866 # size (size to increase * amount of replicas), since all the replicas 

2867 # are going to be extended when the driver sync them. 

2868 if supports_replication: 

2869 replica_gigs_to_increase = len(replicas) * size_increase 

2870 deltas.update({'replica_gigabytes': replica_gigs_to_increase}) 

2871 

2872 try: 

2873 # we give the user_id of the share, to update the quota usage 

2874 # for the user, who created the share, because on share delete 

2875 # only this quota will be decreased 

2876 reservations = QUOTAS.reserve(context, **deltas) 

2877 except exception.OverQuota as exc: 

2878 # Check if the exceeded quota was 'gigabytes' 

2879 self.check_if_share_quotas_exceeded(context, exc, share['size'], 

2880 operation='extend') 

2881 # NOTE(carloss): Check if the exceeded quota is 

2882 # 'replica_gigabytes'. If so the failure could be caused due to 

2883 # lack of quotas to extend the share's replicas, then the 

2884 # 'check_if_replica_quotas_exceeded' method can't be reused here 

2885 # since the error message must be different from the default one. 

2886 if supports_replication: 2886 ↛ 2912line 2886 didn't jump to line 2912 because the condition on line 2886 was always true

2887 overs = exc.kwargs['overs'] 

2888 usages = exc.kwargs['usages'] 

2889 quotas = exc.kwargs['quotas'] 

2890 

2891 def _consumed(name): 

2892 return (usages[name]['reserved'] + usages[name]['in_use']) 

2893 

2894 if 'replica_gigabytes' in overs: 2894 ↛ 2912line 2894 didn't jump to line 2912 because the condition on line 2894 was always true

2895 LOG.warning("Replica gigabytes quota exceeded " 

2896 "for %(s_pid)s, tried to extend " 

2897 "%(s_size)sG share (%(d_consumed)dG of " 

2898 "%(d_quota)dG already consumed).", { 

2899 's_pid': context.project_id, 

2900 's_size': share['size'], 

2901 'd_consumed': _consumed( 

2902 'replica_gigabytes'), 

2903 'd_quota': quotas['replica_gigabytes']}) 

2904 msg = _("Failed while extending a share with replication " 

2905 "support. There is no available quota to extend " 

2906 "the share and its %(count)d replicas. Maximum " 

2907 "number of allowed replica_gigabytes is " 

2908 "exceeded.") % {'count': len(replicas)} 

2909 raise exception.ShareReplicaSizeExceedsAvailableQuota( 

2910 message=msg) 

2911 

2912 self.update(context, share, {'status': constants.STATUS_EXTENDING}) 

2913 if force: 2913 ↛ 2914line 2913 didn't jump to line 2914 because the condition on line 2913 was never true

2914 self.share_rpcapi.extend_share(context, share, 

2915 new_size, reservations) 

2916 else: 

2917 share_type = share_types.get_share_type( 

2918 context, share['instance']['share_type_id']) 

2919 request_spec = self._get_request_spec_dict(context, share, 

2920 share_type) 

2921 request_spec.update({'is_share_extend': True}) 

2922 self.scheduler_rpcapi.extend_share(context, share['id'], new_size, 

2923 reservations, request_spec) 

2924 LOG.info("Extend share request issued successfully.", 

2925 resource=share) 

2926 

2927 def shrink(self, context, share, new_size): 

2928 status = str(share['status']).lower() 

2929 valid_statuses = (constants.STATUS_AVAILABLE, 

2930 constants.STATUS_SHRINKING_POSSIBLE_DATA_LOSS_ERROR) 

2931 

2932 if status not in valid_statuses: 

2933 msg_params = { 

2934 'valid_status': ", ".join(valid_statuses), 

2935 'share_id': share['id'], 

2936 'status': status, 

2937 } 

2938 msg = _("Share %(share_id)s status must in (%(valid_status)s) " 

2939 "to shrink, but current status is: " 

2940 "%(status)s.") % msg_params 

2941 raise exception.InvalidShare(reason=msg) 

2942 

2943 self._check_is_share_busy(share) 

2944 

2945 size_decrease = int(share['size']) - int(new_size) 

2946 if size_decrease <= 0 or new_size <= 0: 

2947 msg = (_("New size for shrink must be less " 

2948 "than current size and greater than 0 (current: %(size)s," 

2949 " new: %(new_size)s)") % {'new_size': new_size, 

2950 'size': share['size']}) 

2951 raise exception.InvalidInput(reason=msg) 

2952 

2953 # ensure we pass the share_type provisioning filter on size 

2954 try: 

2955 share_type = share_types.get_share_type( 

2956 context, share['instance']['share_type_id']) 

2957 except (exception.InvalidShareType, exception.ShareTypeNotFound): 

2958 share_type = None 

2959 share_types.provision_filter_on_size(context, share_type, new_size, 

2960 operation='shrink') 

2961 

2962 self.update(context, share, {'status': constants.STATUS_SHRINKING}) 

2963 self.share_rpcapi.shrink_share(context, share, new_size) 

2964 LOG.info("Shrink share (id=%(id)s) request issued successfully." 

2965 " New size: %(size)s", {'id': share['id'], 

2966 'size': new_size}) 

2967 

2968 def snapshot_allow_access(self, context, snapshot, access_type, access_to): 

2969 """Allow access to a share snapshot.""" 

2970 access_exists = self.db.share_snapshot_check_for_existing_access( 

2971 context, snapshot['id'], access_type, access_to) 

2972 

2973 if access_exists: 

2974 raise exception.ShareSnapshotAccessExists(access_type=access_type, 

2975 access=access_to) 

2976 

2977 values = { 

2978 'share_snapshot_id': snapshot['id'], 

2979 'access_type': access_type, 

2980 'access_to': access_to, 

2981 } 

2982 

2983 if any((instance['status'] != constants.STATUS_AVAILABLE) or 

2984 (instance['share_instance']['host'] is None) 

2985 for instance in snapshot.instances): 

2986 msg = _("New access rules cannot be applied while the snapshot or " 

2987 "any of its replicas or migration copies lacks a valid " 

2988 "host or is not in %s state.") % constants.STATUS_AVAILABLE 

2989 

2990 raise exception.InvalidShareSnapshotInstance(reason=msg) 

2991 

2992 access = self.db.share_snapshot_access_create(context, values) 

2993 

2994 for snapshot_instance in snapshot.instances: 

2995 self.share_rpcapi.snapshot_update_access( 

2996 context, snapshot_instance) 

2997 

2998 return access 

2999 

3000 def snapshot_deny_access(self, context, snapshot, access): 

3001 """Deny access to a share snapshot.""" 

3002 if any((instance['status'] != constants.STATUS_AVAILABLE) or 

3003 (instance['share_instance']['host'] is None) 

3004 for instance in snapshot.instances): 

3005 msg = _("Access rules cannot be denied while the snapshot or " 

3006 "any of its replicas or migration copies lacks a valid " 

3007 "host or is not in %s state.") % constants.STATUS_AVAILABLE 

3008 

3009 raise exception.InvalidShareSnapshotInstance(reason=msg) 

3010 

3011 for snapshot_instance in snapshot.instances: 

3012 rule = self.db.share_snapshot_instance_access_get( 

3013 context, access['id'], snapshot_instance['id']) 

3014 self.db.share_snapshot_instance_access_update( 

3015 context, rule['access_id'], snapshot_instance['id'], 

3016 {'state': constants.ACCESS_STATE_QUEUED_TO_DENY}) 

3017 self.share_rpcapi.snapshot_update_access( 

3018 context, snapshot_instance) 

3019 

3020 def snapshot_access_get_all(self, context, snapshot): 

3021 """Returns all access rules for share snapshot.""" 

3022 rules = self.db.share_snapshot_access_get_all_for_share_snapshot( 

3023 context, snapshot['id'], {}) 

3024 return rules 

3025 

3026 def snapshot_access_get(self, context, access_id): 

3027 """Returns snapshot access rule with the id.""" 

3028 rule = self.db.share_snapshot_access_get(context, access_id) 

3029 return rule 

3030 

3031 def snapshot_export_locations_get(self, context, snapshot): 

3032 return self.db.share_snapshot_export_locations_get(context, snapshot) 

3033 

3034 def snapshot_export_location_get(self, context, el_id): 

3035 return self.db.share_snapshot_instance_export_location_get(context, 

3036 el_id) 

3037 

3038 def share_server_migration_get_destination(self, context, source_server_id, 

3039 status=None): 

3040 """Returns destination share server for a share server migration.""" 

3041 

3042 filters = {'source_share_server_id': source_server_id} 

3043 if status: 3043 ↛ 3046line 3043 didn't jump to line 3046 because the condition on line 3043 was always true

3044 filters.update({'status': status}) 

3045 

3046 dest_share_servers = self.db.share_server_get_all_with_filters( 

3047 context, filters=filters) 

3048 if not dest_share_servers: 

3049 msg = _("A destination share server wasn't found for source " 

3050 "share server %s.") % source_server_id 

3051 raise exception.InvalidShareServer(reason=msg) 

3052 if len(dest_share_servers) > 1: 

3053 msg = _("More than one destination share server was found for " 

3054 "source share server %s. Aborting...") % source_server_id 

3055 raise exception.InvalidShareServer(reason=msg) 

3056 

3057 return dest_share_servers[0] 

3058 

3059 def get_share_server_migration_request_spec_dict( 

3060 self, context, share_instances, snapshot_instances, **kwargs): 

3061 """Returns request specs related to share server and all its shares.""" 

3062 

3063 shares_total_size = sum([instance.get('size', 0) 

3064 for instance in share_instances]) 

3065 snapshots_total_size = sum([instance.get('size', 0) 

3066 for instance in snapshot_instances]) 

3067 

3068 shares_req_spec = [] 

3069 for share_instance in share_instances: 

3070 share_type_id = share_instance['share_type_id'] 

3071 share_type = share_types.get_share_type(context, share_type_id) 

3072 req_spec = self._get_request_spec_dict(context, share_instance, 

3073 share_type, 

3074 **kwargs) 

3075 shares_req_spec.append(req_spec) 

3076 

3077 server_request_spec = { 

3078 'shares_size': shares_total_size, 

3079 'snapshots_size': snapshots_total_size, 

3080 'shares_req_spec': shares_req_spec, 

3081 } 

3082 return server_request_spec 

3083 

3084 def _migration_initial_checks(self, context, share_server, dest_host, 

3085 new_share_network): 

3086 shares = self.db.share_get_all_by_share_server( 

3087 context, share_server['id']) 

3088 

3089 shares_in_recycle_bin = self.db.share_get_all_soft_deleted( 

3090 context, share_server['id']) 

3091 

3092 if len(shares) == 0: 

3093 msg = _("Share server %s does not have shares." 

3094 % share_server['id']) 

3095 raise exception.InvalidShareServer(reason=msg) 

3096 

3097 if shares_in_recycle_bin: 3097 ↛ 3098line 3097 didn't jump to line 3098 because the condition on line 3097 was never true

3098 msg = _("Share server %s has at least one share that has " 

3099 "been soft deleted." % share_server['id']) 

3100 raise exception.InvalidShareServer(reason=msg) 

3101 

3102 # We only handle "active" share servers for now 

3103 if share_server['status'] != constants.STATUS_ACTIVE: 

3104 msg = _('Share server %(server_id)s status must be active, ' 

3105 'but current status is: %(server_status)s.') % { 

3106 'server_id': share_server['id'], 

3107 'server_status': share_server['status']} 

3108 raise exception.InvalidShareServer(reason=msg) 

3109 

3110 share_groups_related_to_share_server = ( 

3111 self.db.share_group_get_all_by_share_server( 

3112 context, share_server['id'])) 

3113 

3114 if share_groups_related_to_share_server: 

3115 msg = _("The share server %s can not be migrated because it is " 

3116 "related to a share group.") % share_server['id'] 

3117 raise exception.InvalidShareServer(reason=msg) 

3118 

3119 # Same backend and same network, nothing changes 

3120 src_backend = share_utils.extract_host(share_server['host'], 

3121 level='backend_name') 

3122 dest_backend = share_utils.extract_host(dest_host, 

3123 level='backend_name') 

3124 current_share_network_id = shares[0]['instance']['share_network_id'] 

3125 if (src_backend == dest_backend and 

3126 (new_share_network is None or 

3127 new_share_network['id'] == current_share_network_id)): 

3128 msg = _('There is no difference between source and destination ' 

3129 'backends and between source and destination share ' 

3130 'networks. Share server migration will not proceed.') 

3131 raise exception.InvalidShareServer(reason=msg) 

3132 

3133 filters = {'source_share_server_id': share_server['id'], 

3134 'status': constants.STATUS_SERVER_MIGRATING_TO} 

3135 dest_share_servers = self.db.share_server_get_all_with_filters( 

3136 context, filters=filters) 

3137 if len(dest_share_servers): 

3138 msg = _("There is at least one destination share server pointing " 

3139 "to this source share server. Clean up your environment " 

3140 "before starting a new migration.") 

3141 raise exception.InvalidShareServer(reason=msg) 

3142 

3143 dest_service_host = share_utils.extract_host(dest_host) 

3144 # Make sure the host is in the list of available hosts 

3145 utils.validate_service_host(context, dest_service_host) 

3146 

3147 service = self.db.service_get_by_args( 

3148 context, dest_service_host, 'manila-share') 

3149 

3150 # Get all share types 

3151 type_ids = set([share['instance']['share_type_id'] 

3152 for share in shares]) 

3153 types = [share_types.get_share_type(context, type_id) 

3154 for type_id in type_ids] 

3155 

3156 # Check if share type azs are supported by the destination host 

3157 for share_type in types: 

3158 azs = share_type['extra_specs'].get('availability_zones', '') 

3159 if azs and service['availability_zone']['name'] not in azs: 

3160 msg = _("Share server %(server)s cannot be migrated to host " 

3161 "%(dest)s because the share type %(type)s is used by " 

3162 "one of the shares, and this share type is not " 

3163 "supported within the availability zone (%(az)s) that " 

3164 "the host is in.") 

3165 type_name = '%s' % (share_type['name'] or '') 

3166 type_id = '(ID: %s)' % share_type['id'] 

3167 payload = {'type': '%s%s' % (type_name, type_id), 

3168 'az': service['availability_zone']['name'], 

3169 'server': share_server['id'], 

3170 'dest': dest_host} 

3171 raise exception.InvalidShareServer(reason=msg % payload) 

3172 

3173 if new_share_network: 

3174 new_share_network_id = new_share_network['id'] 

3175 else: 

3176 new_share_network_id = shares[0]['instance']['share_network_id'] 

3177 # NOTE(carloss): check if the new or old share network has a subnet 

3178 # that spans the availability zone of the destination host, otherwise 

3179 # we should deny this operation. 

3180 dest_az = self.db.availability_zone_get( 

3181 context, service['availability_zone']['name']) 

3182 compatible_subnets = ( 

3183 self.db.share_network_subnets_get_all_by_availability_zone_id( 

3184 context, new_share_network_id, dest_az['id'])) 

3185 

3186 if not compatible_subnets: 

3187 msg = _("The share network %(network)s does not have a subnet " 

3188 "that spans the destination host availability zone.") 

3189 payload = {'network': new_share_network_id} 

3190 raise exception.InvalidShareServer(reason=msg % payload) 

3191 

3192 net_changes_identified = False 

3193 if new_share_network: 

3194 net_changes_identified = not share_utils.is_az_subnets_compatible( 

3195 share_server['share_network_subnets'], compatible_subnets) 

3196 

3197 # NOTE(carloss): Refreshing the list of shares since something could've 

3198 # changed from the initial list. 

3199 shares = self.db.share_get_all_by_share_server( 

3200 context, share_server['id']) 

3201 for share in shares: 

3202 if share['status'] != constants.STATUS_AVAILABLE: 

3203 msg = _('Share %(share_id)s status must be available, ' 

3204 'but current status is: %(share_status)s.') % { 

3205 'share_id': share['id'], 

3206 'share_status': share['status']} 

3207 raise exception.InvalidShareServer(reason=msg) 

3208 

3209 if (not share_server.get( 

3210 'share_replicas_migration_support', False) and 

3211 share.has_replicas): 

3212 msg = _('Share %s has replicas. Remove the replicas of all ' 

3213 'shares in the share server before attempting to ' 

3214 'migrate it.') % share['id'] 

3215 LOG.error(msg) 

3216 raise exception.InvalidShareServer(reason=msg) 

3217 

3218 # NOTE(carloss): Not validating the flag preserve_snapshots at this 

3219 # point, considering that even if the admin set the value to False, 

3220 # the driver can still support preserving snapshots and the 

3221 # snapshots would be copied anyway. So the share/manager will be 

3222 # responsible for checking if the driver does not support snapshot 

3223 # preservation, and if there are snapshots in the share server. 

3224 share_snapshots = self.db.share_snapshot_get_all_for_share( 

3225 context, share['id']) 

3226 all_snapshots_are_available = all( 

3227 [snapshot['status'] == constants.STATUS_AVAILABLE 

3228 for snapshot in share_snapshots]) 

3229 if not all_snapshots_are_available: 3229 ↛ 3230line 3229 didn't jump to line 3230 because the condition on line 3229 was never true

3230 msg = _( 

3231 "All snapshots must have '%(status)s' status to be " 

3232 "migrated by the driver along with share " 

3233 "%(resource_id)s.") % { 

3234 'resource_id': share['id'], 

3235 'status': constants.STATUS_AVAILABLE, 

3236 } 

3237 LOG.error(msg) 

3238 raise exception.InvalidShareServer(reason=msg) 

3239 

3240 if share.get('share_group_id'): 

3241 msg = _('Share %s is a member of a group. This operation is ' 

3242 'not currently supported for share servers that ' 

3243 'contain shares members of groups.') % share['id'] 

3244 LOG.error(msg) 

3245 raise exception.InvalidShareServer(reason=msg) 

3246 

3247 share_instance = share['instance'] 

3248 # Access rules status must not be error 

3249 if share_instance['access_rules_status'] == constants.STATUS_ERROR: 3249 ↛ 3250line 3249 didn't jump to line 3250 because the condition on line 3249 was never true

3250 msg = _( 

3251 'Share instance %(instance_id)s access rules status must ' 

3252 'not be in %(error)s when attempting to start a share ' 

3253 'server migration.') % { 

3254 'instance_id': share_instance['id'], 

3255 'error': constants.STATUS_ERROR} 

3256 raise exception.InvalidShareServer(reason=msg) 

3257 try: 

3258 self._check_is_share_busy(share) 

3259 except exception.ShareBusyException as e: 

3260 raise exception.InvalidShareServer(reason=e.msg) 

3261 

3262 return ( 

3263 shares, types, service, new_share_network_id, 

3264 net_changes_identified) 

3265 

3266 def share_server_migration_check(self, context, share_server, dest_host, 

3267 writable, nondisruptive, 

3268 preserve_snapshots, 

3269 new_share_network=None): 

3270 """Migrates share server to a new host.""" 

3271 shares, types, service, new_share_network_id, net_params_changed = ( 

3272 self._migration_initial_checks(context, share_server, dest_host, 

3273 new_share_network)) 

3274 

3275 # If a nondisruptive migration was requested and different neutron net 

3276 # id and neutron subnet ids were identified 

3277 if net_params_changed and nondisruptive: 

3278 result = { 

3279 'compatible': False, 

3280 'writable': False, 

3281 'nondisruptive': False, 

3282 'preserve_snapshots': False, 

3283 'migration_cancel': False, 

3284 'migration_get_progress': False, 

3285 'share_network_id': new_share_network_id 

3286 } 

3287 return result 

3288 

3289 # NOTE(dviroel): Service is up according to validations made on initial 

3290 # checks 

3291 result = self.share_rpcapi.share_server_migration_check( 

3292 context, share_server['id'], dest_host, writable, nondisruptive, 

3293 preserve_snapshots, new_share_network_id) 

3294 

3295 # NOTE(carloss): In case users haven't requested a nondisruptive 

3296 # migration and a network change was identified, we must get the 

3297 # driver's check result and if there is need to, manipulate it. 

3298 # The result is provided by the driver and based on the back end 

3299 # possibility to perform a nondisruptive migration or not. If 

3300 # a network change was provided, we know that the migration will be 

3301 # disruptive, so in order to do not confuse the user, we must present 

3302 # the share server migration as disruptive 

3303 if result.get('nondisruptive') and net_params_changed: 3303 ↛ 3304line 3303 didn't jump to line 3304 because the condition on line 3303 was never true

3304 result['nondisruptive'] = False 

3305 

3306 return result 

3307 

3308 def share_server_migration_start( 

3309 self, context, share_server, dest_host, writable, nondisruptive, 

3310 preserve_snapshots, new_share_network=None): 

3311 """Migrates share server to a new host.""" 

3312 

3313 shares, types, service, new_share_network_id, net_params_changed = ( 

3314 self._migration_initial_checks(context, share_server, 

3315 dest_host, 

3316 new_share_network)) 

3317 

3318 if nondisruptive and net_params_changed: 

3319 msg = _("Nondisruptive migration would only be feasible when the " 

3320 "current and new share networks carry the same " 

3321 "'neutron_net_id' and 'neutron_subnet_id', or when no " 

3322 "network changes are occurring.") 

3323 raise exception.InvalidInput(reason=msg) 

3324 

3325 # Updates the share server status to migration starting 

3326 self.db.share_server_update( 

3327 context, share_server['id'], 

3328 {'task_state': constants.TASK_STATE_MIGRATION_STARTING, 

3329 'status': constants.STATUS_SERVER_MIGRATING}) 

3330 

3331 share_instances = self.db.share_instance_get_all_by_share_server( 

3332 context, share_server['id']) 

3333 share_instance_ids = [ 

3334 share_instance['id'] for share_instance in share_instances] 

3335 

3336 snap_instances = self.db.share_snapshot_instance_get_all_with_filters( 

3337 context, {'share_instance_ids': share_instance_ids}) 

3338 snapshot_instance_ids = [ 

3339 snap_instance['id'] for snap_instance in snap_instances] 

3340 

3341 # Updates all shares and snapshot instances 

3342 self.db.share_and_snapshot_instances_status_update( 

3343 context, {'status': constants.STATUS_SERVER_MIGRATING}, 

3344 share_instance_ids=share_instance_ids, 

3345 snapshot_instance_ids=snapshot_instance_ids, 

3346 current_expected_status=constants.STATUS_AVAILABLE 

3347 ) 

3348 

3349 # NOTE(dviroel): Service is up according to validations made on initial 

3350 # checks 

3351 self.share_rpcapi.share_server_migration_start( 

3352 context, share_server, dest_host, writable, nondisruptive, 

3353 preserve_snapshots, new_share_network_id) 

3354 

3355 def share_server_migration_complete(self, context, share_server): 

3356 """Invokes 2nd phase of share server migration.""" 

3357 if share_server['status'] != constants.STATUS_SERVER_MIGRATING: 

3358 msg = _("Share server %s is not migrating") % share_server['id'] 

3359 LOG.error(msg) 

3360 raise exception.InvalidShareServer(reason=msg) 

3361 if (share_server['task_state'] != 

3362 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE): 

3363 msg = _("The first phase of migration has to finish to " 

3364 "request the completion of server %s's " 

3365 "migration.") % share_server['id'] 

3366 LOG.error(msg) 

3367 raise exception.InvalidShareServer(reason=msg) 

3368 

3369 dest_share_server = self.share_server_migration_get_destination( 

3370 context, share_server['id'], 

3371 status=constants.STATUS_SERVER_MIGRATING_TO 

3372 ) 

3373 

3374 dest_host = share_utils.extract_host(dest_share_server['host']) 

3375 utils.validate_service_host(context, dest_host) 

3376 

3377 self.share_rpcapi.share_server_migration_complete( 

3378 context, dest_share_server['host'], share_server, 

3379 dest_share_server) 

3380 

3381 return { 

3382 'destination_share_server_id': dest_share_server['id'] 

3383 } 

3384 

3385 def share_server_migration_cancel(self, context, share_server): 

3386 """Attempts to cancel share server migration.""" 

3387 if share_server['status'] != constants.STATUS_SERVER_MIGRATING: 

3388 msg = _("Migration of share server %s cannot be cancelled because " 

3389 "the provided share server is not being migrated." 

3390 % (share_server['id'])) 

3391 LOG.error(msg) 

3392 raise exception.InvalidShareServer(reason=msg) 

3393 

3394 if share_server['task_state'] in ( 

3395 constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE, 

3396 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS): 

3397 

3398 dest_share_server = self.share_server_migration_get_destination( 

3399 context, share_server['id'], 

3400 status=constants.STATUS_SERVER_MIGRATING_TO 

3401 ) 

3402 

3403 dest_host = share_utils.extract_host(dest_share_server['host']) 

3404 utils.validate_service_host(context, dest_host) 

3405 

3406 self.share_rpcapi.share_server_migration_cancel( 

3407 context, dest_share_server['host'], share_server, 

3408 dest_share_server) 

3409 else: 

3410 msg = self._migration_validate_error_message( 

3411 share_server, resource_type='share_server') 

3412 if msg is None: 3412 ↛ 3417line 3412 didn't jump to line 3417 because the condition on line 3412 was always true

3413 msg = _("Migration of share server %s can be cancelled only " 

3414 "after the driver already started the migration, or " 

3415 "when the first phase of the migration gets " 

3416 "completed.") % share_server['id'] 

3417 LOG.error(msg) 

3418 raise exception.InvalidShareServer(reason=msg) 

3419 

3420 def share_server_migration_get_progress(self, context, 

3421 src_share_server_id): 

3422 """Retrieve migration progress for a given share server.""" 

3423 try: 

3424 share_server = self.db.share_server_get(context, 

3425 src_share_server_id) 

3426 except exception.ShareServerNotFound: 

3427 msg = _('Share server %s was not found. We will search for a ' 

3428 'successful migration') % src_share_server_id 

3429 LOG.debug(msg) 

3430 # Search for a successful migration, raise an error if not found 

3431 dest_share_server = self.share_server_migration_get_destination( 

3432 context, src_share_server_id, 

3433 status=constants.STATUS_ACTIVE 

3434 ) 

3435 return { 

3436 'total_progress': 100, 

3437 'destination_share_server_id': dest_share_server['id'], 

3438 'task_state': dest_share_server['task_state'], 

3439 } 

3440 # Source server still exists so it must be in 'server_migrating' status 

3441 if share_server['status'] != constants.STATUS_SERVER_MIGRATING: 

3442 msg = _("Migration progress of share server %s cannot be " 

3443 "obtained. The provided share server is not being " 

3444 "migrated.") % share_server['id'] 

3445 LOG.error(msg) 

3446 raise exception.InvalidShareServer(reason=msg) 

3447 

3448 try: 

3449 dest_share_server = self.share_server_migration_get_destination( 

3450 context, share_server['id'], 

3451 status=constants.STATUS_SERVER_MIGRATING_TO 

3452 ) 

3453 except Exception: 

3454 msg = ("Migration progress of share server %s cannot be " 

3455 "determined yet. Please retry the migration get " 

3456 "progress operation.") % share_server['id'] 

3457 LOG.info(msg) 

3458 

3459 result = { 

3460 'destination_share_server_id': '', 

3461 'task_state': '' 

3462 } 

3463 

3464 result.update(self._migration_get_progress_state(share_server)) 

3465 

3466 return result 

3467 

3468 if (share_server['task_state'] == 

3469 constants.TASK_STATE_MIGRATION_DRIVER_IN_PROGRESS): 

3470 

3471 dest_host = share_utils.extract_host(dest_share_server['host']) 

3472 utils.validate_service_host(context, dest_host) 

3473 

3474 try: 

3475 result = ( 

3476 self.share_rpcapi.share_server_migration_get_progress( 

3477 context, dest_share_server['host'], 

3478 share_server, dest_share_server)) 

3479 except Exception: 

3480 msg = _("Failed to obtain migration progress of share " 

3481 "server %s.") % share_server['id'] 

3482 LOG.exception(msg) 

3483 raise exception.ShareServerMigrationError(reason=msg) 

3484 

3485 else: 

3486 result = self._migration_get_progress_state(share_server) 

3487 

3488 if not (result and result.get('total_progress') is not None): 

3489 msg = self._migration_validate_error_message( 

3490 share_server, resource_type='share_server') 

3491 if msg is None: 3491 ↛ 3492line 3491 didn't jump to line 3492 because the condition on line 3491 was never true

3492 msg = _("Migration progress of share server %s cannot be " 

3493 "obtained at this moment.") % share_server['id'] 

3494 LOG.error(msg) 

3495 raise exception.InvalidShareServer(reason=msg) 

3496 

3497 result.update({ 

3498 'destination_share_server_id': dest_share_server['id'], 

3499 'task_state': dest_share_server['task_state'] 

3500 }) 

3501 return result 

3502 

3503 def _share_network_update_initial_checks(self, context, share_network, 

3504 new_security_service, 

3505 current_security_service=None): 

3506 api_common.check_share_network_is_active(share_network) 

3507 

3508 if not current_security_service: 

3509 # Since we are adding a new security service, we can't have one 

3510 # of the same type already associated with this share network 

3511 for attached_service in share_network['security_services']: 

3512 if attached_service['type'] == new_security_service['type']: 3512 ↛ 3511line 3512 didn't jump to line 3511 because the condition on line 3512 was always true

3513 msg = _("Cannot add security service to share network. " 

3514 "Security service with '%(ss_type)s' type already " 

3515 "added to '%(sn_id)s' share network") % { 

3516 'ss_type': new_security_service['type'], 

3517 'sn_id': share_network['id'] 

3518 } 

3519 raise exception.InvalidSecurityService(reason=msg) 

3520 else: 

3521 # Validations needed only for update operation 

3522 current_service_is_associated = ( 

3523 self.db.share_network_security_service_association_get( 

3524 context, share_network['id'], 

3525 current_security_service['id'])) 

3526 

3527 if not current_service_is_associated: 3527 ↛ 3528line 3527 didn't jump to line 3528 because the condition on line 3527 was never true

3528 msg = _("The specified current security service %(service)s " 

3529 "is not associated to the share network %(network)s." 

3530 ) % { 

3531 'service': current_security_service['id'], 

3532 'network': share_network['id'] 

3533 } 

3534 raise exception.InvalidSecurityService(reason=msg) 

3535 

3536 if (current_security_service['type'] != 3536 ↛ 3548line 3536 didn't jump to line 3548 because the condition on line 3536 was always true

3537 new_security_service['type']): 

3538 msg = _("A security service can only be replaced by one of " 

3539 "the same type. The current security service type is " 

3540 "'%(ss_type)s' and the new security service type is " 

3541 "'%(new_ss_type)s'") % { 

3542 'ss_type': current_security_service['type'], 

3543 'new_ss_type': new_security_service['type'], 

3544 'sn_id': share_network['id'] 

3545 } 

3546 raise exception.InvalidSecurityService(reason=msg) 

3547 

3548 share_servers = set() 

3549 for subnet in share_network['share_network_subnets']: 

3550 if subnet['share_servers']: 3550 ↛ 3549line 3550 didn't jump to line 3549 because the condition on line 3550 was always true

3551 share_servers.update(subnet['share_servers']) 

3552 

3553 backend_hosts = set() 

3554 if share_servers: 3554 ↛ 3632line 3554 didn't jump to line 3632 because the condition on line 3554 was always true

3555 if not share_network['security_service_update_support']: 

3556 msg = _("Updating security services is not supported on this " 

3557 "share network (%(sn_id)s) while it has shares. " 

3558 "See the capability " 

3559 "'security_service_update_support'.") % { 

3560 "sn_id": share_network["id"] 

3561 } 

3562 raise exception.InvalidShareNetwork(reason=msg) 

3563 

3564 # We can only handle "active" share servers for now 

3565 for share_server in share_servers: 

3566 if share_server['status'] != constants.STATUS_ACTIVE: 

3567 msg = _('Some resources exported on share network ' 

3568 '%(shar_net_id)s are not currently available.') % { 

3569 'shar_net_id': share_network['id'] 

3570 } 

3571 raise exception.InvalidShareNetwork(reason=msg) 

3572 # Create a set of backend hosts 

3573 backend_hosts.add(share_server['host']) 

3574 

3575 for backend_host in backend_hosts: 

3576 # We need an admin context to validate these hosts 

3577 admin_ctx = manila_context.get_admin_context() 

3578 # Make sure the host is in the list of available hosts 

3579 utils.validate_service_host(admin_ctx, backend_host) 

3580 

3581 shares_in_recycle_bin = ( 

3582 self.db.share_get_all_soft_deleted_by_network( 

3583 context, share_network['id'])) 

3584 if shares_in_recycle_bin: 3584 ↛ 3585line 3584 didn't jump to line 3585 because the condition on line 3584 was never true

3585 msg = _("Some shares with share network %(sn_id)s have " 

3586 "been soft deleted.") % {'sn_id': share_network['id']} 

3587 raise exception.InvalidShareNetwork(reason=msg) 

3588 

3589 shares = self.get_all( 

3590 context, search_opts={'share_network_id': share_network['id']}) 

3591 shares_not_available = [ 

3592 share['id'] for share in shares if 

3593 share['status'] != constants.STATUS_AVAILABLE] 

3594 

3595 if shares_not_available: 

3596 msg = _("Some shares exported on share network %(sn_id)s are " 

3597 "not available: %(share_ids)s.") % { 

3598 'sn_id': share_network['id'], 

3599 'share_ids': shares_not_available, 

3600 } 

3601 raise exception.InvalidShareNetwork(reason=msg) 

3602 

3603 shares_rules_not_available = [ 

3604 share['id'] for share in shares if 

3605 share['instance'][ 

3606 'access_rules_status'] != constants.STATUS_ACTIVE] 

3607 

3608 if shares_rules_not_available: 

3609 msg = _( 

3610 "Either these shares or one of their replicas or " 

3611 "migration copies exported on share network %(sn_id)s " 

3612 "are not available: %(share_ids)s.") % { 

3613 'sn_id': share_network['id'], 

3614 'share_ids': shares_rules_not_available, 

3615 } 

3616 raise exception.InvalidShareNetwork(reason=msg) 

3617 

3618 busy_shares = [] 

3619 for share in shares: 

3620 try: 

3621 self._check_is_share_busy(share) 

3622 except exception.ShareBusyException: 

3623 busy_shares.append(share['id']) 

3624 if busy_shares: 3624 ↛ 3632line 3624 didn't jump to line 3632 because the condition on line 3624 was always true

3625 msg = _("Some shares exported on share network %(sn_id)s " 

3626 "are busy: %(share_ids)s.") % { 

3627 'sn_id': share_network['id'], 

3628 'share_ids': busy_shares, 

3629 } 

3630 raise exception.InvalidShareNetwork(reason=msg) 

3631 

3632 return list(share_servers), list(backend_hosts) 

3633 

3634 def get_security_service_update_key( 

3635 self, operation, new_security_service_id, 

3636 current_security_service_id=None): 

3637 if current_security_service_id: 3637 ↛ 3642line 3637 didn't jump to line 3642 because the condition on line 3637 was always true

3638 return ('share_network_sec_service_update_' + 

3639 current_security_service_id + '_' + 

3640 new_security_service_id + '_' + operation) 

3641 else: 

3642 return ('share_network_sec_service_add_' + 

3643 new_security_service_id + '_' + operation) 

3644 

3645 @locked_security_service_update_operation 

3646 def _security_service_update_validate_hosts( 

3647 self, context, share_network, 

3648 backend_hosts, share_servers, 

3649 new_security_service_id=None, 

3650 current_security_service_id=None): 

3651 

3652 # create a key based on users request 

3653 update_key = self.get_security_service_update_key( 

3654 'hosts_check', new_security_service_id, 

3655 current_security_service_id=current_security_service_id) 

3656 

3657 return self._do_update_validate_hosts( 

3658 context, share_network['id'], backend_hosts, update_key, 

3659 new_security_service_id=new_security_service_id, 

3660 current_security_service_id=current_security_service_id) 

3661 

3662 def _do_update_validate_hosts( 

3663 self, context, share_network_id, 

3664 backend_hosts, update_key, new_share_network_subnet=None, 

3665 new_security_service_id=None, current_security_service_id=None): 

3666 

3667 # check if there is an entry being processed. 

3668 update_value = self.db.async_operation_data_get( 

3669 context, share_network_id, update_key) 

3670 if not update_value: 

3671 # Create a new entry, send all asynchronous rpcs and return. 

3672 hosts_to_validate = {} 

3673 for host in backend_hosts: 

3674 hosts_to_validate[host] = None 

3675 self.db.async_operation_data_update( 

3676 context, share_network_id, 

3677 {update_key: json.dumps(hosts_to_validate)}) 

3678 for host in backend_hosts: 

3679 if new_share_network_subnet: 

3680 (self.share_rpcapi. 

3681 check_update_share_server_network_allocations( 

3682 context, host, share_network_id, 

3683 new_share_network_subnet)) 

3684 else: 

3685 (self.share_rpcapi. 

3686 check_update_share_network_security_service( 

3687 context, host, share_network_id, 

3688 new_security_service_id, 

3689 current_security_service_id=( 

3690 current_security_service_id))) 

3691 return None, hosts_to_validate 

3692 

3693 else: 

3694 # process current existing hosts and update them if needed. 

3695 current_hosts = json.loads(update_value) 

3696 hosts_to_include = ( 

3697 set(backend_hosts).difference(set(current_hosts.keys()))) 

3698 hosts_to_validate = {} 

3699 for host in backend_hosts: 

3700 hosts_to_validate[host] = current_hosts.get(host, None) 

3701 

3702 # Check if there is any unsupported host. 

3703 if any(hosts_to_validate[host] is False for host in backend_hosts): 

3704 return False, hosts_to_validate 

3705 

3706 # Update the list of hosts to be validated. 

3707 if hosts_to_include: 

3708 self.db.async_operation_data_update( 

3709 context, share_network_id, 

3710 {update_key: json.dumps(hosts_to_validate)}) 

3711 

3712 for host in hosts_to_include: 

3713 # send asynchronous check only for new backend hosts. 

3714 if new_share_network_subnet: 

3715 (self.share_rpcapi. 

3716 check_update_share_server_network_allocations( 

3717 context, host, share_network_id, 

3718 new_share_network_subnet)) 

3719 else: 

3720 (self.share_rpcapi. 

3721 check_update_share_network_security_service( 

3722 context, host, share_network_id, 

3723 new_security_service_id, 

3724 current_security_service_id=( 

3725 current_security_service_id))) 

3726 

3727 return None, hosts_to_validate 

3728 

3729 if all(hosts_to_validate[host] for host in backend_hosts): 

3730 return True, hosts_to_validate 

3731 

3732 return None, current_hosts 

3733 

3734 def check_share_network_security_service_update( 

3735 self, context, share_network, new_security_service, 

3736 current_security_service=None, reset_operation=False): 

3737 share_servers, backend_hosts = ( 

3738 self._share_network_update_initial_checks( 

3739 context, share_network, new_security_service, 

3740 current_security_service=current_security_service)) 

3741 

3742 if not backend_hosts: 

3743 # There is no backend host to validate. Operation is supported. 

3744 return { 

3745 'compatible': True, 

3746 'hosts_check_result': {}, 

3747 } 

3748 curr_sec_serv_id = ( 

3749 current_security_service['id'] 

3750 if current_security_service else None) 

3751 key = self.get_security_service_update_key( 

3752 'hosts_check', new_security_service['id'], 

3753 current_security_service_id=curr_sec_serv_id) 

3754 if reset_operation: 

3755 self.db.async_operation_data_delete(context, share_network['id'], 

3756 key) 

3757 try: 

3758 compatible, hosts_info = ( 

3759 self._security_service_update_validate_hosts( 

3760 context, share_network, backend_hosts, share_servers, 

3761 new_security_service_id=new_security_service['id'], 

3762 current_security_service_id=curr_sec_serv_id)) 

3763 except Exception as e: 

3764 LOG.error(e) 

3765 # Due to an internal error, we will delete the entry 

3766 self.db.async_operation_data_delete( 

3767 context, share_network['id'], key) 

3768 msg = _( 

3769 'The share network %(share_net_id)s cannot be updated ' 

3770 'since at least one of its backend hosts do not support ' 

3771 'this operation.') % { 

3772 'share_net_id': share_network['id']} 

3773 raise exception.InvalidShareNetwork(reason=msg) 

3774 

3775 return { 

3776 'compatible': compatible, 

3777 'hosts_check_result': hosts_info 

3778 } 

3779 

3780 def update_share_network_security_service(self, context, share_network, 

3781 new_security_service, 

3782 current_security_service=None): 

3783 share_servers, backend_hosts = ( 

3784 self._share_network_update_initial_checks( 

3785 context, share_network, new_security_service, 

3786 current_security_service=current_security_service)) 

3787 if not backend_hosts: 

3788 # There is no backend host to validate or update. 

3789 return 

3790 

3791 curr_sec_serv_id = ( 

3792 current_security_service['id'] 

3793 if current_security_service else None) 

3794 

3795 update_key = self.get_security_service_update_key( 

3796 'hosts_check', new_security_service['id'], 

3797 current_security_service_id=curr_sec_serv_id) 

3798 # check if there is an entry being processed at this moment 

3799 update_value = self.db.async_operation_data_get( 

3800 context, share_network['id'], update_key) 

3801 if not update_value: 

3802 msg = _( 

3803 'The share network %(share_net_id)s cannot start the update ' 

3804 'process since no check operation was found. Before starting ' 

3805 'the update operation, a "check" operation must be triggered ' 

3806 'to validate if all backend hosts support the provided ' 

3807 'configuration paramaters.') % { 

3808 'share_net_id': share_network['id'] 

3809 } 

3810 raise exception.InvalidShareNetwork(reason=msg) 

3811 

3812 try: 

3813 result, __ = self._security_service_update_validate_hosts( 

3814 context, share_network, backend_hosts, share_servers, 

3815 new_security_service_id=new_security_service['id'], 

3816 current_security_service_id=curr_sec_serv_id) 

3817 except Exception: 

3818 # Due to an internal error, we will delete the entry 

3819 self.db.async_operation_data_delete( 

3820 context, share_network['id'], update_key) 

3821 msg = _( 

3822 'The share network %(share_net_id)s cannot be updated ' 

3823 'since at least one of its backend hosts do not support ' 

3824 'this operation.') % { 

3825 'share_net_id': share_network['id']} 

3826 raise exception.InvalidShareNetwork(reason=msg) 

3827 

3828 if result is False: 

3829 msg = _( 

3830 'The share network %(share_net_id)s cannot be updated ' 

3831 'since at least one of its backend hosts do not support ' 

3832 'this operation.') % { 

3833 'share_net_id': share_network['id']} 

3834 raise exception.InvalidShareNetwork(reason=msg) 

3835 elif result is None: 3835 ↛ 3836line 3835 didn't jump to line 3836 because the condition on line 3835 was never true

3836 msg = _( 

3837 'Not all of the validation has been completed yet. A ' 

3838 'validation check is in progress. This operation can be ' 

3839 'retried.') 

3840 raise exception.InvalidShareNetwork(reason=msg) 

3841 

3842 self.db.share_network_update( 

3843 context, share_network['id'], 

3844 {'status': constants.STATUS_NETWORK_CHANGE}) 

3845 

3846 # NOTE(dviroel): We want to change the status for all share servers to 

3847 # identify when all modifications are made, and update share network 

3848 # status to 'active' again. 

3849 share_servers_ids = [ss.id for ss in share_servers] 

3850 self.db.share_servers_update( 

3851 context, share_servers_ids, 

3852 {'status': constants.STATUS_SERVER_NETWORK_CHANGE}) 

3853 

3854 for backend_host in backend_hosts: 

3855 self.share_rpcapi.update_share_network_security_service( 

3856 context, backend_host, share_network['id'], 

3857 new_security_service['id'], 

3858 current_security_service_id=curr_sec_serv_id) 

3859 

3860 # Erase db entry, since we won't need it anymore 

3861 self.db.async_operation_data_delete( 

3862 context, share_network['id'], update_key) 

3863 

3864 LOG.info('Security service update has been started for share network ' 

3865 '%(share_net_id)s.', {'share_net_id': share_network['id']}) 

3866 

3867 @locked_share_server_update_allocations_operation 

3868 def _share_server_update_allocations_validate_hosts( 

3869 self, context, backend_hosts, update_key, share_network_id=None, 

3870 neutron_net_id=None, neutron_subnet_id=None, 

3871 availability_zone_id=None): 

3872 

3873 new_share_network_subnet = { 

3874 'neutron_net_id': neutron_net_id, 

3875 'neutron_subnet_id': neutron_subnet_id, 

3876 'availability_zone_id': availability_zone_id, 

3877 } 

3878 return self._do_update_validate_hosts( 

3879 context, share_network_id, backend_hosts, update_key, 

3880 new_share_network_subnet=new_share_network_subnet) 

3881 

3882 def get_share_server_update_allocations_key( 

3883 self, share_network_id, availability_zone_id): 

3884 return ('share_server_update_allocations_' + share_network_id + '_' + 

3885 str(availability_zone_id) + '_' + 'hosts_check') 

3886 

3887 def _share_server_update_allocations_initial_checks( 

3888 self, context, share_network, share_servers): 

3889 

3890 api_common.check_share_network_is_active(share_network) 

3891 if not share_network['network_allocation_update_support']: 

3892 msg = _("Updating network allocations is not supported on this " 

3893 "share network (%(sn_id)s) while it has shares. " 

3894 "See the capability 'network_allocation_update_support'." 

3895 ) % {"sn_id": share_network["id"]} 

3896 raise exception.InvalidShareNetwork(reason=msg) 

3897 

3898 backend_hosts = set() 

3899 for share_server in share_servers: 

3900 share_server_id = share_server['id'] 

3901 if share_server['status'] != constants.STATUS_ACTIVE: 

3902 msg = _('The share server %(server)s in the specified ' 

3903 'availability zone subnet is not currently ' 

3904 'available.') % {'server': share_server_id} 

3905 raise exception.InvalidShareNetwork(reason=msg) 

3906 

3907 # We need an admin context to validate these hosts. 

3908 admin_ctx = manila_context.get_admin_context() 

3909 # Make sure the host is in the list of available hosts. 

3910 utils.validate_service_host(admin_ctx, share_server['host']) 

3911 

3912 # Create a set of backend hosts. 

3913 backend_hosts.add(share_server['host']) 

3914 

3915 shares = self.db.share_get_all_by_share_server( 

3916 context, share_server_id) 

3917 shares_not_available = [ 

3918 share['id'] 

3919 for share in shares if 

3920 share['status'] != constants.STATUS_AVAILABLE] 

3921 

3922 if shares_not_available: 

3923 msg = _("The share server (%(server_id)s) in the specified " 

3924 "availability zone subnet has some shares that are " 

3925 "not available: " 

3926 "%(share_ids)s.") % { 

3927 'server_id': share_server_id, 

3928 'share_ids': shares_not_available, 

3929 } 

3930 raise exception.InvalidShareNetwork(reason=msg) 

3931 

3932 shares_rules_not_available = [ 

3933 share['id'] for share in shares if 

3934 share['instance'][ 

3935 'access_rules_status'] != constants.STATUS_ACTIVE] 

3936 

3937 if shares_rules_not_available: 

3938 msg = _("The share server (%(server_id)s) in the specified " 

3939 "availability zone subnet has either these shares or " 

3940 "one of their replicas or migration copies that are " 

3941 "not available: %(share_ids)s.") % { 

3942 'server_id': share_server_id, 

3943 'share_ids': shares_rules_not_available, 

3944 } 

3945 raise exception.InvalidShareNetwork(reason=msg) 

3946 

3947 busy_shares = [] 

3948 for share in shares: 

3949 try: 

3950 self._check_is_share_busy(share) 

3951 except exception.ShareBusyException: 

3952 busy_shares.append(share['id']) 

3953 if busy_shares: 

3954 msg = _("The share server (%(server_id)s) in the specified " 

3955 "availability zone subnet has some shares that are " 

3956 "busy as part of an active task: " 

3957 "%(share_ids)s.") % { 

3958 'server_id': share_server_id, 

3959 'share_ids': busy_shares, 

3960 } 

3961 raise exception.InvalidShareNetwork(reason=msg) 

3962 

3963 return backend_hosts 

3964 

3965 def check_update_share_server_network_allocations( 

3966 self, context, share_network, new_share_network_subnet, 

3967 reset_operation): 

3968 

3969 backend_hosts = self._share_server_update_allocations_initial_checks( 

3970 context, share_network, new_share_network_subnet['share_servers']) 

3971 

3972 update_key = self.get_share_server_update_allocations_key( 

3973 share_network['id'], 

3974 new_share_network_subnet['availability_zone_id']) 

3975 if reset_operation: 3975 ↛ 3978line 3975 didn't jump to line 3978 because the condition on line 3975 was always true

3976 self.db.async_operation_data_delete(context, share_network['id'], 

3977 update_key) 

3978 try: 

3979 compatible, hosts_info = ( 

3980 self._share_server_update_allocations_validate_hosts( 

3981 context, backend_hosts, update_key, 

3982 share_network_id=share_network['id'], 

3983 neutron_net_id=( 

3984 new_share_network_subnet.get('neutron_net_id')), 

3985 neutron_subnet_id=( 

3986 new_share_network_subnet.get('neutron_subnet_id')), 

3987 availability_zone_id=new_share_network_subnet.get( 

3988 "availability_zone_id"))) 

3989 except Exception as e: 

3990 LOG.exception(e) 

3991 # Due to an internal error, we will delete the entry. 

3992 self.db.async_operation_data_delete( 

3993 context, share_network['id'], update_key) 

3994 msg = _( 

3995 "The server's allocations cannot be updated on availability " 

3996 "zone %(zone_id)s of the share network %(share_net_id)s, " 

3997 "since at least one of its backend hosts do not support this " 

3998 "operation.") % { 

3999 'share_net_id': share_network['id'], 

4000 'zone_id': new_share_network_subnet['availability_zone_id']} 

4001 raise exception.InvalidShareNetwork(reason=msg) 

4002 

4003 return { 

4004 'compatible': compatible, 

4005 'hosts_check_result': hosts_info 

4006 } 

4007 

4008 def update_share_server_network_allocations( 

4009 self, context, share_network, new_share_network_subnet): 

4010 

4011 backend_hosts = self._share_server_update_allocations_initial_checks( 

4012 context, share_network, new_share_network_subnet['share_servers']) 

4013 

4014 update_key = self.get_share_server_update_allocations_key( 

4015 share_network['id'], 

4016 new_share_network_subnet['availability_zone_id']) 

4017 

4018 # check if there is an entry being processed at this moment. 

4019 update_value = self.db.async_operation_data_get( 

4020 context, share_network['id'], update_key) 

4021 if not update_value: 

4022 msg = _( 

4023 'The share network %(share_net_id)s cannot start the update ' 

4024 'process since no check operation was found. Before starting ' 

4025 'the update operation, a "check" operation must be triggered ' 

4026 'to validate if all backend hosts support the provided ' 

4027 'configuration paramaters.') % { 

4028 'share_net_id': share_network['id'] 

4029 } 

4030 raise exception.InvalidShareNetwork(reason=msg) 

4031 

4032 subnet_info = { 

4033 'availability_zone_id': 

4034 new_share_network_subnet.get("availability_zone_id"), 

4035 'neutron_net_id': 

4036 new_share_network_subnet.get('neutron_net_id'), 

4037 'neutron_subnet_id': 

4038 new_share_network_subnet.get('neutron_subnet_id'), 

4039 } 

4040 try: 

4041 result, __ = self._share_server_update_allocations_validate_hosts( 

4042 context, backend_hosts, update_key, 

4043 share_network_id=share_network['id'], 

4044 neutron_net_id=( 

4045 new_share_network_subnet.get('neutron_net_id')), 

4046 neutron_subnet_id=( 

4047 new_share_network_subnet.get('neutron_subnet_id')), 

4048 availability_zone_id=new_share_network_subnet.get( 

4049 "availability_zone_id")) 

4050 except Exception: 

4051 # Due to an internal error, we will delete the entry. 

4052 self.db.async_operation_data_delete( 

4053 context, share_network['id'], update_key) 

4054 msg = _( 

4055 "The server's allocations cannot be updated on availability " 

4056 "zone %(zone_id)s of the share network %(share_net_id)s, " 

4057 "since an internal error occurred." 

4058 "operation.") % { 

4059 'share_net_id': share_network['id'], 

4060 'zone_id': subnet_info['availability_zone_id'] 

4061 } 

4062 raise exception.InvalidShareNetwork(reason=msg) 

4063 

4064 if result is False: 

4065 msg = _( 

4066 "The server's allocations cannot be updated on availability " 

4067 "zone %(zone_id)s of the share network %(share_net_id)s, " 

4068 "since at least one of its backend hosts do not support this " 

4069 "operation.") % { 

4070 'share_net_id': share_network['id'], 

4071 'zone_id': subnet_info['availability_zone_id'] 

4072 } 

4073 raise exception.InvalidShareNetwork(reason=msg) 

4074 elif result is None: 

4075 msg = _( 

4076 'Not all of the validation has been completed yet. A ' 

4077 'validation check is in progress. This operation can be ' 

4078 'retried.') 

4079 raise exception.InvalidShareNetwork(reason=msg) 

4080 

4081 # change db to start the update. 

4082 self.db.share_network_update( 

4083 context, share_network['id'], 

4084 {'status': constants.STATUS_NETWORK_CHANGE}) 

4085 share_servers_ids = [ss['id'] for ss in 

4086 new_share_network_subnet['share_servers']] 

4087 self.db.share_servers_update( 

4088 context, share_servers_ids, 

4089 {'status': constants.STATUS_SERVER_NETWORK_CHANGE}) 

4090 

4091 # create the new subnet. 

4092 new_share_network_subnet_db = self.db.share_network_subnet_create( 

4093 context, new_share_network_subnet) 

4094 

4095 # triggering the actual update. 

4096 for backend_host in backend_hosts: 

4097 self.share_rpcapi.update_share_server_network_allocations( 

4098 context, backend_host, share_network['id'], 

4099 new_share_network_subnet_db['id']) 

4100 

4101 # Erase db entry, since we won't need it anymore. 

4102 self.db.async_operation_data_delete( 

4103 context, share_network['id'], update_key) 

4104 

4105 LOG.info('Share servers allocations update have been started for ' 

4106 'share network %(share_net_id)s on its availability zone ' 

4107 '%(az_id)s with new subnet %(subnet_id)s.', 

4108 { 

4109 'share_net_id': share_network['id'], 

4110 'az_id': new_share_network_subnet['availability_zone_id'], 

4111 'subnet_id': new_share_network_subnet_db['id'], 

4112 }) 

4113 return new_share_network_subnet_db 

4114 

4115 def create_share_backup(self, context, share, backup): 

4116 share_id = share['id'] 

4117 self._check_is_share_busy(share) 

4118 

4119 if share['status'] != constants.STATUS_AVAILABLE: 

4120 msg_args = {'share_id': share_id, 'state': share['status']} 

4121 msg = (_("Share %(share_id)s is in '%(state)s' state, but it must " 

4122 "be in 'available' state to create a backup.") % msg_args) 

4123 raise exception.InvalidShare(message=msg) 

4124 

4125 snapshots = self.db.share_snapshot_get_all_for_share(context, share_id) 

4126 if snapshots: 4126 ↛ 4127line 4126 didn't jump to line 4127 because the condition on line 4126 was never true

4127 msg = _("Cannot backup share %s while it has snapshots.") 

4128 raise exception.InvalidShare(message=msg % share_id) 

4129 

4130 if share.has_replicas: 

4131 msg = _("Cannot backup share %s while it has replicas.") 

4132 raise exception.InvalidShare(message=msg % share_id) 

4133 

4134 # Reserve a quota before setting share status and backup status 

4135 try: 

4136 reservations = QUOTAS.reserve( 

4137 context, backups=1, backup_gigabytes=share['size']) 

4138 except exception.OverQuota as e: 

4139 overs = e.kwargs['overs'] 

4140 usages = e.kwargs['usages'] 

4141 quotas = e.kwargs['quotas'] 

4142 

4143 def _consumed(resource_name): 

4144 return (usages[resource_name]['reserved'] + 

4145 usages[resource_name]['in_use']) 

4146 

4147 for over in overs: 4147 ↛ 4170line 4147 didn't jump to line 4170 because the loop on line 4147 didn't complete

4148 if 'backup_gigabytes' in over: 

4149 msg = ("Quota exceeded for %(s_pid)s, tried to create " 

4150 "%(s_size)sG backup, but (%(d_consumed)dG of " 

4151 "%(d_quota)dG already consumed.)") 

4152 LOG.warning(msg, {'s_pid': context.project_id, 

4153 's_size': share['size'], 

4154 'd_consumed': _consumed(over), 

4155 'd_quota': quotas[over]}) 

4156 raise exception.ShareBackupSizeExceedsAvailableQuota( 

4157 requested=share['size'], 

4158 consumed=_consumed('backup_gigabytes'), 

4159 quota=quotas['backup_gigabytes']) 

4160 elif 'backups' in over: 4160 ↛ 4147line 4160 didn't jump to line 4147 because the condition on line 4160 was always true

4161 msg = ("Quota exceeded for %(s_pid)s, tried to create " 

4162 "backup, but (%(d_consumed)d of %(d_quota)d " 

4163 "backups already consumed.)") 

4164 LOG.warning(msg, {'s_pid': context.project_id, 

4165 'd_consumed': _consumed(over), 

4166 'd_quota': quotas[over]}) 

4167 raise exception.BackupLimitExceeded( 

4168 allowed=quotas[over]) 

4169 

4170 backup_options = backup.get('backup_options', None) 

4171 topic = CONF.share_topic 

4172 

4173 # Validate right backup type is provided 

4174 backup_type = backup_options and backup.get( 

4175 'backup_options').get(constants.BACKUP_TYPE) 

4176 filters = { 

4177 'status': constants.STATUS_AVAILABLE, 

4178 'share_id': share_id, 

4179 'topic': CONF.share_topic, 

4180 } 

4181 backups = self.db.share_backups_get_all(context, filters) 

4182 if backup_type and len(backups) > 0: 4182 ↛ 4183line 4182 didn't jump to line 4183 because the condition on line 4182 was never true

4183 previous_backup_type = backups[0][constants.BACKUP_TYPE] 

4184 backup_options = backup.get('backup_options') 

4185 current_backup_type = backup_options.get(constants.BACKUP_TYPE) 

4186 if previous_backup_type != current_backup_type: 

4187 err_msg = _("Share '%(share)s' has existing backups with" 

4188 " backup_type: '%(correct_backup_type)s'. You must" 

4189 " delete these backups to schedule a backup with" 

4190 " a different backup_type, or re-use the same" 

4191 " backup_type.") 

4192 msg_args = { 

4193 'share': share.get('display_name'), 

4194 'correct_backup_type': previous_backup_type, 

4195 } 

4196 raise exc.HTTPBadRequest(explanation=err_msg % msg_args) 

4197 

4198 backup_ref = {} 

4199 try: 

4200 # No backup options were provided, which means that it will use the 

4201 # generic backup approach. 

4202 if not backup_options: 

4203 topic = CONF.data_topic 

4204 

4205 backup_protocols = ( 

4206 CONF.data_manager_backup_supported_share_protocols) 

4207 

4208 if (share['share_proto'].upper() 

4209 not in [proto.upper() for proto in backup_protocols]): 

4210 error_msg = _( 

4211 "Cannot backup share %(share)s. The generic approach " 

4212 "for share backups only supports the following " 

4213 "protocols: %(protos)s.") % { 

4214 "share": share_id, 

4215 "protos": backup_protocols 

4216 } 

4217 raise exception.InvalidShare(error_msg) 

4218 

4219 backup_ref = self.db.share_backup_create( 

4220 context, share['id'], 

4221 { 

4222 'user_id': context.user_id, 

4223 'project_id': context.project_id, 

4224 'progress': '0', 

4225 'restore_progress': '0', 

4226 'status': constants.STATUS_CREATING, 

4227 'display_description': backup.get('description'), 

4228 'display_name': backup.get('name'), 

4229 'size': share['size'], 

4230 'availability_zone': share['instance'] 

4231 ['availability_zone'], 

4232 'backup_type': backup_type, 

4233 } 

4234 ) 

4235 QUOTAS.commit(context, reservations) 

4236 except Exception: 

4237 with excutils.save_and_reraise_exception(): 

4238 QUOTAS.rollback(context, reservations) 

4239 

4240 self.db.share_update( 

4241 context, share_id, 

4242 {'status': constants.STATUS_BACKUP_CREATING}) 

4243 

4244 backup_ref['backup_options'] = backup.get('backup_options', {}) 

4245 backup_values = {} 

4246 if backup_ref['backup_options']: 

4247 backup_ref['host'] = share_utils.extract_host(share['host']) 

4248 backup_values.update({'host': backup_ref['host']}) 

4249 

4250 backup_values.update({'topic': topic}) 

4251 self.db.share_backup_update(context, backup_ref['id'], backup_values) 

4252 

4253 if topic == CONF.share_topic: 

4254 self.share_rpcapi.create_backup(context, backup_ref) 

4255 elif topic == CONF.data_topic: 4255 ↛ 4258line 4255 didn't jump to line 4258 because the condition on line 4255 was always true

4256 data_rpc = data_rpcapi.DataAPI() 

4257 data_rpc.create_backup(context, backup_ref) 

4258 return backup_ref 

4259 

4260 def delete_share_backup(self, context, backup): 

4261 """Make the RPC call to delete a share backup. 

4262 

4263 :param context: request context 

4264 :param backup: the model of backup that is retrieved from DB. 

4265 :raises: InvalidBackup 

4266 :raises: BackupDriverException 

4267 :raises: ServiceNotFound 

4268 """ 

4269 if backup.status not in [constants.STATUS_AVAILABLE, 

4270 constants.STATUS_ERROR]: 

4271 msg = (_('Backup %s status must be available or error.') 

4272 % backup['id']) 

4273 raise exception.InvalidBackup(reason=msg) 

4274 

4275 self.db.share_backup_update( 

4276 context, backup['id'], {'status': constants.STATUS_DELETING}) 

4277 

4278 if backup['topic'] == CONF.share_topic: 

4279 self.share_rpcapi.delete_backup(context, backup) 

4280 elif backup['topic'] == CONF.data_topic: 4280 ↛ exitline 4280 didn't return from function 'delete_share_backup' because the condition on line 4280 was always true

4281 data_rpc = data_rpcapi.DataAPI() 

4282 data_rpc.delete_backup(context, backup) 

4283 

4284 def restore_share_backup(self, context, backup, target_share_id=None): 

4285 """Make the RPC call to restore a backup.""" 

4286 backup_id = backup['id'] 

4287 if backup['status'] != constants.STATUS_AVAILABLE: 

4288 msg = (_('Backup %s status must be available.') % backup['id']) 

4289 raise exception.InvalidBackup(reason=msg) 

4290 

4291 if target_share_id: 4291 ↛ 4292line 4291 didn't jump to line 4292 because the condition on line 4291 was never true

4292 share = self.get(context, target_share_id) 

4293 else: 

4294 share = self.get(context, backup['share_id']) 

4295 

4296 share_id = share['id'] 

4297 if share['status'] != constants.STATUS_AVAILABLE: 

4298 msg = _('Share to be restored to must be available.') 

4299 raise exception.InvalidShare(reason=msg) 

4300 

4301 backup_size = backup['size'] 

4302 LOG.debug('Checking backup size %(backup_size)s against share size ' 

4303 '%(share_size)s.', {'backup_size': backup_size, 

4304 'share_size': share['size']}) 

4305 if backup_size > share['size']: 

4306 msg = (_('Share size %(share_size)d is too small to restore ' 

4307 'backup of size %(size)d.') % 

4308 {'share_size': share['size'], 'size': backup_size}) 

4309 raise exception.InvalidShare(reason=msg) 

4310 

4311 LOG.info("Overwriting share %(share_id)s with restore of " 

4312 "backup %(backup_id)s.", 

4313 {'share_id': share_id, 'backup_id': backup_id}) 

4314 

4315 self.db.share_backup_update( 

4316 context, backup_id, 

4317 {'status': constants.STATUS_RESTORING}) 

4318 self.db.share_update( 

4319 context, share_id, 

4320 {'status': constants.STATUS_BACKUP_RESTORING, 

4321 'source_backup_id': backup_id}) 

4322 

4323 if backup['topic'] == CONF.share_topic: 

4324 self.share_rpcapi.restore_backup(context, backup, share_id) 

4325 elif backup['topic'] == CONF.data_topic: 4325 ↛ 4329line 4325 didn't jump to line 4329 because the condition on line 4325 was always true

4326 data_rpc = data_rpcapi.DataAPI() 

4327 data_rpc.restore_backup(context, backup, share_id) 

4328 

4329 restore_info = {'backup_id': backup_id, 'share_id': share_id} 

4330 return restore_info 

4331 

4332 def update_share_backup(self, context, backup, fields): 

4333 return self.db.share_backup_update(context, backup['id'], fields)