Coverage for manila/scheduler/manager.py: 89%
168 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2010 OpenStack, LLC.
2# Copyright 2010 United States Government as represented by the
3# Administrator of the National Aeronautics and Space Administration.
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
18"""
19Scheduler Service
20"""
22from datetime import datetime
24from oslo_config import cfg
25from oslo_log import log
26from oslo_service import periodic_task
27from oslo_utils import excutils
28from oslo_utils import importutils
29from oslo_utils import timeutils
31from manila.common import constants
32from manila import context
33from manila import coordination
34from manila import db
35from manila import exception
36from manila import manager
37from manila.message import api as message_api
38from manila.message import message_field
39from manila import quota
40from manila import rpc
41from manila.share import rpcapi as share_rpcapi
42from manila.share import share_types
43from manila import utils
LOG = log.getLogger(__name__)
QUOTAS = quota.QUOTAS

# Configuration option naming the scheduler driver class to instantiate.
scheduler_driver_opt = cfg.StrOpt(
    'scheduler_driver',
    default='manila.scheduler.drivers.filter.FilterScheduler',
    help='Default scheduler driver to use.',
)

CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)

# Backward-compatibility table: deprecated driver import path -> current
# import path. Drivers whose module path or class name changes add an
# entry here so that existing configurations keep working.
MAPPING = {
    'manila.scheduler.chance.ChanceScheduler':
        'manila.scheduler.drivers.chance.ChanceScheduler',
    'manila.scheduler.filter_scheduler.FilterScheduler':
        'manila.scheduler.drivers.filter.FilterScheduler',
    'manila.scheduler.simple.SimpleScheduler':
        'manila.scheduler.drivers.simple.SimpleScheduler',
}
68class SchedulerManager(manager.Manager):
69 """Chooses a host to create shares."""
71 RPC_API_VERSION = '1.11'
def __init__(self, scheduler_driver=None, service_name=None,
             *args, **kwargs):
    """Load the scheduler driver and initialise the manager.

    :param scheduler_driver: import path of the driver class. When falsy,
        the ``scheduler_driver`` configuration option is used instead.
    :param service_name: accepted for interface compatibility; not used
        by this method.
    :param args: positional arguments forwarded to the parent constructor.
    :param kwargs: keyword arguments forwarded to the parent constructor.
    """
    driver_path = scheduler_driver or CONF.scheduler_driver

    if driver_path in MAPPING:
        # A deprecated path was configured: warn, then switch to the
        # current location of the same driver.
        current_path = MAPPING[driver_path]
        LOG.warning("Scheduler driver path %(old)s is deprecated, "
                    "update your configuration to the new path "
                    "%(new)s",
                    {'old': driver_path, 'new': current_path})
        driver_path = current_path

    self.driver = importutils.import_object(driver_path)
    self.message_api = message_api.API()
    super(SchedulerManager, self).__init__(*args, **kwargs)
    # Filled in later by init_host_with_rpc().
    self.service_id = None
def init_host_with_rpc(self, service_id=None):
    """Store the service id and request capabilities as admin.

    :param service_id: identifier kept on ``self.service_id``.
    """
    self.service_id = service_id
    # The capability request is issued with an admin context.
    self.request_service_capabilities(context.get_admin_context())
def get_host_list(self, context):
    """Return the host list provided by the scheduler driver.

    :param context: request context (not used by this method).
    """
    hosts = self.driver.get_host_list()
    return hosts
def get_service_capabilities(self, context):
    """Return the service capabilities known to the scheduler driver.

    :param context: request context (not used by this method).
    """
    capabilities = self.driver.get_service_capabilities()
    return capabilities
def update_service_capabilities(self, context, service_name=None,
                                host=None, capabilities=None,
                                timestamp=None, **kwargs):
    """Process a capability update from a service node.

    :param context: request context (not used by this method).
    :param service_name: name of the reporting service.
    :param host: host the report applies to.
    :param capabilities: reported capabilities; ``None`` is replaced by
        an empty dict.
    :param timestamp: time of the report. A non-empty string is parsed
        with ``timeutils.PERFECT_TIME_FORMAT``; any other value is
        forwarded to the driver unchanged.
    :param kwargs: extra arguments, ignored.
    """
    if capabilities is None:
        capabilities = {}

    # Parse the timestamp independently of ``capabilities``: previously an
    # ``elif`` skipped parsing whenever capabilities was None, handing the
    # driver a raw string instead of a datetime. Only strings are parsed
    # so an already-converted value passes through untouched.
    if timestamp and isinstance(timestamp, str):
        timestamp = datetime.strptime(timestamp,
                                      timeutils.PERFECT_TIME_FORMAT)

    self.driver.update_service_capabilities(service_name,
                                            host,
                                            capabilities,
                                            timestamp)
