Coverage for manila/scheduler/manager.py: 89%

168 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2010 OpenStack, LLC. 

2# Copyright 2010 United States Government as represented by the 

3# Administrator of the National Aeronautics and Space Administration. 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17 

18""" 

19Scheduler Service 

20""" 

21 

22from datetime import datetime 

23 

24from oslo_config import cfg 

25from oslo_log import log 

26from oslo_service import periodic_task 

27from oslo_utils import excutils 

28from oslo_utils import importutils 

29from oslo_utils import timeutils 

30 

31from manila.common import constants 

32from manila import context 

33from manila import coordination 

34from manila import db 

35from manila import exception 

36from manila import manager 

37from manila.message import api as message_api 

38from manila.message import message_field 

39from manila import quota 

40from manila import rpc 

41from manila.share import rpcapi as share_rpcapi 

42from manila.share import share_types 

43from manila import utils 

44 

# Module-level logger and a shortcut to the quota engine object.
LOG = log.getLogger(__name__)
QUOTAS = quota.QUOTAS

# Config option holding the dotted import path of the scheduler driver class.
scheduler_driver_opt = cfg.StrOpt(
    'scheduler_driver',
    default='manila.scheduler.drivers.filter.FilterScheduler',
    help='Default scheduler driver to use.',
)

CONF = cfg.CONF
CONF.register_opt(scheduler_driver_opt)

# Drivers that need to change module paths or class names can add their
# old/new path here to maintain backward compatibility.
# Keys are the deprecated paths, values are the paths that replace them.
MAPPING = {
    'manila.scheduler.chance.ChanceScheduler': (
        'manila.scheduler.drivers.chance.ChanceScheduler'),
    'manila.scheduler.filter_scheduler.FilterScheduler': (
        'manila.scheduler.drivers.filter.FilterScheduler'),
    'manila.scheduler.simple.SimpleScheduler': (
        'manila.scheduler.drivers.simple.SimpleScheduler'),
}

66 

67 

68class SchedulerManager(manager.Manager): 

69 """Chooses a host to create shares.""" 

70 

71 RPC_API_VERSION = '1.11' 

72 

def __init__(self, scheduler_driver=None, service_name=None,
             *args, **kwargs):
    """Load the scheduler driver and initialize the manager.

    :param scheduler_driver: dotted path of the driver class to load;
        ``CONF.scheduler_driver`` is used when this is falsy.
    :param service_name: accepted but not used by this method.
    :param args: positional arguments forwarded to the parent initializer.
    :param kwargs: keyword arguments forwarded to the parent initializer.
    """
    driver_path = scheduler_driver or CONF.scheduler_driver

    # Deprecated paths keep working, but a warning points at the new one.
    if driver_path in MAPPING:
        new_path = MAPPING[driver_path]
        LOG.warning("Scheduler driver path %(old)s is deprecated, "
                    "update your configuration to the new path "
                    "%(new)s", {'old': driver_path, 'new': new_path})
        driver_path = new_path

    self.driver = importutils.import_object(driver_path)
    self.message_api = message_api.API()
    super(SchedulerManager, self).__init__(*args, **kwargs)
    # Filled in later by init_host_with_rpc().
    self.service_id = None

92 

def init_host_with_rpc(self, service_id=None):
    """Store the service id and request a capabilities report.

    :param service_id: value saved on ``self.service_id``.
    """
    self.service_id = service_id
    # The capabilities request is issued under an admin context.
    self.request_service_capabilities(context.get_admin_context())

97 

def get_host_list(self, context):
    """Return the host list provided by the scheduler driver.

    :param context: request context; not used by this method.
    """
    hosts = self.driver.get_host_list()
    return hosts

101 

def get_service_capabilities(self, context):
    """Return the service capabilities provided by the scheduler driver.

    :param context: request context; not used by this method.
    """
    capabilities = self.driver.get_service_capabilities()
    return capabilities

105 

