Coverage for manila/service.py: 74%

227 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2010 United States Government as represented by the 

2# Administrator of the National Aeronautics and Space Administration. 

3# Copyright 2011 Justin Santa Barbara 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17 

18"""Generic Node base class for all workers that run on hosts.""" 

19 

20import inspect 

21import os 

22import random 

23 

24from oslo_config import cfg 

25from oslo_log import log 

26import oslo_messaging as messaging 

27from oslo_service import service 

28from oslo_service import wsgi 

29from oslo_utils import importutils 

30 

31from manila import context 

32from manila import coordination 

33from manila import db 

34from manila import exception 

35from manila import rpc 

36from manila import utils 

37from manila import version 

38 

# osprofiler is optional: try_import is used so a missing package does not
# break module import. setup_profiler() below treats a None value as
# "osprofiler is not present".
osprofiler_initializer = importutils.try_import('osprofiler.initializer')
profiler = importutils.try_import('osprofiler.profiler')
profiler_opts = importutils.try_import('osprofiler.opts')

LOG = log.getLogger(__name__)

# Options registered on the global CONF object by this module.
service_opts = [
    # Timer intervals used by Service.start().
    cfg.IntOpt(
        'report_interval',
        default=10,
        help='Seconds between nodes reporting state to datastore.',
    ),
    cfg.IntOpt(
        'cleanup_interval',
        default=1800,
        min=300,
        help='Seconds between cleaning up the stopped nodes.',
    ),
    cfg.IntOpt(
        'periodic_interval',
        default=60,
        help='Seconds between running periodic tasks.',
    ),
    cfg.IntOpt(
        'periodic_fuzzy_delay',
        default=60,
        help='Range of seconds to randomly delay when starting the '
             'periodic task scheduler to reduce stampeding. '
             '(Disable by setting to 0)',
    ),
    # Listener settings read by WSGIService for the "osapi_share" server.
    cfg.HostAddressOpt(
        'osapi_share_listen',
        default="::",
        help='IP address for OpenStack Share API to listen '
             'on.',
    ),
    cfg.PortOpt(
        'osapi_share_listen_port',
        default=8786,
        help='Port for OpenStack Share API to listen on.',
    ),
    cfg.IntOpt(
        'osapi_share_workers',
        default=1,
        help='Number of workers for OpenStack Share API service.',
    ),
    cfg.BoolOpt(
        'osapi_share_use_ssl',
        default=False,
        help='Wraps the socket in a SSL context if True is set. '
             'A certificate file and key file must be specified.',
    ),
]

CONF = cfg.CONF
CONF.register_opts(service_opts)
# Only touch the profiler option defaults when osprofiler was importable.
if profiler_opts:
    profiler_opts.set_defaults(CONF)

81 

82 

def setup_profiler(binary, host):
    """Initialize OSProfiler for a service when it is available and enabled.

    Does nothing (besides a debug log) when any of the optional osprofiler
    modules failed to import, and nothing at all when
    ``CONF.profiler.enabled`` is false.

    :param binary: passed to osprofiler as the ``service`` value.
    :param host: passed to osprofiler as the ``host`` value.
    :returns: None
    """
    optional_modules = (osprofiler_initializer, profiler, profiler_opts)
    if any(module is None for module in optional_modules):
        LOG.debug('osprofiler is not present')
        return

    if not CONF.profiler.enabled:
        return

    admin_context = context.get_admin_context().to_dict()
    osprofiler_initializer.init_from_conf(
        conf=CONF,
        context=admin_context,
        project="manila",
        service=binary,
        host=host,
    )
    LOG.warning("OSProfiler is enabled.")

99 

100 

