Coverage for manila/scheduler/host_manager.py: 93%

376 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2011 OpenStack, LLC. 

2# Copyright (c) 2015 Rushil Chugh 

3# Copyright (c) 2015 Clinton Knight 

4# Copyright (c) 2015 EMC Corporation 

5# All Rights Reserved. 

6# 

7# Licensed under the Apache License, Version 2.0 (the "License"); you may 

8# not use this file except in compliance with the License. You may obtain 

9# a copy of the License at 

10# 

11# http://www.apache.org/licenses/LICENSE-2.0 

12# 

13# Unless required by applicable law or agreed to in writing, software 

14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

16# License for the specific language governing permissions and limitations 

17# under the License. 

18 

19""" 

20Manage hosts in the current zone. 

21""" 

22 

23import collections 

24import re 

25 

26from oslo_config import cfg 

27from oslo_log import log 

28from oslo_utils import timeutils 

29 

30from manila import db 

31from manila import exception 

32from manila.scheduler.filters import base_host as base_host_filter 

33from manila.scheduler import utils as scheduler_utils 

34from manila.scheduler.weighers import base_host as base_host_weigher 

35from manila.share import utils as share_utils 

36from manila import utils 

37 

38 

# Default class-name lists for the scheduler options declared below.
_DEFAULT_FILTERS = [
    'OnlyHostFilter',
    'AvailabilityZoneFilter',
    'CapacityFilter',
    'CapabilitiesFilter',
    'DriverFilter',
    'ShareReplicationFilter',
    'CreateFromSnapshotFilter',
    'AffinityFilter',
    'AntiAffinityFilter',
]
_DEFAULT_WEIGHERS = [
    'CapacityWeigher',
    'GoodnessWeigher',
    'HostAffinityWeigher',
]
_DEFAULT_SHARE_GROUP_FILTERS = [
    'AvailabilityZoneFilter',
    'ConsistentSnapshotFilter',
]
_DEFAULT_EXTEND_FILTERS = [
    'CapacityFilter',
    'DriverFilter',
]

# Options registered by this module; each one is a list of class names.
host_manager_opts = [
    cfg.ListOpt(
        'scheduler_default_filters',
        default=_DEFAULT_FILTERS,
        help='Which filter class names to use for filtering hosts '
             'when not specified in the request.'),
    cfg.ListOpt(
        'scheduler_default_weighers',
        default=_DEFAULT_WEIGHERS,
        help='Which weigher class names to use for weighing hosts.'),
    cfg.ListOpt(
        'scheduler_default_share_group_filters',
        default=_DEFAULT_SHARE_GROUP_FILTERS,
        help='Which filter class names to use for filtering hosts '
             'creating share group when not specified in the request.'),
    cfg.ListOpt(
        'scheduler_default_extend_filters',
        default=_DEFAULT_EXTEND_FILTERS,
        help='Which filter class names to use for filtering hosts '
             'extending share when not specified in the request.'),
]

CONF = cfg.CONF
CONF.register_opts(host_manager_opts)
# PoolState falls back to this option when a backend reports no ratio.
CONF.import_opt('max_over_subscription_ratio', 'manila.share.driver')

LOG = log.getLogger(__name__)

84 

85 

class ReadOnlyDict(collections.UserDict):
    """A dict wrapper that rejects item-level mutation.

    The wrapped mapping is stored by reference rather than copied, so
    changes made to the source dict after wrapping stay visible through
    this object. The only supported way to change the content is
    :meth:`update`, which swaps the wrapped mapping wholesale.
    """

    def __init__(self, source=None):
        """Wrap ``source`` (a dict, a UserDict or None for empty)."""
        self.data = {}
        self.update(source)

    def __setitem__(self, key, item):
        raise TypeError("ReadOnlyDict does not support item assignment")

    def __delitem__(self, key):
        raise TypeError("ReadOnlyDict does not support item deletion")

    def __ior__(self, other):
        # UserDict.__ior__ (Python 3.9+) merges straight into self.data and
        # never goes through __setitem__, so it has to be blocked explicitly.
        raise TypeError("ReadOnlyDict does not support in-place merge")

    def clear(self):
        raise TypeError("ReadOnlyDict cannot be cleared")

    def pop(self, key, *args):
        raise TypeError("ReadOnlyDict does not support pop")

    def popitem(self):
        raise TypeError("ReadOnlyDict does not support popitem")

    def update(self, source=None):
        """Replace the wrapped mapping with ``source``.

        :param source: a dict or UserDict to wrap; None leaves the current
            content untouched.
        :raises TypeError: if ``source`` is any other type.
        """
        if source is None:
            return
        if isinstance(source, collections.UserDict):
            self.data = source.data
        elif isinstance(source, dict):
            self.data = source
        else:
            raise TypeError(
                "ReadOnlyDict can only wrap a dict or a UserDict")

