Coverage for manila/scheduler/host_manager.py: 93%
376 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2011 OpenStack, LLC.
2# Copyright (c) 2015 Rushil Chugh
3# Copyright (c) 2015 Clinton Knight
4# Copyright (c) 2015 EMC Corporation
5# All Rights Reserved.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
19"""
20Manage hosts in the current zone.
21"""
23import collections
24import re
26from oslo_config import cfg
27from oslo_log import log
28from oslo_utils import timeutils
30from manila import db
31from manila import exception
32from manila.scheduler.filters import base_host as base_host_filter
33from manila.scheduler import utils as scheduler_utils
34from manila.scheduler.weighers import base_host as base_host_weigher
35from manila.share import utils as share_utils
36from manila import utils
# Scheduler tuning options: each ListOpt names filter/weigher classes that
# must resolve within the manila.scheduler.filters / .weighers namespaces
# (see HostManager.__init__ below, which loads them via handler classes).
host_manager_opts = [
    cfg.ListOpt('scheduler_default_filters',
                default=[
                    'OnlyHostFilter',
                    'AvailabilityZoneFilter',
                    'CapacityFilter',
                    'CapabilitiesFilter',
                    'DriverFilter',
                    'ShareReplicationFilter',
                    'CreateFromSnapshotFilter',
                    'AffinityFilter',
                    'AntiAffinityFilter',
                ],
                help='Which filter class names to use for filtering hosts '
                     'when not specified in the request.'),
    cfg.ListOpt('scheduler_default_weighers',
                default=[
                    'CapacityWeigher',
                    'GoodnessWeigher',
                    'HostAffinityWeigher',
                ],
                help='Which weigher class names to use for weighing hosts.'),
    cfg.ListOpt(
        'scheduler_default_share_group_filters',
        default=[
            'AvailabilityZoneFilter',
            'ConsistentSnapshotFilter',
        ],
        help='Which filter class names to use for filtering hosts '
             'creating share group when not specified in the request.'),
    cfg.ListOpt(
        'scheduler_default_extend_filters',
        default=[
            'CapacityFilter',
            'DriverFilter',
        ],
        help='Which filter class names to use for filtering hosts '
             'extending share when not specified in the request.'),
]

CONF = cfg.CONF
CONF.register_opts(host_manager_opts)
# Imported from the share driver module; used by PoolState as the fallback
# when a pool does not report its own max_over_subscription_ratio.
CONF.import_opt('max_over_subscription_ratio', 'manila.share.driver')

LOG = log.getLogger(__name__)
class ReadOnlyDict(collections.UserDict):
    """A read-only dict.

    All mutating operations (``__setitem__``, ``__delitem__``, ``clear``,
    ``pop``, ``popitem``) raise TypeError.

    Note that the backing mapping is aliased, not copied: ``update``
    assigns the source dict (or a UserDict's ``data``) directly to
    ``self.data``, so changes made by the owner of the source mapping
    remain visible through this wrapper.
    """

    def __init__(self, source=None):
        self.data = {}
        self.update(source)

    def __setitem__(self, key, item):
        raise TypeError

    def __delitem__(self, key):
        raise TypeError

    def clear(self):
        raise TypeError

    def pop(self, key, *args):
        raise TypeError

    def popitem(self):
        raise TypeError

    def update(self, source=None):
        """Replace (not merge) the backing data with *source*.

        Only invoked from __init__; a plain dict or another UserDict is
        aliased, None is a no-op, anything else raises TypeError.
        """
        if source is None:
            return
        elif isinstance(source, collections.UserDict):
            # Share the other wrapper's underlying dict.
            self.data = source.data
        elif isinstance(source, dict):
            self.data = source
        else:
            raise TypeError