class Service(service.Service):
    """Service object for binaries running on hosts.

    A service takes a manager and enables rpc by listening to queues based
    on topic. It also periodically runs tasks on the manager and reports
    its state to the database services table.
    """

    def __init__(self, host, binary, topic, manager, report_interval=None,
                 periodic_interval=None, periodic_fuzzy_delay=None,
                 service_name=None, coordination=False, *args, **kwargs):
        """Build the manager and record the service settings.

        Nothing is started here; see :meth:`start`.

        :param host: host name stored on the service and passed to the
            manager; also used as the RPC target server in :meth:`start`.
        :param binary: binary name used to look up / create the service
            database record.
        :param topic: RPC topic this service listens on.
        :param manager: import path of the manager class to instantiate.
        :param report_interval: interval handed to the state-report timer;
            a falsy value disables that timer.
        :param periodic_interval: interval handed to the periodic-task
            timer; a falsy value disables that timer.
        :param periodic_fuzzy_delay: upper bound of the random initial
            delay applied to the periodic-task timer; falsy means no
            explicit initial delay.
        :param service_name: forwarded to the manager constructor.
        :param coordination: when true, :meth:`start` and :meth:`stop`
            also start and stop ``coordination.LOCK_COORDINATOR``.
        :param args: extra positional arguments for the manager class.
        :param kwargs: extra keyword arguments for the manager class.
        """
        super(Service, self).__init__()
        if not rpc.initialized():
            rpc.init(CONF)
        self.host = host
        self.binary = binary
        self.topic = topic
        self.manager_class_name = manager
        manager_class = importutils.import_class(self.manager_class_name)
        # Wrap the manager class for tracing only when profiling is
        # enabled and osprofiler could be imported.
        if CONF.profiler.enabled and profiler is not None:
            manager_class = profiler.trace_cls("rpc")(manager_class)

        self.service = None
        self.manager = manager_class(host=self.host,
                                     service_name=service_name,
                                     *args, **kwargs)
        self.availability_zone = self.manager.availability_zone
        self.report_interval = report_interval
        self.cleanup_interval = CONF.cleanup_interval
        self.periodic_interval = periodic_interval
        self.periodic_fuzzy_delay = periodic_fuzzy_delay
        self.saved_args, self.saved_kwargs = args, kwargs
        # NOTE: the ``coordination`` parameter shadows the module of the
        # same name inside this method only; the flag is kept under a
        # different attribute name.
        self.coordinator = coordination

        # Created in start(); stop() and wait() tolerate it being None.
        self.rpcserver = None

    def start(self):
        """Register the service, start its RPC server and its timers."""
        version_string = version.version_string()
        LOG.info('Starting %(topic)s node (version %(version_string)s)',
                 {'topic': self.topic, 'version_string': version_string})
        self.model_disconnected = False
        ctxt = context.get_admin_context()
        setup_profiler(self.binary, self.host)

        # Reuse an existing database record (marking it 'down' while the
        # service initializes) or create a fresh one.
        try:
            service_ref = db.service_get_by_args(ctxt,
                                                 self.host,
                                                 self.binary)
            self.service_id = service_ref['id']
            db.service_update(ctxt, self.service_id, {'state': 'down'})
        except exception.NotFound:
            self._create_service_ref(ctxt)

        self.manager.init_host(service_id=self.service_id)

        if self.coordinator:
            coordination.LOCK_COORDINATOR.start()

        LOG.debug("Creating RPC server for service %s.", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)
        endpoints = [self.manager]
        endpoints.extend(self.manager.additional_endpoints)
        self.rpcserver = rpc.get_server(target, endpoints)
        self.rpcserver.start()

        self.manager.init_host_with_rpc()

        if self.report_interval:
            self.tg.add_timer(self.report_interval, self.report_state,
                              initial_delay=self.report_interval)

        self.tg.add_timer(self.cleanup_interval, self.cleanup_services,
                          initial_delay=self.cleanup_interval)

        if self.periodic_interval:
            # A random initial delay spreads out the first periodic run.
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            self.tg.add_timer(self.periodic_interval, self.periodic_tasks,
                              initial_delay=initial_delay)

    def _create_service_ref(self, context):
        """Create the service database record and remember its id.

        :param context: request context handed to ``db.service_create``.
        """
        service_args = {
            'host': self.host,
            'binary': self.binary,
            'topic': self.topic,
            'state': 'up',
            'report_count': 0,
            'availability_zone': self.availability_zone
        }
        service_ref = db.service_create(context, service_args)
        self.service_id = service_ref['id']

    def __getattr__(self, key):
        """Fall back to the manager for attributes missing on the service.

        ``self.__dict__`` is read directly so that looking up ``manager``
        itself cannot recurse back into this method.
        """
        manager = self.__dict__.get('manager', None)
        return getattr(manager, key)

    @classmethod
    def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_interval=None,
               periodic_fuzzy_delay=None, service_name=None,
               coordination=False):
        """Instantiates class and passes back application object.

        :param host: defaults to CONF.host
        :param binary: defaults to basename of executable
        :param topic: defaults to bin_name - 'manila-' part
        :param manager: defaults to CONF.<topic>_manager
        :param report_interval: defaults to CONF.report_interval
        :param periodic_interval: defaults to CONF.periodic_interval
        :param periodic_fuzzy_delay: defaults to CONF.periodic_fuzzy_delay
        :param service_name: passed through to the constructor unchanged
        :param coordination: passed through to the constructor unchanged
        :returns: the new service object

        """
        if not host:
            host = CONF.host
        if not binary:
            # File name of the outermost stack frame.
            binary = os.path.basename(inspect.stack()[-1][1])
        if not topic:
            topic = binary
        if not manager:
            subtopic = topic.rpartition('manila-')[2]
            manager = CONF.get('%s_manager' % subtopic, None)
        if report_interval is None:
            report_interval = CONF.report_interval
        if periodic_interval is None:
            periodic_interval = CONF.periodic_interval
        if periodic_fuzzy_delay is None:
            periodic_fuzzy_delay = CONF.periodic_fuzzy_delay
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_interval=periodic_interval,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          service_name=service_name,
                          coordination=coordination)

        return service_obj

    def kill(self):
        """Destroy the service object in the datastore."""
        self.stop()
        try:
            db.service_destroy(context.get_admin_context(), self.service_id)
        except exception.NotFound:
            LOG.warning('Service killed that has no database entry.')

    def stop(self):
        """Stop the RPC server, mark the service stopped and shut down.

        Shutting down the RPC server is best effort: failures are logged
        at debug level and otherwise ignored, as we are shutting down
        anyway.
        """
        # The RPC server only exists once start() has run.
        if self.rpcserver is not None:
            try:
                self.rpcserver.stop()
            except Exception:
                LOG.debug("Ignoring error while stopping the RPC server "
                          "for service %s.", self.topic, exc_info=True)

        try:
            db.service_update(context.get_admin_context(),
                              self.service_id, {'state': 'stopped'})
        except exception.NotFound:
            LOG.warning('Service stopped that has no database entry.')

        if self.coordinator:
            try:
                coordination.LOCK_COORDINATOR.stop()
            except Exception:
                LOG.exception("Unable to stop the Tooz Locking "
                              "Coordinator.")

        super(Service, self).stop(graceful=True)

    def wait(self):
        """Wait for the RPC server (if any) and then for the service."""
        if self.rpcserver:
            self.rpcserver.wait()
        super(Service, self).wait()

    def periodic_tasks(self, raise_on_error=False):
        """Tasks to be run at a periodic interval."""
        ctxt = context.get_admin_context()
        self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)

    def report_state(self):
        """Update the state of this service in the datastore."""
        if not self.manager.is_service_ready():
            # NOTE(haixin): If the service is still initializing or failed to
            # initialize.
            LOG.error('Manager for service %s is not ready yet, skipping state'
                      ' update routine. Service will appear "down".',
                      self.binary)
            return

        ctxt = context.get_admin_context()
        state_catalog = {}
        try:
            try:
                service_ref = db.service_get(ctxt, self.service_id)
            except exception.NotFound:
                LOG.debug('The service database object disappeared, '
                          'Recreating it.')
                self._create_service_ref(ctxt)
                service_ref = db.service_get(ctxt, self.service_id)

            state_catalog['report_count'] = service_ref['report_count'] + 1
            if (self.availability_zone !=
                    service_ref['availability_zone']['name']):
                state_catalog['availability_zone'] = self.availability_zone

            if utils.service_is_up(service_ref):
                state_catalog['state'] = 'up'
            else:
                # A record explicitly marked 'stopped' keeps that state.
                if service_ref['state'] != 'stopped':
                    state_catalog['state'] = 'down'
            db.service_update(ctxt, self.service_id, state_catalog)

            # TODO(termie): make this pattern be more elegant.
            if getattr(self, 'model_disconnected', False):
                self.model_disconnected = False
                LOG.error('Recovered model server connection!')

        # TODO(vish): this should probably only catch connection errors
        except Exception:  # pylint: disable=W0702
            # Log the failure only on the first consecutive error.
            if not getattr(self, 'model_disconnected', False):
                self.model_disconnected = True
                LOG.exception('model server went away')

    def cleanup_services(self):
        """Remove the stopped services of same topic from the datastore."""
        ctxt = context.get_admin_context()
        try:
            services = db.service_get_all_by_topic(ctxt, self.topic)
        except exception.NotFound:
            LOG.debug('The service database object disappeared, '
                      'Exiting from cleanup.')
            return

        for svc in services:
            if (svc['state'] == 'stopped' and
                    not utils.service_is_up(svc)):
                db.service_destroy(ctxt, svc['id'])