def update_service_capabilities(self, context, service_name=None,
                                host=None, capabilities=None,
                                timestamp=None, **kwargs):
    """Process a capability update from a service node.

    :param context: request context; not used by this method.
    :param service_name: name of the reporting service, forwarded as is.
    :param host: host the report belongs to, forwarded as is.
    :param capabilities: reported capabilities; ``None`` is replaced by an
        empty dict before forwarding.
    :param timestamp: time of the report. A non-empty string is parsed
        with ``timeutils.PERFECT_TIME_FORMAT``; any other value is
        forwarded unchanged.
    :param kwargs: accepted and ignored.
    :raises ValueError: if a string timestamp does not match the format.
    """
    if capabilities is None:
        capabilities = {}

    # The timestamp is parsed independently of the capabilities check.
    # Previously the parse hung off an ``elif``, so a string timestamp sent
    # without capabilities reached the driver unparsed. The ``str`` check
    # lets already-parsed values pass through untouched.
    if timestamp and isinstance(timestamp, str):
        timestamp = datetime.strptime(timestamp,
                                      timeutils.PERFECT_TIME_FORMAT)

    self.driver.update_service_capabilities(
        service_name, host, capabilities, timestamp)

119 

def create_share_instance(self, context, request_spec=None,
                          filter_properties=None):
    """Ask the driver to schedule the creation of a share.

    Any failure is reported through ``_set_share_state_and_notify`` with
    an error status. ``NoValidHost`` is absorbed after being reported with
    the ``ALLOCATE_HOST`` action; every other exception is re-raised once
    it has been reported.
    """
    try:
        self.driver.schedule_create_share(
            context, request_spec, filter_properties)
    except exception.NoValidHost as no_host:
        failed_state = {'status': constants.STATUS_ERROR}
        self._set_share_state_and_notify(
            'create_share', failed_state, context, no_host, request_spec,
            message_field.Action.ALLOCATE_HOST)
    except Exception as err:
        with excutils.save_and_reraise_exception():
            failed_state = {'status': constants.STATUS_ERROR}
            self._set_share_state_and_notify(
                'create_share', failed_state, context, err, request_spec)

135 

def get_pools(self, context, filters=None, cached=False):
    """Return the pools reported by the scheduler driver.

    All three arguments are handed to ``self.driver.get_pools`` unchanged
    and its result is returned as is.
    """
    pools = self.driver.get_pools(context, filters, cached)
    return pools

139 

def manage_share(self, context, share_id, driver_options, request_spec,
                 filter_properties=None):
    """Ensure that the host exists and can accept the share.

    The share is looked up by ``share_id`` and its host is run through the
    driver's filters. On success the manage request is sent through the
    share RPC API. On failure the share is flagged and the exception is
    re-raised.
    """
    share = db.share_get(context, share_id)

    try:
        self.driver.host_passes_filters(
            context, share['host'], request_spec, filter_properties)
    except Exception as err:
        with excutils.save_and_reraise_exception():
            # NOTE(haixin) if failed to scheduler backend for manage share,
            # and we do not commit quota usages here, so we should set size 0
            # because we don't know the real size of the size, and we will
            # skip quota cuts when unmanage share with manage_error status.
            failure_state = {
                'status': constants.STATUS_MANAGE_ERROR,
                'size': 0,
            }
            self._set_share_state_and_notify(
                'manage_share', failure_state, context, err, request_spec)

    # Only reached when the filters passed: the handler above re-raises.
    share_rpcapi.ShareAPI().manage_share(context, share, driver_options)

165 

def migrate_share_to_host(
        self, context, share_id, host, force_host_assisted_migration,
        preserve_metadata, writable, nondisruptive, preserve_snapshots,
        new_share_network_id, new_share_type_id, request_spec,
        filter_properties=None):
    """Ensure that the host exists and can accept the share.

    The target host is run through the driver's filters and, if it passes,
    the migration is started through the share RPC API. A failure in
    either step triggers the same cleanup and is then re-raised.
    """
    share_ref = db.share_get(context, share_id)

    def _handle_failure(error):
        # Put the first instance found in migrating status back to
        # available, if there is one.
        migrating = None
        for candidate in share_ref.instances:
            if candidate['status'] == constants.STATUS_MIGRATING:
                migrating = candidate
                break
        if migrating:
            db.share_instance_update(
                context, migrating['id'],
                {'status': constants.STATUS_AVAILABLE})

        error_state = {'task_state': constants.TASK_STATE_MIGRATION_ERROR}
        self._set_share_state_and_notify(
            'migrate_share_to_host', error_state,
            context, error, request_spec)
        share_types.revert_allocated_share_type_quotas_during_migration(
            context, share_ref, new_share_type_id)

    try:
        tgt_host = self.driver.host_passes_filters(
            context, host, request_spec, filter_properties)
    except Exception as err:
        with excutils.save_and_reraise_exception():
            _handle_failure(err)
    else:
        try:
            share_rpcapi.ShareAPI().migration_start(
                context, share_ref, tgt_host.host,
                force_host_assisted_migration, preserve_metadata, writable,
                nondisruptive, preserve_snapshots, new_share_network_id,
                new_share_type_id)
        except Exception as err:
            with excutils.save_and_reraise_exception():
                _handle_failure(err)