class HostState(object):
    """Mutable and immutable information tracked for a host."""

    def __init__(self, host, capabilities=None, service=None):
        # Read-only capability/service dicts; populated by
        # update_capabilities() below.
        self.capabilities = None
        self.service = None
        self.host = host
        self.update_capabilities(capabilities, service)

        # Backend-level identity info, refreshed by update_backend().
        self.share_backend_name = None
        self.vendor_name = None
        self.driver_version = 0
        self.storage_protocol = None
        self.qos = False
        # Mutable available resources.
        # These will change as resources are virtually "consumed".
        self.total_capacity_gb = 0
        self.free_capacity_gb = None
        self.reserved_percentage = 0
        self.reserved_snapshot_percentage = 0
        self.reserved_share_extend_percentage = 0
        self.allocated_capacity_gb = 0
        # NOTE(xyang): The apparent allocated space indicating how much
        # capacity has been provisioned. This could be the sum of sizes
        # of all shares on a backend, which could be greater than or
        # equal to the allocated_capacity_gb.
        self.provisioned_capacity_gb = 0
        self.max_over_subscription_ratio = 1.0
        self.thin_provisioning = False
        self.driver_handles_share_servers = False
        self.snapshot_support = True
        self.create_share_from_snapshot_support = True
        self.revert_to_snapshot_support = False
        self.mount_snapshot_support = False
        self.dedupe = False
        self.compression = False
        self.replication_type = None
        self.replication_domain = None
        # None (vs. True/False) means "not reported by the backend".
        self.ipv4_support = None
        self.ipv6_support = None
        self.security_service_update_support = False
        self.network_allocation_update_support = False
        self.share_server_multiple_subnet_support = False
        self.mount_point_name_support = False
        self.share_replicas_migration_support = False
        self.encryption_support = None

        # PoolState for all pools
        self.pools = {}
        # Timestamp of the most recently applied capability report; used
        # to drop stale updates.
        self.updated = None

        # Share Group capabilities
        self.sg_consistent_snapshot_support = None

    def update_capabilities(self, capabilities=None, service=None):
        """Replace the read-only capability/service dicts."""
        # Read-only capability dicts

        if capabilities is None:
            capabilities = {}
        self.capabilities = ReadOnlyDict(capabilities)
        if service is None:
            service = {}
        self.service = ReadOnlyDict(service)

    def update_from_share_capability(
            self, capability, service=None, context=None):
        """Update information about a host from its share_node info.

        'capability' is the status info reported by share backend, a typical
        capability looks like this::

            capability = {
                'share_backend_name': 'Local NFS',    #\
                'vendor_name': 'OpenStack',           #  backend level
                'driver_version': '1.0',              #  mandatory/fixed
                'storage_protocol': 'NFS',            #/ stats&capabilities

                'active_shares': 10,                  #\
                'IOPS_provisioned': 30000,            #  optional custom
                'fancy_capability_1': 'eat',          #  stats & capabilities
                'fancy_capability_2': 'drink',        #/

                'pools':[
                    {
                     'pool_name': '1st pool',               #\
                     'total_capacity_gb': 500,              #  mandatory stats
                     'free_capacity_gb': 230,               #  for pools
                     'allocated_capacity_gb': 270,          # |
                     'qos': 'False',                        # |
                     'reserved_percentage': 0,              # |
                     'reserved_snapshot_percentage': 0,     # |
                     'reserved_share_extend_percentage': 0, #/

                     'dying_disks': 100,              #\
                     'super_hero_1': 'spider-man',    #  optional custom
                     'super_hero_2': 'flash',         #  stats &
                     'super_hero_3': 'neoncat',       #  capabilities
                     'super_hero_4': 'green lantern', #/
                     },
                    {
                     'pool_name': '2nd pool',
                     'total_capacity_gb': 1024,
                     'free_capacity_gb': 1024,
                     'allocated_capacity_gb': 0,
                     'qos': 'False',
                     'reserved_percentage': 0,
                     'reserved_snapshot_percentage': 0,
                     'reserved_share_extend_percentage': 0,

                     'dying_disks': 200,
                     'super_hero_1': 'superman',
                     'super_hero_2': 'Hulk',
                     }]
            }
        """
        self.update_capabilities(capability, service)

        if capability:
            # Ignore reports older than the one already applied.
            if self.updated and self.updated > capability['timestamp']:
                return

            # Update backend level info
            self.update_backend(capability)

            # Update pool level info
            self.update_pools(capability, service, context=context)

    def update_pools(self, capability, service, context=None):
        """Update storage pools information from backend reported info."""
        if not capability:
            return

        pools = capability.get('pools', None)
        active_pools = set()
        if pools and isinstance(pools, list):
            # Update all pools stats according to information from list
            # of pools in share capacity
            for pool_cap in pools:
                pool_name = pool_cap['pool_name']
                self._append_backend_info(pool_cap)
                cur_pool = self.pools.get(pool_name, None)
                if not cur_pool:
                    # Add new pool
                    cur_pool = PoolState(self.host, pool_cap, pool_name)
                    self.pools[pool_name] = cur_pool
                cur_pool.update_from_share_capability(
                    pool_cap, service, context=context)

                active_pools.add(pool_name)
        elif pools is None:
            # To handle legacy driver that doesn't report pool
            # information in the capability, we have to prepare
            # a pool from backend level info, or to update the one
            # we created in self.pools.
            pool_name = self.share_backend_name
            if pool_name is None:
                # To get DEFAULT_POOL_NAME
                pool_name = share_utils.extract_host(self.host, 'pool', True)

            if len(self.pools) == 0:
                # No pool was there
                single_pool = PoolState(self.host, capability, pool_name)
                self._append_backend_info(capability)
                self.pools[pool_name] = single_pool
            else:
                # This is a update from legacy driver
                try:
                    single_pool = self.pools[pool_name]
                except KeyError:
                    single_pool = PoolState(self.host, capability, pool_name)
                    self._append_backend_info(capability)
                    self.pools[pool_name] = single_pool

            single_pool.update_from_share_capability(
                capability, service, context=context)
            active_pools.add(pool_name)

        # Remove non-active pools from self.pools
        nonactive_pools = set(self.pools.keys()) - active_pools
        for pool in nonactive_pools:
            LOG.debug("Removing non-active pool %(pool)s @ %(host)s "
                      "from scheduler cache.",
                      {'pool': pool, 'host': self.host})
            del self.pools[pool]

    def _append_backend_info(self, pool_cap):
        # Fill backend level info to pool if needed. Mutates pool_cap in
        # place: any key the pool did not report inherits the backend value.
        if not pool_cap.get('share_backend_name'):
            pool_cap['share_backend_name'] = self.share_backend_name

        if not pool_cap.get('storage_protocol'):
            pool_cap['storage_protocol'] = self.storage_protocol

        if not pool_cap.get('vendor_name'):
            pool_cap['vendor_name'] = self.vendor_name

        if not pool_cap.get('driver_version'):
            pool_cap['driver_version'] = self.driver_version

        if not pool_cap.get('timestamp'):
            pool_cap['timestamp'] = self.updated

        # NOTE(review): duplicate of the storage_protocol check above —
        # redundant but harmless; candidate for removal.
        if not pool_cap.get('storage_protocol'):
            pool_cap['storage_protocol'] = self.storage_protocol

        if 'driver_handles_share_servers' not in pool_cap:
            pool_cap['driver_handles_share_servers'] = (
                self.driver_handles_share_servers)

        if 'snapshot_support' not in pool_cap:
            pool_cap['snapshot_support'] = self.snapshot_support

        if 'create_share_from_snapshot_support' not in pool_cap:
            pool_cap['create_share_from_snapshot_support'] = (
                self.create_share_from_snapshot_support)

        if 'revert_to_snapshot_support' not in pool_cap:
            pool_cap['revert_to_snapshot_support'] = (
                self.revert_to_snapshot_support)

        if 'mount_snapshot_support' not in pool_cap:
            pool_cap['mount_snapshot_support'] = self.mount_snapshot_support

        if 'dedupe' not in pool_cap:
            pool_cap['dedupe'] = self.dedupe

        if 'compression' not in pool_cap:
            pool_cap['compression'] = self.compression

        if not pool_cap.get('replication_type'):
            pool_cap['replication_type'] = self.replication_type

        if not pool_cap.get('replication_domain'):
            pool_cap['replication_domain'] = self.replication_domain

        if 'sg_consistent_snapshot_support' not in pool_cap:
            pool_cap['sg_consistent_snapshot_support'] = (
                self.sg_consistent_snapshot_support)

        if 'security_service_update_support' not in pool_cap:
            pool_cap['security_service_update_support'] = (
                self.security_service_update_support)

        if 'network_allocation_update_support' not in pool_cap:
            pool_cap['network_allocation_update_support'] = (
                self.network_allocation_update_support)

        if 'share_server_multiple_subnet_support' not in pool_cap:
            pool_cap['share_server_multiple_subnet_support'] = (
                self.share_server_multiple_subnet_support)

        if 'share_replicas_migration_support' not in pool_cap:
            pool_cap['share_replicas_migration_support'] = (
                self.share_replicas_migration_support)

        if 'encryption_support' not in pool_cap:
            pool_cap['encryption_support'] = (
                self.encryption_support)

        # IP version support is only propagated when the backend reported
        # it explicitly (None means "unknown").
        if self.ipv4_support is not None:
            pool_cap['ipv4_support'] = self.ipv4_support

        if self.ipv6_support is not None:
            pool_cap['ipv6_support'] = self.ipv6_support

    def update_backend(self, capability):
        """Refresh backend-level attributes from a capability report."""
        self.share_backend_name = capability.get('share_backend_name')
        self.vendor_name = capability.get('vendor_name')
        self.driver_version = capability.get('driver_version')
        self.storage_protocol = capability.get('storage_protocol')
        self.driver_handles_share_servers = capability.get(
            'driver_handles_share_servers')
        self.snapshot_support = capability.get('snapshot_support')
        self.create_share_from_snapshot_support = capability.get(
            'create_share_from_snapshot_support')
        self.revert_to_snapshot_support = capability.get(
            'revert_to_snapshot_support', False)
        self.mount_snapshot_support = capability.get(
            'mount_snapshot_support', False)
        # 'timestamp' is mandatory here (KeyError if absent).
        self.updated = capability['timestamp']
        self.replication_type = capability.get('replication_type')
        self.replication_domain = capability.get('replication_domain')
        self.sg_consistent_snapshot_support = capability.get(
            'share_group_stats', {}).get('consistent_snapshot_support')
        if capability.get('ipv4_support') is not None:
            self.ipv4_support = capability['ipv4_support']
        if capability.get('ipv6_support') is not None:
            self.ipv6_support = capability['ipv6_support']
        self.security_service_update_support = capability.get(
            'security_service_update_support', False)
        self.network_allocation_update_support = capability.get(
            'network_allocation_update_support', False)
        self.share_server_multiple_subnet_support = capability.get(
            'share_server_multiple_subnet_support', False)
        self.share_replicas_migration_support = capability.get(
            'share_replicas_migration_support', False)
        self.encryption_support = capability.get('encryption_support', None)

    def consume_from_share(self, share):
        """Incrementally update host state from an share."""
        if self.provisioned_capacity_gb is not None:
            self.provisioned_capacity_gb += share['size']

        self.allocated_capacity_gb += share['size']

        # free_capacity_gb may legitimately be the string 'unknown';
        # any other string is rejected as an invalid capacity.
        if (isinstance(self.free_capacity_gb, str)
                and self.free_capacity_gb != 'unknown'):
            raise exception.InvalidCapacity(
                name='free_capacity_gb',
                value=self.free_capacity_gb
            )

        if self.free_capacity_gb != 'unknown':
            self.free_capacity_gb -= share['size']
        self.updated = timeutils.utcnow()

    def __repr__(self):
        return ("host: '%(host)s', free_capacity_gb: %(free)s, "
                "pools: %(pools)s" % {'host': self.host,
                                      'free': self.free_capacity_gb,
                                      'pools': self.pools}
                )
