Coverage for manila/service.py: 74%
227 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2010 United States Government as represented by the
2# Administrator of the National Aeronautics and Space Administration.
3# Copyright 2011 Justin Santa Barbara
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License"); you may
7# not use this file except in compliance with the License. You may obtain
8# a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15# License for the specific language governing permissions and limitations
16# under the License.
18"""Generic Node base class for all workers that run on hosts."""
20import inspect
21import os
22import random
24from oslo_config import cfg
25from oslo_log import log
26import oslo_messaging as messaging
27from oslo_service import service
28from oslo_service import wsgi
29from oslo_utils import importutils
31from manila import context
32from manila import coordination
33from manila import db
34from manila import exception
35from manila import rpc
36from manila import utils
37from manila import version
# osprofiler is optional: setup_profiler() below treats any of these three
# names being None as "osprofiler is not present".
osprofiler_initializer = importutils.try_import('osprofiler.initializer')
profiler = importutils.try_import('osprofiler.profiler')
profiler_opts = importutils.try_import('osprofiler.opts')

LOG = log.getLogger(__name__)

# Options consumed by Service (timer intervals) and by WSGIService (the
# 'osapi_share_*' listen/worker/ssl settings, looked up by service name).
service_opts = [
    cfg.IntOpt(
        'report_interval',
        default=10,
        help='Seconds between nodes reporting state to datastore.',
    ),
    cfg.IntOpt(
        'cleanup_interval',
        default=1800,
        min=300,
        help='Seconds between cleaning up the stopped nodes.',
    ),
    cfg.IntOpt(
        'periodic_interval',
        default=60,
        help='Seconds between running periodic tasks.',
    ),
    cfg.IntOpt(
        'periodic_fuzzy_delay',
        default=60,
        help=('Range of seconds to randomly delay when starting the '
              'periodic task scheduler to reduce stampeding. '
              '(Disable by setting to 0)'),
    ),
    cfg.HostAddressOpt(
        'osapi_share_listen',
        default="::",
        help='IP address for OpenStack Share API to listen on.',
    ),
    cfg.PortOpt(
        'osapi_share_listen_port',
        default=8786,
        help='Port for OpenStack Share API to listen on.',
    ),
    cfg.IntOpt(
        'osapi_share_workers',
        default=1,
        help='Number of workers for OpenStack Share API service.',
    ),
    cfg.BoolOpt(
        'osapi_share_use_ssl',
        default=False,
        help=('Wraps the socket in a SSL context if True is set. '
              'A certificate file and key file must be specified.'),
    ),
]

CONF = cfg.CONF
CONF.register_opts(service_opts)
# Only install the profiler option defaults when osprofiler.opts imported.
if profiler_opts:
    profiler_opts.set_defaults(CONF)
def setup_profiler(binary, host):
    """Set up OSProfiler if it is installed and enabled in the config.

    Does nothing (apart from a debug message) when any of the optional
    osprofiler modules failed to import, and nothing at all when
    ``CONF.profiler.enabled`` is false.

    :param binary: passed to osprofiler as the ``service`` name.
    :param host: passed to osprofiler as the ``host`` name.
    """
    optional_modules = (osprofiler_initializer, profiler, profiler_opts)
    if any(module is None for module in optional_modules):
        LOG.debug('osprofiler is not present')
        return

    if not CONF.profiler.enabled:
        return

    osprofiler_initializer.init_from_conf(
        conf=CONF,
        context=context.get_admin_context().to_dict(),
        project="manila",
        service=binary,
        host=host,
    )
    LOG.warning("OSProfiler is enabled.")