208 

def _set_share_state_and_notify(self, method, state, context, ex,
                                request_spec, action=None):
    """Record a scheduling failure on the share and emit a notification.

    :param method: name of the failed operation; used in the log line and
        appended to ``'scheduler.'`` to form the notification event type.
    :param state: dict of field updates applied to the share.
    :param context: request context.
    :param ex: the exception that caused the failure.
    :param request_spec: request specification; ``share_id`` and
        ``share_properties`` are read from it. May be ``None``.
    :param action: optional message action; when truthy a user message is
        created for the share.
    """
    LOG.error("Failed to schedule %(method)s: %(ex)s",
              {"method": method, "ex": ex})

    # Entry points such as create_share_instance default request_spec to
    # None. Reading from an empty dict avoids an AttributeError here that
    # would otherwise replace the original scheduling error.
    spec = request_spec or {}
    properties = spec.get('share_properties', {})
    share_id = spec.get('share_id', None)

    if share_id:
        db.share_update(context, share_id, state)

    if action:
        self.message_api.create(
            context, action, context.project_id,
            resource_type=message_field.Resource.SHARE,
            resource_id=share_id, exception=ex)

    # The payload carries request_spec exactly as it was received.
    payload = dict(request_spec=request_spec,
                   share_properties=properties,
                   share_id=share_id,
                   state=state,
                   method=method,
                   reason=ex)

    rpc.get_notifier("scheduler").error(
        context, 'scheduler.' + method, payload)

237 

def request_service_capabilities(self, context):
    """Call ``publish_service_capabilities`` on the share RPC API."""
    share_api = share_rpcapi.ShareAPI()
    share_api.publish_service_capabilities(context)

240 

def _set_share_group_error_state(self, method, context, ex,
                                 request_spec, action=None):
    """Put a share group into error status after a scheduling failure.

    :param method: name of the failed operation, used in the log line.
    :param context: request context.
    :param ex: the exception that caused the failure.
    :param request_spec: request specification; ``share_group_id`` is read
        from it. May be ``None``.
    :param action: optional message action; when truthy a user message is
        created for the share group.
    """
    LOG.warning("Failed to schedule_%(method)s: %(ex)s",
                {"method": method, "ex": ex})

    share_group_state = {'status': constants.STATUS_ERROR}

    # create_share_group defaults request_spec to None. Reading from an
    # empty dict avoids an AttributeError here that would otherwise
    # replace the original scheduling error.
    share_group_id = (request_spec or {}).get('share_group_id')

    if share_group_id:
        db.share_group_update(context, share_group_id, share_group_state)

    if action:
        self.message_api.create(
            context, action, context.project_id,
            resource_type=message_field.Resource.SHARE_GROUP,
            resource_id=share_group_id, exception=ex)

258 

@periodic_task.periodic_task(spacing=600, run_immediately=True)
def _expire_reservations(self, context):
    """Periodic task that calls ``expire`` on the quota engine."""
    quota_engine = quota.QUOTAS
    quota_engine.expire(context)

262 

def create_share_group(self, context, share_group_id, request_spec=None,
                       filter_properties=None):
    """Ask the driver to schedule the creation of a share group.

    Any failure is reported through ``_set_share_group_error_state``.
    ``NoValidHost`` is absorbed after being reported with the
    ``ALLOCATE_HOST`` action; every other exception is re-raised once it
    has been reported.
    """
    operation = 'create_share_group'
    try:
        self.driver.schedule_create_share_group(
            context, share_group_id, request_spec, filter_properties)
    except exception.NoValidHost as no_host:
        self._set_share_group_error_state(
            operation, context, no_host, request_spec,
            message_field.Action.ALLOCATE_HOST)
    except Exception as err:
        with excutils.save_and_reraise_exception():
            self._set_share_group_error_state(
                operation, context, err, request_spec)

276 