117 

118 

class HostState(object):
    """Mutable and immutable information tracked for a host."""

    # Capability keys that a pool inherits from its backend, in the order
    # they are filled in. The flag selects the fill condition: True replaces
    # a missing *or falsy* pool value, False only fills a missing key.
    _POOL_INHERITED_KEYS = (
        ('share_backend_name', True),
        ('storage_protocol', True),
        ('vendor_name', True),
        ('driver_version', True),
        ('timestamp', True),
        ('driver_handles_share_servers', False),
        ('snapshot_support', False),
        ('create_share_from_snapshot_support', False),
        ('revert_to_snapshot_support', False),
        ('mount_snapshot_support', False),
        ('dedupe', False),
        ('compression', False),
        ('replication_type', True),
        ('replication_domain', True),
        ('sg_consistent_snapshot_support', False),
        ('security_service_update_support', False),
        ('network_allocation_update_support', False),
        ('share_server_multiple_subnet_support', False),
        ('share_replicas_migration_support', False),
        ('encryption_support', False),
    )
    # Capability keys whose value lives under a differently named attribute.
    _ATTR_FOR_KEY = {'timestamp': 'updated'}

    def __init__(self, host, capabilities=None, service=None):
        self.capabilities = None
        self.service = None
        self.host = host
        self.update_capabilities(capabilities, service)

        self.share_backend_name = None
        self.vendor_name = None
        self.driver_version = 0
        self.storage_protocol = None
        self.qos = False
        # Mutable available resources.
        # These will change as resources are virtually "consumed".
        self.total_capacity_gb = 0
        self.free_capacity_gb = None
        self.reserved_percentage = 0
        self.reserved_snapshot_percentage = 0
        self.reserved_share_extend_percentage = 0
        self.allocated_capacity_gb = 0
        # NOTE(xyang): The apparent allocated space indicating how much
        # capacity has been provisioned. This could be the sum of sizes
        # of all shares on a backend, which could be greater than or
        # equal to the allocated_capacity_gb.
        self.provisioned_capacity_gb = 0
        self.max_over_subscription_ratio = 1.0
        self.thin_provisioning = False
        self.driver_handles_share_servers = False
        self.snapshot_support = True
        self.create_share_from_snapshot_support = True
        self.revert_to_snapshot_support = False
        self.mount_snapshot_support = False
        self.dedupe = False
        self.compression = False
        self.replication_type = None
        self.replication_domain = None
        self.ipv4_support = None
        self.ipv6_support = None
        self.security_service_update_support = False
        self.network_allocation_update_support = False
        self.share_server_multiple_subnet_support = False
        self.mount_point_name_support = False
        self.share_replicas_migration_support = False
        self.encryption_support = None

        # PoolState for all pools
        self.pools = {}
        self.updated = None

        # Share Group capabilities
        self.sg_consistent_snapshot_support = None

    def update_capabilities(self, capabilities=None, service=None):
        """Store capabilities and service info as read-only dicts.

        None is treated as an empty dict for either argument.
        """
        if capabilities is None:
            capabilities = {}
        self.capabilities = ReadOnlyDict(capabilities)
        if service is None:
            service = {}
        self.service = ReadOnlyDict(service)

    def update_from_share_capability(
            self, capability, service=None, context=None):
        """Update information about a host from its share_node info.

        'capability' is the status info reported by share backend, a typical
        capability looks like this::

            capability = {
                # backend level, mandatory/fixed stats & capabilities
                'share_backend_name': 'Local NFS',
                'vendor_name': 'OpenStack',
                'driver_version': '1.0',
                'storage_protocol': 'NFS',

                # optional custom stats & capabilities
                'active_shares': 10,
                'IOPS_provisioned': 30000,
                'fancy_capability_1': 'eat',
                'fancy_capability_2': 'drink',

                'pools': [
                    {
                        # mandatory stats for pools
                        'pool_name': '1st pool',
                        'total_capacity_gb': 500,
                        'free_capacity_gb': 230,
                        'allocated_capacity_gb': 270,
                        'qos': 'False',
                        'reserved_percentage': 0,
                        'reserved_snapshot_percentage': 0,
                        'reserved_share_extend_percentage': 0,

                        # optional custom stats & capabilities
                        'dying_disks': 100,
                        'super_hero_1': 'spider-man',
                        'super_hero_2': 'flash',
                        'super_hero_3': 'neoncat',
                        'super_hero_4': 'green lantern',
                    },
                    {
                        'pool_name': '2nd pool',
                        'total_capacity_gb': 1024,
                        'free_capacity_gb': 1024,
                        'allocated_capacity_gb': 0,
                        'qos': 'False',
                        'reserved_percentage': 0,
                        'reserved_snapshot_percentage': 0,
                        'reserved_share_extend_percentage': 0,

                        'dying_disks': 200,
                        'super_hero_1': 'superman',
                        'super_hero_2': 'Hulk',
                    }]
            }

        The read-only ``capabilities`` and ``service`` dicts are always
        replaced. Backend and pool attributes are only refreshed when
        ``capability`` is non-empty and not older than the last update.
        """
        self.update_capabilities(capability, service)

        if capability:
            if self.updated and self.updated > capability['timestamp']:
                return

            # Update backend level info
            self.update_backend(capability)

            # Update pool level info
            self.update_pools(capability, service, context=context)

    def update_pools(self, capability, service, context=None):
        """Update storage pools information from backend reported info.

        Pools that are not part of this report are dropped from
        ``self.pools`` afterwards.
        """
        if not capability:
            return

        pools = capability.get('pools', None)
        active_pools = set()
        if pools and isinstance(pools, list):
            # Update all pools stats according to information from list
            # of pools in share capacity
            for pool_cap in pools:
                pool_name = pool_cap['pool_name']
                self._append_backend_info(pool_cap)
                cur_pool = self.pools.get(pool_name, None)
                if not cur_pool:
                    # Add new pool
                    cur_pool = PoolState(self.host, pool_cap, pool_name)
                    self.pools[pool_name] = cur_pool
                cur_pool.update_from_share_capability(
                    pool_cap, service, context=context)

                active_pools.add(pool_name)
        elif pools is None:
            # To handle legacy driver that doesn't report pool
            # information in the capability, we have to prepare
            # a pool from backend level info, or to update the one
            # we created in self.pools.
            pool_name = self.share_backend_name
            if pool_name is None:
                # To get DEFAULT_POOL_NAME
                pool_name = share_utils.extract_host(self.host, 'pool', True)

            if pool_name not in self.pools:
                # Either no pool exists yet, or the legacy driver now
                # reports under a name we have not seen before.
                single_pool = PoolState(self.host, capability, pool_name)
                self._append_backend_info(capability)
                self.pools[pool_name] = single_pool
            else:
                single_pool = self.pools[pool_name]

            single_pool.update_from_share_capability(
                capability, service, context=context)
            active_pools.add(pool_name)

        # Remove non-active pools from self.pools
        nonactive_pools = set(self.pools.keys()) - active_pools
        for pool in nonactive_pools:
            LOG.debug("Removing non-active pool %(pool)s @ %(host)s "
                      "from scheduler cache.",
                      {'pool': pool, 'host': self.host})
            del self.pools[pool]

    def _append_backend_info(self, pool_cap):
        """Fill backend level info into a pool capability dict if needed.

        ``pool_cap`` is modified in place. Values the pool already reports
        are kept, except that ``ipv4_support`` / ``ipv6_support`` are always
        overwritten when the backend has a value for them.
        """
        for key, replace_falsy in self._POOL_INHERITED_KEYS:
            if replace_falsy:
                needs_value = not pool_cap.get(key)
            else:
                needs_value = key not in pool_cap
            if needs_value:
                attr = self._ATTR_FOR_KEY.get(key, key)
                pool_cap[key] = getattr(self, attr)

        if self.ipv4_support is not None:
            pool_cap['ipv4_support'] = self.ipv4_support

        if self.ipv6_support is not None:
            pool_cap['ipv6_support'] = self.ipv6_support

    def update_backend(self, capability):
        """Refresh backend level attributes from a capability report.

        :param capability: dict reported by the backend; must contain
            'timestamp', every other key is optional.
        """
        self.share_backend_name = capability.get('share_backend_name')
        self.vendor_name = capability.get('vendor_name')
        self.driver_version = capability.get('driver_version')
        self.storage_protocol = capability.get('storage_protocol')
        self.driver_handles_share_servers = capability.get(
            'driver_handles_share_servers')
        self.snapshot_support = capability.get('snapshot_support')
        self.create_share_from_snapshot_support = capability.get(
            'create_share_from_snapshot_support')
        self.revert_to_snapshot_support = capability.get(
            'revert_to_snapshot_support', False)
        self.mount_snapshot_support = capability.get(
            'mount_snapshot_support', False)
        self.updated = capability['timestamp']
        self.replication_type = capability.get('replication_type')
        self.replication_domain = capability.get('replication_domain')
        # 'share_group_stats' may be absent or explicitly None; both mean
        # that no share group capability was reported.
        share_group_stats = capability.get('share_group_stats') or {}
        self.sg_consistent_snapshot_support = share_group_stats.get(
            'consistent_snapshot_support')
        # IP version support keeps its previous value unless reported.
        if capability.get('ipv4_support') is not None:
            self.ipv4_support = capability['ipv4_support']
        if capability.get('ipv6_support') is not None:
            self.ipv6_support = capability['ipv6_support']
        self.security_service_update_support = capability.get(
            'security_service_update_support', False)
        self.network_allocation_update_support = capability.get(
            'network_allocation_update_support', False)
        self.share_server_multiple_subnet_support = capability.get(
            'share_server_multiple_subnet_support', False)
        self.share_replicas_migration_support = capability.get(
            'share_replicas_migration_support', False)
        self.encryption_support = capability.get('encryption_support', None)

    def consume_from_share(self, share):
        """Incrementally update host state from a share.

        :param share: mapping with a 'size' entry.
        :raises InvalidCapacity: if free_capacity_gb is a string other
            than 'unknown'.
        """
        if self.provisioned_capacity_gb is not None:
            self.provisioned_capacity_gb += share['size']

        self.allocated_capacity_gb += share['size']

        if (isinstance(self.free_capacity_gb, str)
                and self.free_capacity_gb != 'unknown'):
            raise exception.InvalidCapacity(
                name='free_capacity_gb',
                value=self.free_capacity_gb
            )

        if self.free_capacity_gb != 'unknown':
            self.free_capacity_gb -= share['size']
        self.updated = timeutils.utcnow()

    def __repr__(self):
        return ("host: '%(host)s', free_capacity_gb: %(free)s, "
                "pools: %(pools)s" % {'host': self.host,
                                      'free': self.free_capacity_gb,
                                      'pools': self.pools}
                )