class Service(service.Service):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager and reports
    its state to the database services table.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 service_name=None, coordination=False, *args, **kwargs):
        """Import and instantiate the manager; do not start anything yet.

        :param host: host name this service runs on; also passed to the
                     manager constructor.
        :param binary: name of the binary, used for the service DB record.
        :param topic: RPC topic this service listens on.
        :param manager: dotted path of the manager class to import.
        :param report_interval: seconds between report_state() runs; a
                                falsy value disables the timer.
        :param periodic_interval: seconds between periodic_tasks() runs; a
                                  falsy value disables the timer.
        :param periodic_fuzzy_delay: upper bound for the random initial
                                     delay of the periodic timer.
        :param service_name: passed through to the manager constructor.
        :param coordination: when truthy, start()/stop() also start/stop
                             the global lock coordinator.
        :param args: extra positional arguments for the manager.
        :param kwargs: extra keyword arguments for the manager.
        """
        super(Service, self).__init__()
        if not rpc.initialized():
            rpc.init(CONF)
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        if CONF.profiler.enabled and profiler is not None:
            manager_class = profiler.trace_cls("rpc")(manager_class)

        self.service = None
        self.manager = manager_class(host=self.host,
                                     service_name=service_name,
                                     *args, **kwargs)
        self.availability_zone = self.manager.availability_zone
        self.report_interval = report_interval
        self.cleanup_interval = CONF.cleanup_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.saved_args, self.saved_kwargs = args, kwargs
        # NOTE: inside __init__ 'coordination' is the boolean argument; the
        # other methods refer to the imported manila.coordination module.
        self.coordinator = coordination

        self.rpcserver = None

    def start(self):
        """Register the service, start its RPC server and its timers."""
        version_string = version.version_string()
        LOG.info('Starting %(topic)s node (version %(version_string)s)',
                 {'topic': self.topic, 'version_string': version_string})
        self.model_disconnected = False
        ctxt = context.get_admin_context()
        setup_profiler(self.binary, self.host)

        # Reuse the existing DB record (marked 'down' until the first
        # successful report) or create a fresh one.
        try:
            service_ref = db.service_get_by_args(ctxt,
                                                 self.host,
                                                 self.binary)
            self.service_id = service_ref['id']
            db.service_update(ctxt, self.service_id, {'state': 'down'})
        except exception.NotFound:
            self._create_service_ref(ctxt)

        self.manager.init_host(service_id=self.service_id)

        if self.coordinator:
            coordination.LOCK_COORDINATOR.start()

        LOG.debug("Creating RPC server for service %s.", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)
        endpoints = [self.manager]
        endpoints.extend(self.manager.additional_endpoints)
        self.rpcserver = rpc.get_server(target, endpoints)
        self.rpcserver.start()

        self.manager.init_host_with_rpc()

        if self.report_interval:
            self.tg.add_timer(self.report_interval, self.report_state,
                              initial_delay=self.report_interval)

        self.tg.add_timer(self.cleanup_interval, self.cleanup_services,
                          initial_delay=self.cleanup_interval)

        if self.periodic_interval:
            # Spread the first periodic run over a random delay so that
            # services started together do not all fire at once.
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            self.tg.add_timer(self.periodic_interval, self.periodic_tasks,
                              initial_delay=initial_delay)

    def _create_service_ref(self, context):
        """Create the DB record for this service and remember its id.

        :param context: request context used for the DB call.
        """
        service_args = {
            'host': self.host,
            'binary': self.binary,
            'topic': self.topic,
            'state': 'up',
            'report_count': 0,
            'availability_zone': self.availability_zone
        }
        service_ref = db.service_create(context, service_args)
        self.service_id = service_ref['id']

    def __getattr__(self, key):
        """Delegate lookups of unknown attributes to the manager."""
        # Read from __dict__ directly so a missing 'manager' attribute does
        # not re-enter __getattr__.
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None,
               coordination=False):
        """Instantiates class and passes back application object.

        :param host: defaults to CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'manila-' part
        :param manager: defaults to CONF.<topic>_manager
        :param report_interval: defaults to CONF.report_interval
        :param periodic_interval: defaults to CONF.periodic_interval
        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
        :param service_name: passed unchanged to the constructor
        :param coordination: passed unchanged to the constructor
        :returns: the new service object

        """
        if not host:
            host = CONF.host
        if not binary:
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary
        if not manager:
            subtopic = topic.rpartition('manila-')[2]
            manager = CONF.get('%s_manager' % subtopic, None)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_interval is None:
            periodic_interval = CONF.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name,
                          coordination=coordination)

        return service_obj

    def kill(self):
        """Destroy the service object in the datastore."""
        self.stop()
        try:
            db.service_destroy(context.get_admin_context(), self.service_id)
        except exception.NotFound:
            LOG.warning('Service killed that has no database entry.')

    def stop(self):
        """Stop the RPC server, mark the service 'stopped' and shut down."""
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway.
        # The failure is still recorded at debug level so it can be traced.
        if self.rpcserver is not None:
            try:
                self.rpcserver.stop()
            except Exception:
                LOG.debug("Ignoring error while stopping the RPC server "
                          "for service %s.", self.topic, exc_info=True)

        try:
            db.service_update(context.get_admin_context(),
                              self.service_id, {'state': 'stopped'})
        except exception.NotFound:
            LOG.warning('Service stopped that has no database entry.')

        if self.coordinator:
            try:
                coordination.LOCK_COORDINATOR.stop()
            except Exception:
                LOG.exception("Unable to stop the Tooz Locking "
                              "Coordinator.")

        super(Service, self).stop(graceful=True)

    def wait(self):
        """Wait for the RPC server (if one was created) and the service."""
        if self.rpcserver:
            self.rpcserver.wait()
        super(Service, self).wait()

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

    def report_state(self):
        """Update the state of this service in the datastore."""
        if not self.manager.is_service_ready():
            # NOTE(haixin): If the service is still initializing or failed to
            # initialize.
            LOG.error('Manager for service %s is not ready yet, skipping state'
                      ' update routine. Service will appear "down".',
                      self.binary)
            return

        ctxt = context.get_admin_context()
        state_catalog = {}
        try:
            try:
                service_ref = db.service_get(ctxt, self.service_id)
            except exception.NotFound:
                LOG.debug('The service database object disappeared, '
                          'Recreating it.')
                self._create_service_ref(ctxt)
                service_ref = db.service_get(ctxt, self.service_id)

            state_catalog['report_count'] = service_ref['report_count'] + 1
            if (self.availability_zone !=
                    service_ref['availability_zone']['name']):
                state_catalog['availability_zone'] = self.availability_zone

            if utils.service_is_up(service_ref):
                state_catalog['state'] = 'up'
            else:
                # A record that is explicitly 'stopped' keeps that state.
                if service_ref['state'] != 'stopped':
                    state_catalog['state'] = 'down'
            db.service_update(ctxt, self.service_id, state_catalog)

            # TODO(termie): make this pattern be more elegant.
            if getattr(self, 'model_disconnected', False):
                self.model_disconnected = False
                LOG.error('Recovered model server connection!')

        # TODO(vish): this should probably only catch connection errors
        except Exception:  # pylint: disable=W0702
            # Log the failure once per outage, not on every timer tick.
            if not getattr(self, 'model_disconnected', False):
                self.model_disconnected = True
                LOG.exception('model server went away')

    def cleanup_services(self):
        """Remove the stopped services of same topic from the datastore."""
        ctxt = context.get_admin_context()
        try:
            services = db.service_get_all_by_topic(ctxt, self.topic)
        except exception.NotFound:
            LOG.debug('The service database object disappeared, '
                      'Exiting from cleanup.')
            return

        for svc in services:
            if (svc['state'] == 'stopped' and
                    not utils.service_is_up(svc)):
                db.service_destroy(ctxt, svc['id'])