def _set_share_replica_error_state(self, context, method, exc,
                                   request_spec, action=None):
    """Put a share replica and its snapshot instances into error status.

    :param context: request context.
    :param method: name of the failed operation, used in the log line.
    :param exc: the exception that caused the failure.
    :param request_spec: request specification; the replica id is read
        from its ``share_instance_properties`` entry.
    :param action: optional message action; when truthy a user message is
        created for the replica.
    """
    LOG.warning("Failed to schedule_%(method)s: %(exc)s",
                {'method': method, 'exc': exc})

    status_updates = {
        'status': constants.STATUS_ERROR,
        'replica_state': constants.STATUS_ERROR,
    }
    instance_properties = request_spec.get('share_instance_properties')
    share_replica_id = instance_properties.get('id')

    # Set any snapshot instances to 'error'.
    snapshot_filters = {'share_instance_ids': share_replica_id}
    for snapshot_instance in (
            db.share_snapshot_instance_get_all_with_filters(
                context, snapshot_filters)):
        db.share_snapshot_instance_update(
            context, snapshot_instance['id'],
            {'status': constants.STATUS_ERROR})

    db.share_replica_update(context, share_replica_id, status_updates)

    if action:
        self.message_api.create(
            context, action, context.project_id,
            resource_type=message_field.Resource.SHARE_REPLICA,
            resource_id=share_replica_id, exception=exc)

304 

def create_share_replica(self, context, request_spec=None,
                         filter_properties=None):
    """Ask the driver to schedule the creation of a share replica.

    Any failure is reported through ``_set_share_replica_error_state``.
    ``NoValidHost`` is absorbed after being reported with the
    ``ALLOCATE_HOST`` action; every other exception is re-raised once it
    has been reported.
    """
    operation = 'create_share_replica'
    try:
        self.driver.schedule_create_replica(
            context, request_spec, filter_properties)
    except exception.NoValidHost as no_host:
        self._set_share_replica_error_state(
            context, operation, no_host, request_spec,
            message_field.Action.ALLOCATE_HOST)
    except Exception as err:
        with excutils.save_and_reraise_exception():
            self._set_share_replica_error_state(
                context, operation, err, request_spec)

320 

@periodic_task.periodic_task(
    spacing=CONF.message_reap_interval, run_immediately=True)
@coordination.synchronized('locked-clean-expired-messages')
def _clean_expired_messages(self, context):
    """Periodic task that asks the message API to clean expired messages."""
    messages = self.message_api
    messages.cleanup_expired_messages(context)

326 

@periodic_task.periodic_task(
    spacing=CONF.service_down_time, run_immediately=True)
@coordination.synchronized('locked-mark-services-as-down')
def _mark_services_as_down(self, context):
    """Periodic task that sets the state of unresponsive services to down.

    Services that ``utils.service_is_up`` reports as up are skipped, as
    are services whose state is already "down" or "stopped".
    """
    for service in db.service_get_all(context):
        if utils.service_is_up(service):
            continue
        if service["state"] in ("down", "stopped"):
            continue
        db.service_update(context, service['id'], {"state": "down"})

335 

336 def extend_share(self, context, share_id, new_size, reservations, 

337 request_spec=None, filter_properties=None): 

338 

339 def _extend_share_set_error(self, context, ex, request_spec): 

340 share_state = {'status': constants.STATUS_AVAILABLE} 

341 self._set_share_state_and_notify('extend_share', share_state, 

342 context, ex, request_spec) 

343 

344 share = db.share_get(context, share_id) 

345 try: 

346 size_increase = int(new_size) - share['size'] 

347 if filter_properties: 

348 filter_properties['size_increase'] = size_increase 

349 else: 

350 filter_properties = {'size_increase': size_increase} 

351 target_host = self.driver.host_passes_filters( 

352 context, 

353 share['host'], 

354 request_spec, filter_properties) 

355 target_host.consume_from_share({'size': size_increase}) 

356 share_rpcapi.ShareAPI().extend_share(context, share, new_size, 

357 reservations) 

358 except exception.NoValidHost as ex: 

359 quota.QUOTAS.rollback(context, reservations, 

360 project_id=share['project_id'], 

361 user_id=share['user_id'], 

362 share_type_id=share['share_type_id']) 

363 _extend_share_set_error(self, context, ex, request_spec) 

364 self.message_api.create( 

365 context, 

366 message_field.Action.EXTEND, 

367 share['project_id'], 

368 resource_type=message_field.Resource.SHARE, 

369 resource_id=share['id'], 

370 exception=ex)