340 

341 

class WSGIService(service.ServiceBase):
    """Provides ability to launch API from a 'paste' configuration."""

    def __init__(self, name, loader=None):
        """Initialize, but do not start the WSGI server.

        Listener settings are read from the ``<name>_listen``,
        ``<name>_listen_port``, ``<name>_workers`` and ``<name>_use_ssl``
        configuration options, with fallbacks when an option is absent.

        :param name: The name of the WSGI server given to the loader.
        :param loader: Loads the WSGI application using the given name.
        :returns: None

        """
        self.name = name
        self.manager = self._get_manager()
        self.loader = loader or wsgi.Loader(CONF)
        if not rpc.initialized():
            rpc.init(CONF)
        self.app = self.loader.load_app(name)
        self.host = getattr(CONF, '%s_listen' % name, "0.0.0.0")  # nosec B104
        self.port = getattr(CONF, '%s_listen_port' % name, 0)
        self.workers = getattr(CONF, '%s_workers' % name, None)
        self.use_ssl = getattr(CONF, '%s_use_ssl' % name, False)
        # Only values below 1 are rejected, so the message says ">= 1".
        if self.workers is not None and self.workers < 1:
            LOG.warning(
                "Value of config option %(name)s_workers must be an integer "
                "greater than or equal to 1. Input value ignored.",
                {'name': name})
            # Reset workers to default
            self.workers = None

        self.server = wsgi.Server(
            CONF,
            name,
            self.app,
            host=self.host,
            port=self.port,
            use_ssl=self.use_ssl
        )

    def _get_manager(self):
        """Initialize a Manager object appropriate for this service.

        Use the service name to look up a Manager subclass from the
        configuration and initialize an instance. If no class name
        is configured, just return None.

        :returns: a Manager instance, or None.

        """
        fl = '%s_manager' % self.name
        if fl not in CONF:
            return None

        manager_class_name = CONF.get(fl, None)
        if not manager_class_name:
            return None

        manager_class = importutils.import_class(manager_class_name)
        return manager_class()

    def start(self):
        """Start serving this service using loaded configuration.

        Also, retrieve updated port number in case '0' was passed in, which
        indicates a random port should be used.

        :returns: None

        """
        setup_profiler(self.name, self.host)
        if self.manager:
            self.manager.init_host()
        self.server.start()
        self.port = self.server.port

    def stop(self):
        """Stop serving this API.

        :returns: None

        """
        self.server.stop()

    def wait(self):
        """Wait for the service to stop serving this API.

        :returns: None

        """
        self.server.wait()

    def reset(self):
        """Reset server greenpool size to default.

        :returns: None
        """
        self.server.reset()

437 

438 

def process_launcher():
    """Return a new ProcessLauncher bound to CONF, restarting via 'mutate'."""
    launcher = service.ProcessLauncher(CONF, restart_method='mutate')
    return launcher

441 

442 

# NOTE(vish): the global launcher is to maintain the existing
#             functionality of calling service.serve +
#             service.wait
_launcher = None


def serve(server, workers=None):
    """Launch ``server`` and remember the launcher in the module global.

    :param server: service object handed to ``service.launch``.
    :param workers: worker count forwarded to ``service.launch``.
    :raises RuntimeError: if a launcher has already been created.
    """
    global _launcher
    if not _launcher:
        _launcher = service.launch(
            CONF,
            server,
            workers=workers,
            restart_method='mutate',
        )
        return
    raise RuntimeError('serve() can only be called once')

455 

456 

def wait():
    """Block on the global launcher, then clean up RPC.

    The effective option values are logged at debug level first. A
    KeyboardInterrupt while waiting stops the launcher instead of
    propagating.
    """
    CONF.log_opt_values(LOG, log.DEBUG)
    launcher = _launcher
    try:
        launcher.wait()
    except KeyboardInterrupt:
        launcher.stop()
    rpc.cleanup()