class WSGIService(service.ServiceBase):
    """Provides ability to launch API from a 'paste' configuration."""

    def __init__(self, name, loader=None):
        """Initialize, but do not start the WSGI server.

        The listen address, port, worker count and SSL flag are read from
        the ``<name>_listen``, ``<name>_listen_port``, ``<name>_workers``
        and ``<name>_use_ssl`` config options, with fallbacks when an
        option is not defined.

        :param name: The name of the WSGI server given to the loader.
        :param loader: Loads the WSGI application using the given name.
        :returns: None

        """
        self.name = name
        self.manager = self._get_manager()
        self.loader = loader or wsgi.Loader(CONF)
        if not rpc.initialized():
            rpc.init(CONF)
        self.app = self.loader.load_app(name)
        self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")  # nosec B104
        self.port = getattr(CONF, '%s_listen_port' % name, 0)
        self.workers = getattr(CONF, '%s_workers' % name, None)
        self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
        if self.workers is not None and self.workers < 1:
            # A value of exactly 1 is accepted; only values below 1 are
            # rejected, so the message states that bound.
            LOG.warning(
                "Value of config option %(name)s_workers must be an "
                "integer greater than or equal to 1. Input value ignored.",
                {'name': name})
            # Reset workers to default
            self.workers = None

        self.server = wsgi.Server(
            CONF,
            name,
            self.app,
            host=self.host,
            port=self.port,
            use_ssl=self.use_ssl
        )

    def _get_manager(self):
        """Initialize a Manager object appropriate for this service.

        Use the service name to look up a Manager subclass from the
        configuration and initialize an instance. If no class name
        is configured, just return None.

        :returns: a Manager instance, or None.

        """
        fl = '%s_manager' % self.name
        if fl not in CONF:
            return None

        manager_class_name = CONF.get(fl, None)
        if not manager_class_name:
            return None

        manager_class = importutils.import_class(manager_class_name)
        return manager_class()

    def start(self):
        """Start serving this service using loaded configuration.

        Also, retrieve updated port number in case '0' was passed in, which
        indicates a random port should be used.

        :returns: None

        """
        setup_profiler(self.name, self.host)
        if self.manager:
            self.manager.init_host()
        self.server.start()
        self.port = self.server.port

    def stop(self):
        """Stop serving this API.

        :returns: None

        """
        self.server.stop()

    def wait(self):
        """Wait for the service to stop serving this API.

        :returns: None

        """
        self.server.wait()

    def reset(self):
        """Reset server greenpool size to default.

        :returns: None
        """
        self.server.reset()
def process_launcher():
    """Build a process launcher bound to the global configuration.

    :returns: a ``service.ProcessLauncher`` created with
              ``restart_method='mutate'``.
    """
    launcher = service.ProcessLauncher(CONF, restart_method='mutate')
    return launcher
# NOTE(vish): the global launcher is to maintain the existing
#             functionality of calling service.serve +
#             service.wait
_launcher = None


def serve(server, workers=None):
    """Launch ``server`` and keep the launcher for a later wait() call.

    :param server: service object handed to ``service.launch``.
    :param workers: worker count forwarded to ``service.launch``.
    :raises RuntimeError: if a launcher was already created by an earlier
                          call.
    """
    global _launcher
    if not _launcher:
        _launcher = service.launch(CONF, server, workers=workers,
                                   restart_method='mutate')
        return
    raise RuntimeError('serve() can only be called once')
def wait():
    """Wait on the launcher created by serve(), then clean up RPC.

    The effective option values are logged at debug level first. A
    KeyboardInterrupt raised while waiting stops the launcher instead of
    propagating.
    """
    CONF.log_opt_values(LOG, log.DEBUG)
    launcher = _launcher
    try:
        launcher.wait()
    except KeyboardInterrupt:
        launcher.stop()
    rpc.cleanup()