class PoolState(HostState):
    """Per-pool state; reuses HostState attributes for one storage pool."""

    def __init__(self, host, capabilities, pool_name):
        # The pool is addressed by the combined "host#pool" name.
        new_host = share_utils.append_host(host, pool_name)
        super(PoolState, self).__init__(new_host, capabilities)
        self.pool_name = pool_name
        # No pools in pool
        self.pools = None

    def _estimate_provisioned_capacity(self, host_name, context=None):
        """Estimate provisioned capacity from share sizes on backend."""
        return db.share_instance_sizes_sum_by_host(context, host_name)

    @utils.synchronized("update_from_share_capability")
    def update_from_share_capability(
            self, capability, service=None, context=None):
        """Update information about a pool from its share_node info."""
        self.update_capabilities(capability, service)
        if capability:
            # Ignore reports older than the one already applied.
            if self.updated and self.updated > capability['timestamp']:
                return
            self.update_backend(capability)

            # Mandatory pool stats (KeyError if the backend omits them).
            self.total_capacity_gb = capability['total_capacity_gb']
            self.free_capacity_gb = capability['free_capacity_gb']
            self.allocated_capacity_gb = capability.get(
                'allocated_capacity_gb', 0)
            self.qos = capability.get('qos', False)
            self.reserved_percentage = capability['reserved_percentage']
            self.reserved_snapshot_percentage = (
                capability['reserved_snapshot_percentage'])
            self.reserved_share_extend_percentage = (
                capability['reserved_share_extend_percentage'])
            self.thin_provisioning = scheduler_utils.thin_provisioning(
                capability.get('thin_provisioning', False))
            # NOTE(xyang): provisioned_capacity_gb is the apparent total
            # capacity of all the shares created on a backend, which is
            # greater than or equal to allocated_capacity_gb, which is the
            # apparent total capacity of all the shares created on a backend
            # in Manila.
            # NOTE(nidhimittalhada): If 'provisioned_capacity_gb' is not set,
            # then calculating 'provisioned_capacity_gb' from share sizes
            # on host, as per information available in manila database.
            # NOTE(jose-castro-leon): Only calculate provisioned_capacity_gb
            # on thin provisioned pools
            self.provisioned_capacity_gb = capability.get(
                'provisioned_capacity_gb')

            if self.thin_provisioning and self.provisioned_capacity_gb is None:
                self.provisioned_capacity_gb = (
                    self._estimate_provisioned_capacity(self.host,
                                                        context=context))

            # Fall back to the global config value when the pool does not
            # report its own oversubscription ratio.
            self.max_over_subscription_ratio = capability.get(
                'max_over_subscription_ratio',
                CONF.max_over_subscription_ratio)
            self.dedupe = capability.get(
                'dedupe', False)
            self.compression = capability.get(
                'compression', False)
            self.replication_type = capability.get(
                'replication_type', self.replication_type)
            self.replication_domain = capability.get(
                'replication_domain')
            self.sg_consistent_snapshot_support = capability.get(
                'sg_consistent_snapshot_support')
            self.security_service_update_support = capability.get(
                'security_service_update_support', False)
            self.network_allocation_update_support = capability.get(
                'network_allocation_update_support', False)
            self.share_server_multiple_subnet_support = capability.get(
                'share_server_multiple_subnet_support', False)
            self.share_replicas_migration_support = capability.get(
                'share_replicas_migration_support', False)
            self.encryption_support = capability.get('encryption_support')

    # NOTE(review): signature is narrower than HostState.update_pools
    # (no service/context args); harmless since nothing calls it on a
    # PoolState, but worth confirming before reuse.
    def update_pools(self, capability):
        # Do nothing, since we don't have pools within pool, yet
        pass
