Coverage for manila/scheduler/drivers/filter.py: 86%
265 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2011 Intel Corporation
2# Copyright (c) 2011 OpenStack, LLC.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17"""
18The FilterScheduler is for scheduling of share and share group creation.
19You can customize this scheduler by specifying your own share/share group
20filters and weighing functions.
21"""
23from oslo_config import cfg
24from oslo_log import log
26from manila import exception
27from manila.i18n import _
28from manila.message import api as message_api
29from manila.message import message_field
30from manila import policy
31from manila.scheduler.drivers import base
32from manila.scheduler import scheduler_options
33from manila.share import share_types
# Global oslo.config object. This module reads the options
# scheduler_max_attempts, scheduler_default_share_group_filters and
# scheduler_default_extend_filters from it.
CONF = cfg.CONF
# Module-level logger.
LOG = log.getLogger(__name__)

# Names under which affinity hints are stored in
# filter_properties['scheduler_hints'].
AFFINITY_HINT = 'same_host'
ANTI_AFFINITY_HINT = 'different_host'
# Share metadata keys from which the two hints above are read.
AFFINITY_KEY = "__affinity_same_host"
ANTI_AFFINITY_KEY = "__affinity_different_host"
44class FilterScheduler(base.Scheduler):
45 """Scheduler that can be used for filtering and weighing."""
46 def __init__(self, *args, **kwargs):
47 super(FilterScheduler, self).__init__(*args, **kwargs)
48 self.cost_function_cache = None
49 self.options = scheduler_options.SchedulerOptions()
50 self.max_attempts = self._max_attempts()
51 self.message_api = message_api.API()
53 def _get_configuration_options(self):
54 """Fetch options dictionary. Broken out for testing."""
55 return self.options.get_configuration()
57 def get_pools(self, context, filters, cached):
58 return self.host_manager.get_pools(context, filters, cached)
60 def _post_select_populate_filter_properties(self, filter_properties,
61 host_state):
62 """Add additional information to filter properties.
64 Add additional information to the filter properties after a host has
65 been selected by the scheduling process.
66 """
67 # Add a retry entry for the selected volume backend:
68 self._add_retry_host(filter_properties, host_state.host)
70 def _add_retry_host(self, filter_properties, host):
71 """Add retry entry for the selected volume backend.
73 In the event that the request gets re-scheduled, this entry
74 will signal that the given backend has already been tried.
75 """
76 retry = filter_properties.get('retry')
77 if not retry:
78 return
79 hosts = retry['hosts']
80 hosts.append(host)
82 def _max_attempts(self):
83 max_attempts = CONF.scheduler_max_attempts
84 if max_attempts < 1:
85 msg = _("Invalid value for 'scheduler_max_attempts', "
86 "must be >=1")
87 raise exception.InvalidParameterValue(err=msg)
88 return max_attempts
90 def schedule_create_share(self, context, request_spec, filter_properties):
91 weighed_host = self._schedule_share(context,
92 request_spec,
93 filter_properties)
95 host = weighed_host.obj.host
96 share_id = request_spec['share_id']
97 snapshot_id = request_spec['snapshot_id']
99 updated_share = base.share_update_db(context, share_id, host)
100 self._post_select_populate_filter_properties(filter_properties,
101 weighed_host.obj)
103 # context is not serializable
104 filter_properties.pop('context', None)
106 self.share_rpcapi.create_share_instance(
107 context, updated_share.instance, host,
108 request_spec=request_spec,
109 filter_properties=filter_properties,
110 snapshot_id=snapshot_id
111 )
113 def schedule_create_replica(self, context, request_spec,
114 filter_properties):
115 share_replica_id = request_spec['share_instance_properties'].get('id')
117 weighed_host = self._schedule_share(
118 context, request_spec, filter_properties)
120 host = weighed_host.obj.host
122 updated_share_replica = base.share_replica_update_db(
123 context, share_replica_id, host)
124 self._post_select_populate_filter_properties(filter_properties,
125 weighed_host.obj)
127 # context is not serializable
128 filter_properties.pop('context', None)
130 self.share_rpcapi.create_share_replica(
131 context, updated_share_replica, host, request_spec=request_spec,
132 filter_properties=filter_properties)
    def _format_filter_properties(self, context, filter_properties,
                                  request_spec):
        """Build the filter properties used to filter and weigh hosts.

        Side effects visible in this method:

        * ``request_spec['share_type']`` is modified in place: its
          'extra_specs' are normalised ("<is> " prefix on boolean specs,
          a default 'storage_protocol' spec, removal of
          'share_backend_name' for replica/snapshot requests).
        * ``request_spec`` gains a 'resource_properties' entry.
        * ``filter_properties`` (a new dict when ``None`` is passed) is
          updated with retry history and the scheduling inputs.

        :param context: request context; ``context.elevated()`` is used
            when listing host states.
        :param filter_properties: dict of filter properties, or ``None``.
        :param request_spec: dict describing the request; must contain
            'share_properties' and a non-empty 'share_type'.
        :returns: tuple ``(filter_properties, share_properties)``.
        :raises InvalidParameterValue: if the request has no share type.
        """
        elevated = context.elevated()

        share_properties = request_spec['share_properties']
        share_instance_properties = (request_spec.get(
            'share_instance_properties', {}))
        # A top-level 'share_proto' wins over the one in share_properties.
        share_proto = request_spec.get('share_proto',
                                       share_properties.get('share_proto'))

        # Instance properties override share properties on key clashes.
        resource_properties = share_properties.copy()
        resource_properties.update(share_instance_properties.copy())
        share_type = request_spec.get("share_type", {})
        if not share_type:
            msg = _("You must create a share type in advance,"
                    " and specify in request body or"
                    " set default_share_type in manila.conf.")
            LOG.error(msg)
            # Also record a user-facing message before failing.
            self.message_api.create(
                context,
                message_field.Action.CREATE,
                context.project_id,
                resource_type=message_field.Resource.SHARE,
                resource_id=request_spec.get('share_id', None),
                detail=message_field.Detail.NO_DEFAULT_SHARE_TYPE)
            raise exception.InvalidParameterValue(err=msg)

        share_type['extra_specs'] = share_type.get('extra_specs') or {}

        if share_type['extra_specs']:
            # Prefix boolean extra specs with "<is>" unless already done.
            for extra_spec_name in share_types.get_boolean_extra_specs():
                extra_spec = share_type['extra_specs'].get(extra_spec_name)
                if extra_spec is not None:
                    if not extra_spec.startswith("<is>"):
                        extra_spec = "<is> %s" % extra_spec
                    share_type['extra_specs'][extra_spec_name] = extra_spec

            # NOTE(review): the nesting of this section under the
            # non-empty 'extra_specs' check was reconstructed from a
            # whitespace-flattened listing -- confirm against the file.
            storage_protocol_spec = (
                share_type['extra_specs'].get('storage_protocol')
            )
            if storage_protocol_spec is None and share_proto is not None:
                # a host can report multiple protocols as "storage_protocol"
                spec_value = "<in> %s" % share_proto
                share_type['extra_specs']['storage_protocol'] = spec_value

        resource_type = share_type
        request_spec.update({'resource_properties': resource_properties})

        config_options = self._get_configuration_options()

        share_group = request_spec.get('share_group')

        # NOTE(gouthamr): If 'active_replica_host' or 'snapshot_host' is
        # present in the request spec, pass that host's 'replication_domain' to
        # the ShareReplication and CreateFromSnapshot filters.
        active_replica_host = request_spec.get('active_replica_host')
        snapshot_host = request_spec.get('snapshot_host')
        allowed_hosts = []
        if active_replica_host:
            allowed_hosts.append(active_replica_host)
        if snapshot_host:
            allowed_hosts.append(snapshot_host)
        replication_domain = None
        if active_replica_host or snapshot_host:
            temp_hosts = self.host_manager.get_all_host_states_share(elevated)
            # First host state whose name is one of the allowed hosts.
            matching_host = next((host for host in temp_hosts
                                  if host.host in allowed_hosts), None)
            if matching_host:
                replication_domain = matching_host.replication_domain

            # NOTE(zengyingzhe): remove the 'share_backend_name' extra spec,
            # let scheduler choose the available host for this replica or
            # snapshot clone creation request.
            share_type.get('extra_specs', {}).pop('share_backend_name', None)

        if filter_properties is None:
            filter_properties = {}
        # May raise NoValidHost when the retry limit has been exceeded.
        self._populate_retry_share(filter_properties, resource_properties)

        filter_properties.update({'context': context,
                                  'request_spec': request_spec,
                                  'config_options': config_options,
                                  'share_type': share_type,
                                  'resource_type': resource_type,
                                  'share_group': share_group,
                                  'replication_domain': replication_domain,
                                  })

        self.populate_filter_properties_share(context, request_spec,
                                              filter_properties)

        return filter_properties, share_properties
228 def _schedule_share(self, context, request_spec, filter_properties=None):
229 """Returns a list of hosts that meet the required specs.
231 The list is ordered by their fitness.
232 """
233 elevated = context.elevated()
235 filter_properties, share_properties = self._format_filter_properties(
236 context, filter_properties, request_spec)
238 # Find our local list of acceptable hosts by filtering and
239 # weighing our options. we virtually consume resources on
240 # it so subsequent selections can adjust accordingly.
242 # Note: remember, we are using an iterator here. So only
243 # traverse this list once.
244 consider_disabled = False
245 if policy.check_is_host_admin(context) and filter_properties.get(
246 'scheduler_hints', {}).get('only_host'):
247 # Admin user can schedule share on disabled host
248 consider_disabled = True
250 hosts = self.host_manager.get_all_host_states_share(
251 elevated,
252 consider_disabled=consider_disabled
253 )
254 if not hosts:
255 msg = _("There are no hosts to fulfill this "
256 "provisioning request. Are share "
257 "backend services down?")
258 self.message_api.create(
259 context,
260 message_field.Action.CREATE,
261 context.project_id,
262 resource_type=message_field.Resource.SHARE,
263 resource_id=request_spec.get('share_id', None),
264 detail=message_field.Detail.SHARE_BACKEND_NOT_READY_YET)
265 raise exception.WillNotSchedule(msg)
267 # Filter local hosts based on requirements ...
268 hosts, last_filter = self.host_manager.get_filtered_hosts(
269 hosts, filter_properties)
271 if not hosts:
272 msg = _('Failed to find a weighted host, the last executed filter'
273 ' was %s.')
274 raise exception.NoValidHost(
275 reason=msg % last_filter,
276 detail_data={'last_filter': last_filter})
278 LOG.debug("Filtered share %(hosts)s", {"hosts": hosts})
279 # weighted_host = WeightedHost() ... the best
280 # host for the job.
281 weighed_hosts = self.host_manager.get_weighed_hosts(hosts,
282 filter_properties)
283 best_host = weighed_hosts[0]
284 LOG.debug("Choosing for share: %(best_host)s",
285 {"best_host": best_host})
286 # NOTE(rushiagr): updating the available space parameters at same place
287 best_host.obj.consume_from_share(share_properties)
288 return best_host
290 def _populate_retry_share(self, filter_properties, properties):
291 """Populate filter properties with retry history.
293 Populate filter properties with history of retries for this
294 request. If maximum retries is exceeded, raise NoValidHost.
295 """
296 max_attempts = self.max_attempts
297 retry = filter_properties.pop('retry', {})
299 if max_attempts == 1:
300 # re-scheduling is disabled.
301 return
303 # retry is enabled, update attempt count:
304 if retry:
305 retry['num_attempts'] += 1
306 else:
307 retry = {
308 'num_attempts': 1,
309 'hosts': [] # list of share service hosts tried
310 }
311 filter_properties['retry'] = retry
313 share_id = properties.get('share_id')
314 self._log_share_error(share_id, retry)
316 if retry['num_attempts'] > max_attempts:
317 msg = _("Exceeded max scheduling attempts %(max_attempts)d for "
318 "share %(share_id)s") % {
319 "max_attempts": max_attempts,
320 "share_id": share_id
321 }
322 raise exception.NoValidHost(reason=msg)
324 def _log_share_error(self, share_id, retry):
325 """Log any exceptions from a previous share create operation.
327 If the request contained an exception from a previous share
328 create operation, log it to aid debugging.
329 """
330 exc = retry.pop('exc', None) # string-ified exception from share
331 if not exc: 331 ↛ 334line 331 didn't jump to line 334 because the condition on line 331 was always true
332 return # no exception info from a previous attempt, skip
334 hosts = retry.get('hosts')
335 if not hosts:
336 return # no previously attempted hosts, skip
338 last_host = hosts[-1]
339 LOG.error("Error scheduling %(share_id)s from last share-service: "
340 "%(last_host)s : %(exc)s", {
341 "share_id": share_id,
342 "last_host": last_host,
343 "exc": "exc"
344 })
346 def _populate_scheduler_hint(self, request_spec, hints, key, hint):
347 share_properties = request_spec.get('share_properties', {})
348 value = share_properties.get('metadata', {}).get(key, None)
349 if value: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true
350 hints.update({hint: value})
352 def populate_filter_properties_scheduler_hints(self, context, request_spec,
353 filter_properties):
354 share_id = request_spec.get('share_id', None)
355 if not share_id:
356 filter_properties['scheduler_hints'] = {}
357 return
358 else:
359 if filter_properties.get('scheduler_hints', None):
360 return
361 hints = {}
362 self._populate_scheduler_hint(request_spec, hints,
363 AFFINITY_KEY,
364 AFFINITY_HINT)
365 self._populate_scheduler_hint(request_spec, hints,
366 ANTI_AFFINITY_KEY,
367 ANTI_AFFINITY_HINT)
368 filter_properties['scheduler_hints'] = hints
370 def populate_filter_properties_share(self, context, request_spec,
371 filter_properties):
372 """Stuff things into filter_properties.
374 Can be overridden in a subclass to add more data.
375 """
376 shr = request_spec['share_properties']
377 inst = request_spec['share_instance_properties']
378 filter_properties['size'] = shr['size']
379 filter_properties['availability_zone_id'] = (
380 inst.get('availability_zone_id')
381 )
382 filter_properties['user_id'] = shr.get('user_id')
383 filter_properties['metadata'] = shr.get('metadata')
384 filter_properties['snapshot_id'] = shr.get('snapshot_id')
385 filter_properties['is_share_extend'] = (
386 request_spec.get('is_share_extend')
387 )
388 self.populate_filter_properties_scheduler_hints(context, request_spec,
389 filter_properties)
391 def schedule_create_share_group(self, context, share_group_id,
392 request_spec, filter_properties):
394 LOG.info("Scheduling share group %s.", share_group_id)
395 host = self._get_best_host_for_share_group(context, request_spec)
397 if not host:
398 msg = _("No hosts available for share group %s.") % share_group_id
399 raise exception.NoValidHost(reason=msg)
401 msg = "Chose host %(host)s for create_share_group %(group)s."
402 LOG.info(msg, {'host': host, 'group': share_group_id})
404 updated_share_group = base.share_group_update_db(
405 context, share_group_id, host)
407 self.share_rpcapi.create_share_group(
408 context, updated_share_group, host)
410 def _get_weighted_hosts_for_share_type(self, context, request_spec,
411 share_type):
412 config_options = self._get_configuration_options()
413 # NOTE(ameade): Find our local list of acceptable hosts by
414 # filtering and weighing our options. We virtually consume
415 # resources on it so subsequent selections can adjust accordingly.
417 # NOTE(ameade): Remember, we are using an iterator here. So only
418 # traverse this list once.
419 all_hosts = self.host_manager.get_all_host_states_share(context)
421 if not all_hosts:
422 return []
424 share_type['extra_specs'] = share_type.get('extra_specs', {})
426 if share_type['extra_specs']: 426 ↛ 434line 426 didn't jump to line 434 because the condition on line 426 was always true
427 for spec_name in share_types.get_required_extra_specs():
428 extra_spec = share_type['extra_specs'].get(spec_name)
430 if extra_spec is not None: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true
431 share_type['extra_specs'][spec_name] = (
432 "<is> %s" % extra_spec)
434 filter_properties = {
435 'context': context,
436 'request_spec': request_spec,
437 'config_options': config_options,
438 'share_type': share_type,
439 'resource_type': share_type,
440 'size': 0,
441 }
442 # Filter local hosts based on requirements ...
443 hosts, last_filter = self.host_manager.get_filtered_hosts(
444 all_hosts, filter_properties)
446 if not hosts:
447 return []
449 LOG.debug("Filtered %s", hosts)
451 # weighted_host = WeightedHost() ... the best host for the job.
452 weighed_hosts = self.host_manager.get_weighed_hosts(
453 hosts,
454 filter_properties)
455 if not weighed_hosts: 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true
456 return []
458 return weighed_hosts
460 def _get_weighted_hosts_for_share_group_type(self, context, request_spec,
461 share_group_type):
462 config_options = self._get_configuration_options()
463 all_hosts = self.host_manager.get_all_host_states_share(context)
465 if not all_hosts:
466 return []
468 filter_properties = {
469 'context': context,
470 'request_spec': request_spec,
471 'config_options': config_options,
472 'share_group_type': share_group_type,
473 'resource_type': share_group_type,
474 }
476 hosts, last_filter = self.host_manager.get_filtered_hosts(
477 all_hosts,
478 filter_properties,
479 CONF.scheduler_default_share_group_filters)
481 if not hosts: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true
482 return []
484 LOG.debug("Filtered %s", hosts)
486 weighed_hosts = self.host_manager.get_weighed_hosts(
487 hosts,
488 filter_properties)
489 if not weighed_hosts: 489 ↛ 490line 489 didn't jump to line 490 because the condition on line 489 was never true
490 return []
492 return weighed_hosts
494 def _get_weighted_candidates_share_group(self, context, request_spec):
495 """Finds hosts that support the share group.
497 Returns a list of hosts that meet the required specs,
498 ordered by their fitness.
499 """
500 elevated = context.elevated()
502 shr_types = request_spec.get("share_types")
504 weighed_hosts = []
506 for iteration_count, share_type in enumerate(shr_types):
507 temp_weighed_hosts = self._get_weighted_hosts_for_share_type(
508 elevated, request_spec, share_type)
510 # NOTE(ameade): Take the intersection of hosts so we have one that
511 # can support all share types of the share group
512 if iteration_count == 0: 512 ↛ 515line 512 didn't jump to line 515 because the condition on line 512 was always true
513 weighed_hosts = temp_weighed_hosts
514 else:
515 new_weighed_hosts = []
516 for host1 in weighed_hosts:
517 for host2 in temp_weighed_hosts:
518 if host1.obj.host == host2.obj.host:
519 new_weighed_hosts.append(host1)
520 weighed_hosts = new_weighed_hosts
521 if not weighed_hosts:
522 return []
524 # NOTE(ameade): Ensure the hosts support the share group type
525 share_group_type = request_spec.get("resource_type", {})
526 temp_weighed_group_hosts = (
527 self._get_weighted_hosts_for_share_group_type(
528 elevated, request_spec, share_group_type))
529 new_weighed_hosts = []
530 for host1 in weighed_hosts:
531 for host2 in temp_weighed_group_hosts:
532 if host1.obj.host == host2.obj.host:
533 new_weighed_hosts.append(host1)
534 weighed_hosts = new_weighed_hosts
536 return weighed_hosts
538 def _get_best_host_for_share_group(self, context, request_spec):
539 weighed_hosts = self._get_weighted_candidates_share_group(
540 context,
541 request_spec)
543 if not weighed_hosts: 543 ↛ 545line 543 didn't jump to line 545 because the condition on line 543 was always true
544 return None
545 return weighed_hosts[0].obj.host
547 def host_passes_filters(self, context, host, request_spec,
548 filter_properties):
550 elevated = context.elevated()
552 filter_properties, share_properties = self._format_filter_properties(
553 context, filter_properties, request_spec)
555 hosts = self.host_manager.get_all_host_states_share(elevated)
556 filter_class_names = None
557 if request_spec.get('is_share_extend', None): 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true
558 filter_class_names = CONF.scheduler_default_extend_filters
559 hosts, last_filter = self.host_manager.get_filtered_hosts(
560 hosts, filter_properties,
561 filter_class_names=filter_class_names)
562 hosts = self.host_manager.get_weighed_hosts(hosts, filter_properties)
564 for tgt_host in hosts:
565 if tgt_host.obj.host == host:
566 return tgt_host.obj
568 msg = (_('Cannot place share %(id)s on %(host)s, the last executed'
569 ' filter was %(last_filter)s.')
570 % {'id': request_spec['share_id'], 'host': host,
571 'last_filter': last_filter})
572 raise exception.NoValidHost(reason=msg)