441 

442 

class PoolState(HostState):
    """State tracked for one storage pool of a host.

    The ``host`` attribute is the host name with the pool name appended;
    ``pools`` is None because a pool has no nested pools.
    """

    def __init__(self, host, capabilities, pool_name):
        new_host = share_utils.append_host(host, pool_name)
        super(PoolState, self).__init__(new_host, capabilities)
        self.pool_name = pool_name
        # No pools in pool
        self.pools = None

    def _estimate_provisioned_capacity(self, host_name, context=None):
        """Estimate provisioned capacity from share sizes on backend."""
        return db.share_instance_sizes_sum_by_host(context, host_name)

    @utils.synchronized("update_from_share_capability")
    def update_from_share_capability(
            self, capability, service=None, context=None):
        """Update information about a pool from its share_node info.

        The read-only ``capabilities`` and ``service`` dicts are always
        replaced. All other attributes are only refreshed when
        ``capability`` is non-empty and not older than the last update.
        """
        self.update_capabilities(capability, service)
        if not capability:
            return
        if self.updated and self.updated > capability['timestamp']:
            return
        self.update_backend(capability)

        self.total_capacity_gb = capability['total_capacity_gb']
        self.free_capacity_gb = capability['free_capacity_gb']
        self.allocated_capacity_gb = capability.get(
            'allocated_capacity_gb', 0)
        self.qos = capability.get('qos', False)
        self.reserved_percentage = capability['reserved_percentage']
        self.reserved_snapshot_percentage = (
            capability['reserved_snapshot_percentage'])
        self.reserved_share_extend_percentage = (
            capability['reserved_share_extend_percentage'])
        self.thin_provisioning = scheduler_utils.thin_provisioning(
            capability.get('thin_provisioning', False))
        # NOTE(xyang): provisioned_capacity_gb is the apparent total
        # capacity of all the shares created on a backend, which is
        # greater than or equal to allocated_capacity_gb, which is the
        # apparent total capacity of all the shares created on a backend
        # in Manila.
        # NOTE(nidhimittalhada): If 'provisioned_capacity_gb' is not set,
        # then calculating 'provisioned_capacity_gb' from share sizes
        # on host, as per information available in manila database.
        # NOTE(jose-castro-leon): Only calculate provisioned_capacity_gb
        # on thin provisioned pools
        self.provisioned_capacity_gb = capability.get(
            'provisioned_capacity_gb')

        if self.thin_provisioning and self.provisioned_capacity_gb is None:
            self.provisioned_capacity_gb = (
                self._estimate_provisioned_capacity(self.host,
                                                    context=context))

        self.max_over_subscription_ratio = capability.get(
            'max_over_subscription_ratio',
            CONF.max_over_subscription_ratio)
        self.dedupe = capability.get('dedupe', False)
        self.compression = capability.get('compression', False)
        # Keeps the value set by update_backend() when the key is absent.
        self.replication_type = capability.get(
            'replication_type', self.replication_type)
        self.replication_domain = capability.get('replication_domain')
        self.sg_consistent_snapshot_support = capability.get(
            'sg_consistent_snapshot_support')
        self.security_service_update_support = capability.get(
            'security_service_update_support', False)
        self.network_allocation_update_support = capability.get(
            'network_allocation_update_support', False)
        self.share_server_multiple_subnet_support = capability.get(
            'share_server_multiple_subnet_support', False)
        self.share_replicas_migration_support = capability.get(
            'share_replicas_migration_support', False)
        self.encryption_support = capability.get('encryption_support')

    def update_pools(self, capability, service=None, context=None):
        """Do nothing, since we don't have pools within pool, yet.

        The signature accepts the same arguments as
        HostState.update_pools so the override can be called through the
        base-class interface.
        """
        pass