def create_share_instance(self, context, request_spec=None,
                          filter_properties=None):
    """Ask the driver to schedule the creation of a share.

    When scheduling fails, the failure is recorded through
    ``_set_share_state_and_notify`` with an error status. A
    ``NoValidHost`` failure is absorbed (with an ALLOCATE_HOST action
    passed along); any other exception is re-raised after recording.

    :param context: request context.
    :param request_spec: scheduling request specification.
    :param filter_properties: properties handed to the driver.
    """
    try:
        self.driver.schedule_create_share(
            context, request_spec, filter_properties)
    except exception.NoValidHost as no_host:
        error_state = {'status': constants.STATUS_ERROR}
        self._set_share_state_and_notify(
            'create_share', error_state, context, no_host, request_spec,
            message_field.Action.ALLOCATE_HOST)
    except Exception as failure:
        with excutils.save_and_reraise_exception():
            error_state = {'status': constants.STATUS_ERROR}
            self._set_share_state_and_notify(
                'create_share', error_state, context, failure,
                request_spec)
def get_pools(self, context, filters=None, cached=False):
    """Return the pools reported by the scheduler driver.

    :param context: request context, forwarded to the driver.
    :param filters: optional filters, forwarded to the driver.
    :param cached: flag forwarded to the driver as given.
    """
    pools = self.driver.get_pools(context, filters, cached)
    return pools
def manage_share(self, context, share_id, driver_options, request_spec,
                 filter_properties=None):
    """Ensure that the host exists and can accept the share.

    The share is loaded from the database and its host is checked
    against the scheduler filters. On success the manage request is
    sent on through the share RPC API; on failure the share is put into
    manage-error status and the exception is re-raised.

    :param context: request context.
    :param share_id: id of the share to manage.
    :param driver_options: options passed on with the manage request.
    :param request_spec: scheduling request specification.
    :param filter_properties: properties handed to the filter check.
    """
    share = db.share_get(context, share_id)

    try:
        self.driver.host_passes_filters(
            context, share['host'], request_spec, filter_properties)
    except Exception as failure:
        with excutils.save_and_reraise_exception():
            # NOTE(haixin) if failed to scheduler backend for manage share,
            # and we do not commit quota usages here, so we should set
            # size 0 because we don't know the real size of the size, and
            # we will skip quota cuts when unmanage share with
            # manage_error status.
            error_state = {'status': constants.STATUS_MANAGE_ERROR,
                           'size': 0}
            self._set_share_state_and_notify(
                'manage_share', error_state, context, failure,
                request_spec)

    # Only reached when the filter check succeeded: the handler above
    # always leaves by raising.
    share_rpcapi.ShareAPI().manage_share(context, share, driver_options)
def migrate_share_to_host(
        self, context, share_id, host, force_host_assisted_migration,
        preserve_metadata, writable, nondisruptive, preserve_snapshots,
        new_share_network_id, new_share_type_id, request_spec,
        filter_properties=None):
    """Ensure that the host exists and can accept the share.

    The destination host is checked against the scheduler filters and,
    if it passes, the migration is started through the share RPC API.
    If either step fails, the migration state is rolled back (see the
    inner helper) and the exception is re-raised.
    """
    share = db.share_get(context, share_id)

    def _roll_back(error):
        # Put the first instance still marked as migrating back to
        # 'available'.
        migrating = next(
            (inst for inst in share.instances
             if inst['status'] == constants.STATUS_MIGRATING),
            None)
        if migrating:
            db.share_instance_update(
                context, migrating['id'],
                {'status': constants.STATUS_AVAILABLE})
        # Flag the migration task as failed and notify.
        self._set_share_state_and_notify(
            'migrate_share_to_host',
            {'task_state': constants.TASK_STATE_MIGRATION_ERROR},
            context, error, request_spec)
        # Give back the quota allocated for the new share type.
        share_types.revert_allocated_share_type_quotas_during_migration(
            context, share, new_share_type_id)

    # Both steps share the same failure handling, so a single try block
    # covers the filter check and the migration start.
    try:
        destination = self.driver.host_passes_filters(
            context, host, request_spec, filter_properties)
        share_rpcapi.ShareAPI().migration_start(
            context, share, destination.host,
            force_host_assisted_migration, preserve_metadata, writable,
            nondisruptive, preserve_snapshots, new_share_network_id,
            new_share_type_id)
    except Exception as failure:
        with excutils.save_and_reraise_exception():
            _roll_back(failure)