class HostManager(object):
    """Base HostManager class."""

    # Subclasses may override to customize per-host state objects.
    host_state_cls = HostState

    def __init__(self):
        self.service_states = {}  # { <host>: {<service>: {cap k : v}}}
        # { <host>: HostState }, pruned of inactive hosts on each refresh.
        self.host_state_map = {}
        self.filter_handler = base_host_filter.HostFilterHandler(
            'manila.scheduler.filters')
        self.filter_classes = self.filter_handler.get_all_classes()
        self.weight_handler = base_host_weigher.HostWeightHandler(
            'manila.scheduler.weighers')
        self.weight_classes = self.weight_handler.get_all_classes()

    def _choose_host_filters(self, filter_cls_names):
        """Choose acceptable filters.

        Since the caller may specify which filters to use we need to
        have an authoritative list of what is permissible. This
        function checks the filter names against a predefined set of
        acceptable filters.

        :raises SchedulerHostFilterNotFound: if any requested name does
            not match a loaded filter class.
        """
        if filter_cls_names is None:
            filter_cls_names = CONF.scheduler_default_filters
        if not isinstance(filter_cls_names, (list, tuple)):
            filter_cls_names = [filter_cls_names]
        good_filters = []
        bad_filters = []
        for filter_name in filter_cls_names:
            found_class = False
            for cls in self.filter_classes:
                if cls.__name__ == filter_name:
                    found_class = True
                    good_filters.append(cls)
                    break
            if not found_class:
                bad_filters.append(filter_name)
        if bad_filters:
            msg = ", ".join(bad_filters)
            raise exception.SchedulerHostFilterNotFound(filter_name=msg)
        return good_filters

    def _choose_host_weighers(self, weight_cls_names):
        """Choose acceptable weighers.

        Since the caller may specify which weighers to use, we need to
        have an authoritative list of what is permissible. This
        function checks the weigher names against a predefined set of
        acceptable weighers.

        :raises SchedulerHostWeigherNotFound: if any requested name does
            not match a loaded weigher class.
        """
        if weight_cls_names is None:
            weight_cls_names = CONF.scheduler_default_weighers
        if not isinstance(weight_cls_names, (list, tuple)):
            weight_cls_names = [weight_cls_names]

        good_weighers = []
        bad_weighers = []
        for weigher_name in weight_cls_names:
            found_class = False
            for cls in self.weight_classes:
                if cls.__name__ == weigher_name:
                    good_weighers.append(cls)
                    found_class = True
                    break
            if not found_class:
                bad_weighers.append(weigher_name)
        if bad_weighers:
            msg = ", ".join(bad_weighers)
            raise exception.SchedulerHostWeigherNotFound(weigher_name=msg)
        return good_weighers

    def get_filtered_hosts(self, hosts, filter_properties,
                           filter_class_names=None):
        """Filter hosts and return only ones passing all filters."""
        filter_classes = self._choose_host_filters(filter_class_names)
        return self.filter_handler.get_filtered_objects(filter_classes,
                                                        hosts,
                                                        filter_properties)

    def get_weighed_hosts(self, hosts, weight_properties,
                          weigher_class_names=None):
        """Weigh the hosts."""
        weigher_classes = self._choose_host_weighers(weigher_class_names)
        # Merge server-pool mappings from every backend's capability report
        # so weighers can see them; mutates weight_properties in place.
        weight_properties['server_pools_mapping'] = {}
        for backend, info in self.service_states.items():
            weight_properties['server_pools_mapping'].update(
                info.get('server_pools_mapping', {}))
        return self.weight_handler.get_weighed_objects(weigher_classes,
                                                       hosts,
                                                       weight_properties)

    def update_service_capabilities(self, service_name, host,
                                    capabilities, timestamp):
        """Update the per-service capabilities based on this notification."""
        if service_name not in ('share',):
            LOG.debug('Ignoring %(service_name)s service update '
                      'from %(host)s',
                      {'service_name': service_name, 'host': host})
            return

        # Copy the capabilities, so we don't modify the original dict
        capability_copy = dict(capabilities)
        timestamp = timestamp or timeutils.utcnow()
        capability_copy["timestamp"] = timestamp  # Reported time

        # Sentinel timestamp 0 is falsy, so a first-ever report for a
        # host always passes the staleness check below.
        capab_old = self.service_states.get(host, {"timestamp": 0})
        # Ignore older updates
        if capab_old['timestamp'] and timestamp < capab_old['timestamp']:
            LOG.info('Ignoring old capability report from %s.', host)
            return

        self.service_states[host] = capability_copy

        LOG.debug("Received %(service_name)s service update from "
                  "%(host)s: %(cap)s",
                  {'service_name': service_name, 'host': host,
                   'cap': capabilities})

    def _update_host_state_map(self, context, consider_disabled=False):
        # Get resource usage across the available share nodes:
        topic = CONF.share_topic
        share_services = db.service_get_all_by_topic(
            context,
            topic,
            consider_disabled=consider_disabled,
        )

        active_hosts = set()
        for service in share_services:
            host = service['host']

            # Warn about down services and remove them from host_state_map
            is_down = not utils.service_is_up(service)
            is_disabled = (not consider_disabled and service['disabled'])
            if is_down or is_disabled:
                LOG.warning("Share service is down. (host: %s).", host)
                continue

            # Create and register host_state if not in host_state_map
            capabilities = self.service_states.get(host, None)
            host_state = self.host_state_map.get(host)
            if not host_state:
                host_state = self.host_state_cls(
                    host,
                    capabilities=capabilities,
                    service=dict(service.items()))
                self.host_state_map[host] = host_state

            # Update capabilities and attributes in host_state
            host_state.update_from_share_capability(
                capabilities, service=dict(service.items()), context=context)
            active_hosts.add(host)

        # remove non-active hosts from host_state_map
        nonactive_hosts = set(self.host_state_map.keys()) - active_hosts
        for host in nonactive_hosts:
            LOG.info("Removing non-active host: %(host)s from "
                     "scheduler cache.", {'host': host})
            self.host_state_map.pop(host, None)

    def get_all_host_states_share(self, context, consider_disabled=False):
        """Returns a dict of all the hosts the HostManager knows about.

        Each of the consumable resources in HostState are
        populated with capabilities scheduler received from RPC.

        For example:
          {'192.168.1.100': HostState(), ...}
        """

        self._update_host_state_map(
            context,
            consider_disabled=consider_disabled,
        )

        # Build a pool_state map and return that map instead of host_state_map
        all_pools = {}
        for host, state in self.host_state_map.items():
            for key in state.pools:
                pool = state.pools[key]
                # Use host.pool_name to make sure key is unique
                pool_key = '.'.join([host, pool.pool_name])
                all_pools[pool_key] = pool

        return all_pools.values()

    def get_pools(self, context, filters=None, cached=False):
        """Returns a dict of all pools on all hosts HostManager knows about."""
        # Refresh from the DB unless a warm cached map may be reused.
        if not cached or not self.host_state_map:
            self._update_host_state_map(context)

        all_pools = []
        for host, host_state in self.host_state_map.items():
            for pool in host_state.pools.values():

                fully_qualified_pool_name = share_utils.append_host(
                    host, pool.pool_name)
                host_name = share_utils.extract_host(
                    fully_qualified_pool_name, level='host')
                # backend is only present in "host@backend" style names.
                backend_name = (share_utils.extract_host(
                    fully_qualified_pool_name, level='backend').split('@')[1]
                    if '@' in fully_qualified_pool_name else None)
                pool_name = share_utils.extract_host(
                    fully_qualified_pool_name, level='pool')

                new_pool = {
                    'name': fully_qualified_pool_name,
                    'host': host_name,
                    'backend': backend_name,
                    'pool': pool_name,
                    'capabilities': pool.capabilities,
                }
                if self._passes_filters(new_pool, filters):
                    all_pools.append(new_pool)
        return all_pools

    def _passes_filters(self, dict_to_check, filter_dict):
        """Applies a set of regex filters to a dictionary.

        If no filter keys are supplied, the data passes unfiltered and
        the method returns True. Otherwise, each key in the filter
        (filter_dict) must be present in the data (dict_to_check)
        and the filter values are applied as regex expressions to
        the data values. If any of the filter values fail to match
        their corresponding data values, the method returns False.
        But if all filters match, the method returns True.
        """
        if not filter_dict:
            return True

        for filter_key, filter_value in filter_dict.items():
            if filter_key not in dict_to_check:
                return False
            # 'capabilities' gets structured comparison, not regex.
            if filter_key == 'capabilities':
                if not scheduler_utils.capabilities_satisfied(
                        dict_to_check.get(filter_key), filter_value):
                    return False
            elif not re.match(filter_value, dict_to_check.get(filter_key)):
                return False

        return True
765 return True