522 

523 

class HostManager(object):
    """Base HostManager class."""

    host_state_cls = HostState

    def __init__(self):
        self.service_states = {}  # { <host>: {<service>: {cap k : v}}}
        self.host_state_map = {}
        self.filter_handler = base_host_filter.HostFilterHandler(
            'manila.scheduler.filters')
        self.filter_classes = self.filter_handler.get_all_classes()
        self.weight_handler = base_host_weigher.HostWeightHandler(
            'manila.scheduler.weighers')
        self.weight_classes = self.weight_handler.get_all_classes()

    @staticmethod
    def _match_classes(requested_names, available_classes):
        """Split requested class names into found classes and unknown names.

        :returns: tuple ``(classes, unknown_names)``, both in request order.
            When several available classes share a name, the first wins.
        """
        matched = []
        unknown = []
        for name in requested_names:
            cls = next(
                (c for c in available_classes if c.__name__ == name), None)
            if cls is None:
                unknown.append(name)
            else:
                matched.append(cls)
        return matched, unknown

    def _choose_host_filters(self, filter_cls_names):
        """Choose acceptable filters.

        Since the caller may specify which filters to use we need to
        have an authoritative list of what is permissible. This
        function checks the filter names against a predefined set of
        acceptable filters.

        :param filter_cls_names: a name, a list/tuple of names, or None to
            use CONF.scheduler_default_filters.
        :raises SchedulerHostFilterNotFound: if any name is unknown.
        """
        if filter_cls_names is None:
            filter_cls_names = CONF.scheduler_default_filters
        if not isinstance(filter_cls_names, (list, tuple)):
            filter_cls_names = [filter_cls_names]
        good_filters, bad_filters = self._match_classes(
            filter_cls_names, self.filter_classes)
        if bad_filters:
            msg = ", ".join(bad_filters)
            raise exception.SchedulerHostFilterNotFound(filter_name=msg)
        return good_filters

    def _choose_host_weighers(self, weight_cls_names):
        """Choose acceptable weighers.

        Since the caller may specify which weighers to use, we need to
        have an authoritative list of what is permissible. This
        function checks the weigher names against a predefined set of
        acceptable weighers.

        :param weight_cls_names: a name, a list/tuple of names, or None to
            use CONF.scheduler_default_weighers.
        :raises SchedulerHostWeigherNotFound: if any name is unknown.
        """
        if weight_cls_names is None:
            weight_cls_names = CONF.scheduler_default_weighers
        if not isinstance(weight_cls_names, (list, tuple)):
            weight_cls_names = [weight_cls_names]
        good_weighers, bad_weighers = self._match_classes(
            weight_cls_names, self.weight_classes)
        if bad_weighers:
            msg = ", ".join(bad_weighers)
            raise exception.SchedulerHostWeigherNotFound(weigher_name=msg)
        return good_weighers

    def get_filtered_hosts(self, hosts, filter_properties,
                           filter_class_names=None):
        """Filter hosts and return only ones passing all filters."""
        filter_classes = self._choose_host_filters(filter_class_names)
        return self.filter_handler.get_filtered_objects(filter_classes,
                                                        hosts,
                                                        filter_properties)

    def get_weighed_hosts(self, hosts, weight_properties,
                          weigher_class_names=None):
        """Weigh the hosts.

        ``weight_properties['server_pools_mapping']`` is rebuilt from the
        cached service states before the weighers run.
        """
        weigher_classes = self._choose_host_weighers(weigher_class_names)
        weight_properties['server_pools_mapping'] = {}
        for info in self.service_states.values():
            weight_properties['server_pools_mapping'].update(
                info.get('server_pools_mapping', {}))
        return self.weight_handler.get_weighed_objects(weigher_classes,
                                                       hosts,
                                                       weight_properties)

    def update_service_capabilities(self, service_name, host,
                                    capabilities, timestamp):
        """Update the per-service capabilities based on this notification.

        Reports from services other than 'share', and reports older than
        the one already cached for ``host``, are ignored.
        """
        if service_name not in ('share',):
            LOG.debug('Ignoring %(service_name)s service update '
                      'from %(host)s',
                      {'service_name': service_name, 'host': host})
            return

        # Copy the capabilities, so we don't modify the original dict
        capability_copy = dict(capabilities)
        timestamp = timestamp or timeutils.utcnow()
        capability_copy["timestamp"] = timestamp  # Reported time

        capab_old = self.service_states.get(host, {"timestamp": 0})
        # Ignore older updates
        if capab_old['timestamp'] and timestamp < capab_old['timestamp']:
            LOG.info('Ignoring old capability report from %s.', host)
            return

        self.service_states[host] = capability_copy

        LOG.debug("Received %(service_name)s service update from "
                  "%(host)s: %(cap)s",
                  {'service_name': service_name, 'host': host,
                   'cap': capabilities})

    def _update_host_state_map(self, context, consider_disabled=False):
        """Sync host_state_map with the share services found in the DB.

        Hosts whose service is down (or disabled, unless
        ``consider_disabled`` is set) are skipped and end up removed from
        the map.
        """
        # Get resource usage across the available share nodes:
        topic = CONF.share_topic
        share_services = db.service_get_all_by_topic(
            context,
            topic,
            consider_disabled=consider_disabled,
        )

        active_hosts = set()
        for service in share_services:
            host = service['host']

            # Warn about down services and remove them from host_state_map
            is_down = not utils.service_is_up(service)
            is_disabled = (not consider_disabled and service['disabled'])
            if is_down or is_disabled:
                LOG.warning("Share service is down. (host: %s).", host)
                continue

            # Create and register host_state if not in host_state_map
            capabilities = self.service_states.get(host, None)
            host_state = self.host_state_map.get(host)
            if not host_state:
                host_state = self.host_state_cls(
                    host,
                    capabilities=capabilities,
                    service=dict(service.items()))
                self.host_state_map[host] = host_state

            # Update capabilities and attributes in host_state
            host_state.update_from_share_capability(
                capabilities, service=dict(service.items()), context=context)
            active_hosts.add(host)

        # remove non-active hosts from host_state_map
        nonactive_hosts = set(self.host_state_map.keys()) - active_hosts
        for host in nonactive_hosts:
            LOG.info("Removing non-active host: %(host)s from "
                     "scheduler cache.", {'host': host})
            self.host_state_map.pop(host, None)

    def get_all_host_states_share(self, context, consider_disabled=False):
        """Returns the pool states of all hosts the HostManager knows about.

        The host state map is refreshed first. The result is a view of
        pool state objects, one per (host, pool) pair, not a dict keyed
        by host.
        """
        self._update_host_state_map(
            context,
            consider_disabled=consider_disabled,
        )

        # Build a pool_state map and return that map instead of host_state_map
        all_pools = {}
        for host, state in self.host_state_map.items():
            for key in state.pools:
                pool = state.pools[key]
                # Use host.pool_name to make sure key is unique
                pool_key = '.'.join([host, pool.pool_name])
                all_pools[pool_key] = pool

        return all_pools.values()

    def get_pools(self, context, filters=None, cached=False):
        """Returns a list of all pools on all hosts HostManager knows about.

        Each entry is a dict with the keys 'name', 'host', 'backend',
        'pool' and 'capabilities'. Only pools accepted by ``filters`` are
        included. The host state map is refreshed unless ``cached`` is
        set and the map is already populated.
        """
        if not cached or not self.host_state_map:
            self._update_host_state_map(context)

        all_pools = []
        for host, host_state in self.host_state_map.items():
            for pool in host_state.pools.values():

                fully_qualified_pool_name = share_utils.append_host(
                    host, pool.pool_name)
                host_name = share_utils.extract_host(
                    fully_qualified_pool_name, level='host')
                backend_name = (share_utils.extract_host(
                    fully_qualified_pool_name, level='backend').split('@')[1]
                    if '@' in fully_qualified_pool_name else None)
                pool_name = share_utils.extract_host(
                    fully_qualified_pool_name, level='pool')

                new_pool = {
                    'name': fully_qualified_pool_name,
                    'host': host_name,
                    'backend': backend_name,
                    'pool': pool_name,
                    'capabilities': pool.capabilities,
                }
                if self._passes_filters(new_pool, filters):
                    all_pools.append(new_pool)
        return all_pools

    def _passes_filters(self, dict_to_check, filter_dict):
        """Applies a set of regex filters to a dictionary.

        If no filter keys are supplied, the data passes unfiltered and
        the method returns True. Otherwise, each key in the filter
        (filter_dict) must be present in the data (dict_to_check)
        and the filter values are applied as regex expressions to
        the data values. If any of the filter values fail to match
        their corresponding data values, the method returns False.
        But if all filters match, the method returns True.

        The 'capabilities' key is compared with
        scheduler_utils.capabilities_satisfied instead of a regex. A data
        value of None never matches a regex filter.
        """
        if not filter_dict:
            return True

        for filter_key, filter_value in filter_dict.items():
            if filter_key not in dict_to_check:
                return False
            value = dict_to_check.get(filter_key)
            if filter_key == 'capabilities':
                if not scheduler_utils.capabilities_satisfied(
                        value, filter_value):
                    return False
            # re.match() raises TypeError on None, e.g. for the 'backend'
            # entry of a host name without '@'.
            elif value is None or not re.match(filter_value, value):
                return False

        return True