def _set_share_state_and_notify(self, method, state, context, ex,
                                request_spec, action=None):
    """Record a scheduling failure and emit an error notification.

    :param method: name of the failed operation; used in the log line
        and as the suffix of the notification event type.
    :param state: dict of values applied to the share, when the request
        spec carries a ``share_id``.
    :param context: request context.
    :param ex: the exception that caused the failure.
    :param request_spec: scheduling request specification.
    :param action: optional message action; when truthy a user message
        is created for the share.
    """
    LOG.error("Failed to schedule %(method)s: %(ex)s",
              {"method": method, "ex": ex})

    share_properties = request_spec.get('share_properties', {})
    share_id = request_spec.get('share_id', None)

    if share_id:
        db.share_update(context, share_id, state)

    if action:
        self.message_api.create(
            context, action, context.project_id,
            resource_type=message_field.Resource.SHARE,
            resource_id=share_id, exception=ex)

    notification = {
        'request_spec': request_spec,
        'share_properties': share_properties,
        'share_id': share_id,
        'state': state,
        'method': method,
        'reason': ex,
    }
    notifier = rpc.get_notifier("scheduler")
    notifier.error(context, 'scheduler.' + method, notification)
def request_service_capabilities(self, context):
    """Call ``publish_service_capabilities`` on the share RPC API.

    :param context: request context passed with the RPC call.
    """
    share_api = share_rpcapi.ShareAPI()
    share_api.publish_service_capabilities(context)
def _set_share_group_error_state(self, method, context, ex,
                                 request_spec, action=None):
    """Mark a share group as errored after a scheduling failure.

    :param method: name of the failed operation, used in the log line.
    :param context: request context.
    :param ex: the exception that caused the failure.
    :param request_spec: scheduling request specification; the group is
        updated only if it carries a ``share_group_id``.
    :param action: optional message action; when truthy a user message
        is created for the share group.
    """
    LOG.warning("Failed to schedule_%(method)s: %(ex)s",
                {"method": method, "ex": ex})

    group_id = request_spec.get('share_group_id')
    if group_id:
        db.share_group_update(
            context, group_id, {'status': constants.STATUS_ERROR})

    if not action:
        return
    self.message_api.create(
        context, action, context.project_id,
        resource_type=message_field.Resource.SHARE_GROUP,
        resource_id=group_id, exception=ex)
@periodic_task.periodic_task(run_immediately=True, spacing=600)
def _expire_reservations(self, context):
    """Periodic task (spacing=600) that calls ``quota.QUOTAS.expire``.

    :param context: context supplied by the periodic task runner.
    """
    quota.QUOTAS.expire(context)
def create_share_group(self, context, share_group_id, request_spec=None,
                       filter_properties=None):
    """Ask the driver to schedule the creation of a share group.

    A ``NoValidHost`` failure marks the group as errored (passing an
    ALLOCATE_HOST action) and is absorbed; any other exception marks
    the group as errored and is re-raised.

    :param context: request context.
    :param share_group_id: id of the share group to create.
    :param request_spec: scheduling request specification.
    :param filter_properties: properties handed to the driver.
    """
    operation = 'create_share_group'
    try:
        self.driver.schedule_create_share_group(
            context, share_group_id, request_spec, filter_properties)
    except exception.NoValidHost as no_host:
        self._set_share_group_error_state(
            operation, context, no_host, request_spec,
            message_field.Action.ALLOCATE_HOST)
    except Exception as failure:
        with excutils.save_and_reraise_exception():
            self._set_share_group_error_state(
                operation, context, failure, request_spec)
def _set_share_replica_error_state(self, context, method, exc,
                                   request_spec, action=None):
    """Mark a share replica and its snapshot instances as errored.

    :param context: request context.
    :param method: name of the failed operation, used in the log line.
    :param exc: the exception that caused the failure.
    :param request_spec: scheduling request specification; the replica
        id is read from ``share_instance_properties['id']``.
    :param action: optional message action; when truthy a user message
        is created for the replica.
    """
    LOG.warning("Failed to schedule_%(method)s: %(exc)s",
                {'method': method, 'exc': exc})

    instance_properties = request_spec.get('share_instance_properties')
    replica_id = instance_properties.get('id')

    # Set any snapshot instances to 'error'.
    snapshot_instances = db.share_snapshot_instance_get_all_with_filters(
        context, {'share_instance_ids': replica_id})
    for snapshot in snapshot_instances:
        db.share_snapshot_instance_update(
            context, snapshot['id'], {'status': constants.STATUS_ERROR})

    # Then flag the replica itself, both status and replica state.
    db.share_replica_update(
        context, replica_id,
        {
            'status': constants.STATUS_ERROR,
            'replica_state': constants.STATUS_ERROR,
        })

    if action:
        self.message_api.create(
            context, action, context.project_id,
            resource_type=message_field.Resource.SHARE_REPLICA,
            resource_id=replica_id, exception=exc)
def create_share_replica(self, context, request_spec=None,
                         filter_properties=None):
    """Ask the driver to schedule the creation of a share replica.

    A ``NoValidHost`` failure marks the replica as errored (passing an
    ALLOCATE_HOST action) and is absorbed; any other exception marks
    the replica as errored and is re-raised.

    :param context: request context.
    :param request_spec: scheduling request specification.
    :param filter_properties: properties handed to the driver.
    """
    operation = 'create_share_replica'
    try:
        self.driver.schedule_create_replica(
            context, request_spec, filter_properties)
    except exception.NoValidHost as no_host:
        self._set_share_replica_error_state(
            context, operation, no_host, request_spec,
            message_field.Action.ALLOCATE_HOST)
    except Exception as failure:
        with excutils.save_and_reraise_exception():
            self._set_share_replica_error_state(
                context, operation, failure, request_spec)
@periodic_task.periodic_task(run_immediately=True,
                             spacing=CONF.message_reap_interval)
@coordination.synchronized('locked-clean-expired-messages')
def _clean_expired_messages(self, context):
    """Periodic task that asks the message API to clean expired messages.

    Runs under the 'locked-clean-expired-messages' coordination lock,
    spaced by the ``message_reap_interval`` option.

    :param context: context supplied by the periodic task runner.
    """
    messages = self.message_api
    messages.cleanup_expired_messages(context)
@periodic_task.periodic_task(run_immediately=True,
                             spacing=CONF.service_down_time)
@coordination.synchronized('locked-mark-services-as-down')
def _mark_services_as_down(self, context):
    """Periodic task that sets unresponsive services to state "down".

    Services for which ``utils.service_is_up`` is false, and whose state
    is not already "down" or "stopped", are updated in the database.

    :param context: context supplied by the periodic task runner.
    """
    for service in db.service_get_all(context):
        if utils.service_is_up(service):
            continue
        if service["state"] in ("down", "stopped"):
            # Already recorded as not running; nothing to change.
            continue
        db.service_update(context, service['id'], {"state": "down"})
def extend_share(self, context, share_id, new_size, reservations,
                 request_spec=None, filter_properties=None):
    """Check that the share's host can take the growth, then extend.

    The size increase is added to the filter properties, the share's
    current host is checked against the scheduler filters, the increase
    is consumed on the returned host state and the extend request is
    sent through the share RPC API. On ``NoValidHost`` the quota
    reservations are rolled back, the share is set back to 'available'
    with a notification, and a user message is created.

    :param context: request context.
    :param share_id: id of the share to extend.
    :param new_size: requested size; converted with ``int()``.
    :param reservations: quota reservations made for the extension.
    :param request_spec: scheduling request specification.
    :param filter_properties: properties for the filter check; a truthy
        dict passed in is updated in place with ``size_increase``.
    """
    share = db.share_get(context, share_id)
    try:
        growth = int(new_size) - share['size']
        if not filter_properties:
            filter_properties = {'size_increase': growth}
        else:
            filter_properties['size_increase'] = growth

        host_state = self.driver.host_passes_filters(
            context, share['host'], request_spec, filter_properties)
        host_state.consume_from_share({'size': growth})
        share_rpcapi.ShareAPI().extend_share(
            context, share, new_size, reservations)
    except exception.NoValidHost as no_host:
        # Release the quota reserved for the extension.
        quota.QUOTAS.rollback(context, reservations,
                              project_id=share['project_id'],
                              user_id=share['user_id'],
                              share_type_id=share['share_type_id'])
        # The share itself is unchanged, so it goes back to 'available'.
        self._set_share_state_and_notify(
            'extend_share', {'status': constants.STATUS_AVAILABLE},
            context, no_host, request_spec)
        self.message_api.create(
            context,
            message_field.Action.EXTEND,
            share['project_id'],
            resource_type=message_field.Resource.SHARE,
            resource_id=share['id'],
            exception=no_host)