Coverage for manila/scheduler/drivers/filter.py: 86%

265 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2011 Intel Corporation 

2# Copyright (c) 2011 OpenStack, LLC. 

3# All Rights Reserved. 

4# 

5# Licensed under the Apache License, Version 2.0 (the "License"); you may 

6# not use this file except in compliance with the License. You may obtain 

7# a copy of the License at 

8# 

9# http://www.apache.org/licenses/LICENSE-2.0 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

14# License for the specific language governing permissions and limitations 

15# under the License. 

16 

17""" 

18The FilterScheduler is for scheduling of share and share group creation. 

19You can customize this scheduler by specifying your own share/share group 

20filters and weighing functions. 

21""" 

22 

23from oslo_config import cfg 

24from oslo_log import log 

25 

26from manila import exception 

27from manila.i18n import _ 

28from manila.message import api as message_api 

29from manila.message import message_field 

30from manila import policy 

31from manila.scheduler.drivers import base 

32from manila.scheduler import scheduler_options 

33from manila.share import share_types 

34 

35CONF = cfg.CONF 

36LOG = log.getLogger(__name__) 

37 

38AFFINITY_HINT = 'same_host' 

39ANTI_AFFINITY_HINT = 'different_host' 

40AFFINITY_KEY = "__affinity_same_host" 

41ANTI_AFFINITY_KEY = "__affinity_different_host" 

42 

43 

44class FilterScheduler(base.Scheduler): 

45 """Scheduler that can be used for filtering and weighing.""" 

46 def __init__(self, *args, **kwargs): 

47 super(FilterScheduler, self).__init__(*args, **kwargs) 

48 self.cost_function_cache = None 

49 self.options = scheduler_options.SchedulerOptions() 

50 self.max_attempts = self._max_attempts() 

51 self.message_api = message_api.API() 

52 

53 def _get_configuration_options(self): 

54 """Fetch options dictionary. Broken out for testing.""" 

55 return self.options.get_configuration() 

56 

57 def get_pools(self, context, filters, cached): 

58 return self.host_manager.get_pools(context, filters, cached) 

59 

60 def _post_select_populate_filter_properties(self, filter_properties, 

61 host_state): 

62 """Add additional information to filter properties. 

63 

64 Add additional information to the filter properties after a host has 

65 been selected by the scheduling process. 

66 """ 

67 # Add a retry entry for the selected volume backend: 

68 self._add_retry_host(filter_properties, host_state.host) 

69 

70 def _add_retry_host(self, filter_properties, host): 

71 """Add retry entry for the selected volume backend. 

72 

73 In the event that the request gets re-scheduled, this entry 

74 will signal that the given backend has already been tried. 

75 """ 

76 retry = filter_properties.get('retry') 

77 if not retry: 

78 return 

79 hosts = retry['hosts'] 

80 hosts.append(host) 

81 

82 def _max_attempts(self): 

83 max_attempts = CONF.scheduler_max_attempts 

84 if max_attempts < 1: 

85 msg = _("Invalid value for 'scheduler_max_attempts', " 

86 "must be >=1") 

87 raise exception.InvalidParameterValue(err=msg) 

88 return max_attempts 

89 

90 def schedule_create_share(self, context, request_spec, filter_properties): 

91 weighed_host = self._schedule_share(context, 

92 request_spec, 

93 filter_properties) 

94 

95 host = weighed_host.obj.host 

96 share_id = request_spec['share_id'] 

97 snapshot_id = request_spec['snapshot_id'] 

98 

99 updated_share = base.share_update_db(context, share_id, host) 

100 self._post_select_populate_filter_properties(filter_properties, 

101 weighed_host.obj) 

102 

103 # context is not serializable 

104 filter_properties.pop('context', None) 

105 

106 self.share_rpcapi.create_share_instance( 

107 context, updated_share.instance, host, 

108 request_spec=request_spec, 

109 filter_properties=filter_properties, 

110 snapshot_id=snapshot_id 

111 ) 

112 

113 def schedule_create_replica(self, context, request_spec, 

114 filter_properties): 

115 share_replica_id = request_spec['share_instance_properties'].get('id') 

116 

117 weighed_host = self._schedule_share( 

118 context, request_spec, filter_properties) 

119 

120 host = weighed_host.obj.host 

121 

122 updated_share_replica = base.share_replica_update_db( 

123 context, share_replica_id, host) 

124 self._post_select_populate_filter_properties(filter_properties, 

125 weighed_host.obj) 

126 

127 # context is not serializable 

128 filter_properties.pop('context', None) 

129 

130 self.share_rpcapi.create_share_replica( 

131 context, updated_share_replica, host, request_spec=request_spec, 

132 filter_properties=filter_properties) 

133 

134 def _format_filter_properties(self, context, filter_properties, 

135 request_spec): 

136 

137 elevated = context.elevated() 

138 

139 share_properties = request_spec['share_properties'] 

140 share_instance_properties = (request_spec.get( 

141 'share_instance_properties', {})) 

142 share_proto = request_spec.get('share_proto', 

143 share_properties.get('share_proto')) 

144 

145 resource_properties = share_properties.copy() 

146 resource_properties.update(share_instance_properties.copy()) 

147 share_type = request_spec.get("share_type", {}) 

148 if not share_type: 

149 msg = _("You must create a share type in advance," 

150 " and specify in request body or" 

151 " set default_share_type in manila.conf.") 

152 LOG.error(msg) 

153 self.message_api.create( 

154 context, 

155 message_field.Action.CREATE, 

156 context.project_id, 

157 resource_type=message_field.Resource.SHARE, 

158 resource_id=request_spec.get('share_id', None), 

159 detail=message_field.Detail.NO_DEFAULT_SHARE_TYPE) 

160 raise exception.InvalidParameterValue(err=msg) 

161 

162 share_type['extra_specs'] = share_type.get('extra_specs') or {} 

163 

164 if share_type['extra_specs']: 

165 for extra_spec_name in share_types.get_boolean_extra_specs(): 

166 extra_spec = share_type['extra_specs'].get(extra_spec_name) 

167 if extra_spec is not None: 

168 if not extra_spec.startswith("<is>"): 

169 extra_spec = "<is> %s" % extra_spec 

170 share_type['extra_specs'][extra_spec_name] = extra_spec 

171 

172 storage_protocol_spec = ( 

173 share_type['extra_specs'].get('storage_protocol') 

174 ) 

175 if storage_protocol_spec is None and share_proto is not None: 

176 # a host can report multiple protocols as "storage_protocol" 

177 spec_value = "<in> %s" % share_proto 

178 share_type['extra_specs']['storage_protocol'] = spec_value 

179 

180 resource_type = share_type 

181 request_spec.update({'resource_properties': resource_properties}) 

182 

183 config_options = self._get_configuration_options() 

184 

185 share_group = request_spec.get('share_group') 

186 

187 # NOTE(gouthamr): If 'active_replica_host' or 'snapshot_host' is 

188 # present in the request spec, pass that host's 'replication_domain' to 

189 # the ShareReplication and CreateFromSnapshot filters. 

190 active_replica_host = request_spec.get('active_replica_host') 

191 snapshot_host = request_spec.get('snapshot_host') 

192 allowed_hosts = [] 

193 if active_replica_host: 

194 allowed_hosts.append(active_replica_host) 

195 if snapshot_host: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true

196 allowed_hosts.append(snapshot_host) 

197 replication_domain = None 

198 if active_replica_host or snapshot_host: 

199 temp_hosts = self.host_manager.get_all_host_states_share(elevated) 

200 matching_host = next((host for host in temp_hosts 

201 if host.host in allowed_hosts), None) 

202 if matching_host: 

203 replication_domain = matching_host.replication_domain 

204 

205 # NOTE(zengyingzhe): remove the 'share_backend_name' extra spec, 

206 # let scheduler choose the available host for this replica or 

207 # snapshot clone creation request. 

208 share_type.get('extra_specs', {}).pop('share_backend_name', None) 

209 

210 if filter_properties is None: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true

211 filter_properties = {} 

212 self._populate_retry_share(filter_properties, resource_properties) 

213 

214 filter_properties.update({'context': context, 

215 'request_spec': request_spec, 

216 'config_options': config_options, 

217 'share_type': share_type, 

218 'resource_type': resource_type, 

219 'share_group': share_group, 

220 'replication_domain': replication_domain, 

221 }) 

222 

223 self.populate_filter_properties_share(context, request_spec, 

224 filter_properties) 

225 

226 return filter_properties, share_properties 

227 

228 def _schedule_share(self, context, request_spec, filter_properties=None): 

229 """Returns a list of hosts that meet the required specs. 

230 

231 The list is ordered by their fitness. 

232 """ 

233 elevated = context.elevated() 

234 

235 filter_properties, share_properties = self._format_filter_properties( 

236 context, filter_properties, request_spec) 

237 

238 # Find our local list of acceptable hosts by filtering and 

239 # weighing our options. we virtually consume resources on 

240 # it so subsequent selections can adjust accordingly. 

241 

242 # Note: remember, we are using an iterator here. So only 

243 # traverse this list once. 

244 consider_disabled = False 

245 if policy.check_is_host_admin(context) and filter_properties.get( 

246 'scheduler_hints', {}).get('only_host'): 

247 # Admin user can schedule share on disabled host 

248 consider_disabled = True 

249 

250 hosts = self.host_manager.get_all_host_states_share( 

251 elevated, 

252 consider_disabled=consider_disabled 

253 ) 

254 if not hosts: 

255 msg = _("There are no hosts to fulfill this " 

256 "provisioning request. Are share " 

257 "backend services down?") 

258 self.message_api.create( 

259 context, 

260 message_field.Action.CREATE, 

261 context.project_id, 

262 resource_type=message_field.Resource.SHARE, 

263 resource_id=request_spec.get('share_id', None), 

264 detail=message_field.Detail.SHARE_BACKEND_NOT_READY_YET) 

265 raise exception.WillNotSchedule(msg) 

266 

267 # Filter local hosts based on requirements ... 

268 hosts, last_filter = self.host_manager.get_filtered_hosts( 

269 hosts, filter_properties) 

270 

271 if not hosts: 

272 msg = _('Failed to find a weighted host, the last executed filter' 

273 ' was %s.') 

274 raise exception.NoValidHost( 

275 reason=msg % last_filter, 

276 detail_data={'last_filter': last_filter}) 

277 

278 LOG.debug("Filtered share %(hosts)s", {"hosts": hosts}) 

279 # weighted_host = WeightedHost() ... the best 

280 # host for the job. 

281 weighed_hosts = self.host_manager.get_weighed_hosts(hosts, 

282 filter_properties) 

283 best_host = weighed_hosts[0] 

284 LOG.debug("Choosing for share: %(best_host)s", 

285 {"best_host": best_host}) 

286 # NOTE(rushiagr): updating the available space parameters at same place 

287 best_host.obj.consume_from_share(share_properties) 

288 return best_host 

289 

290 def _populate_retry_share(self, filter_properties, properties): 

291 """Populate filter properties with retry history. 

292 

293 Populate filter properties with history of retries for this 

294 request. If maximum retries is exceeded, raise NoValidHost. 

295 """ 

296 max_attempts = self.max_attempts 

297 retry = filter_properties.pop('retry', {}) 

298 

299 if max_attempts == 1: 

300 # re-scheduling is disabled. 

301 return 

302 

303 # retry is enabled, update attempt count: 

304 if retry: 

305 retry['num_attempts'] += 1 

306 else: 

307 retry = { 

308 'num_attempts': 1, 

309 'hosts': [] # list of share service hosts tried 

310 } 

311 filter_properties['retry'] = retry 

312 

313 share_id = properties.get('share_id') 

314 self._log_share_error(share_id, retry) 

315 

316 if retry['num_attempts'] > max_attempts: 

317 msg = _("Exceeded max scheduling attempts %(max_attempts)d for " 

318 "share %(share_id)s") % { 

319 "max_attempts": max_attempts, 

320 "share_id": share_id 

321 } 

322 raise exception.NoValidHost(reason=msg) 

323 

324 def _log_share_error(self, share_id, retry): 

325 """Log any exceptions from a previous share create operation. 

326 

327 If the request contained an exception from a previous share 

328 create operation, log it to aid debugging. 

329 """ 

330 exc = retry.pop('exc', None) # string-ified exception from share 

331 if not exc: 331 ↛ 334line 331 didn't jump to line 334 because the condition on line 331 was always true

332 return # no exception info from a previous attempt, skip 

333 

334 hosts = retry.get('hosts') 

335 if not hosts: 

336 return # no previously attempted hosts, skip 

337 

338 last_host = hosts[-1] 

339 LOG.error("Error scheduling %(share_id)s from last share-service: " 

340 "%(last_host)s : %(exc)s", { 

341 "share_id": share_id, 

342 "last_host": last_host, 

343 "exc": "exc" 

344 }) 

345 

346 def _populate_scheduler_hint(self, request_spec, hints, key, hint): 

347 share_properties = request_spec.get('share_properties', {}) 

348 value = share_properties.get('metadata', {}).get(key, None) 

349 if value: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true

350 hints.update({hint: value}) 

351 

352 def populate_filter_properties_scheduler_hints(self, context, request_spec, 

353 filter_properties): 

354 share_id = request_spec.get('share_id', None) 

355 if not share_id: 

356 filter_properties['scheduler_hints'] = {} 

357 return 

358 else: 

359 if filter_properties.get('scheduler_hints', None): 

360 return 

361 hints = {} 

362 self._populate_scheduler_hint(request_spec, hints, 

363 AFFINITY_KEY, 

364 AFFINITY_HINT) 

365 self._populate_scheduler_hint(request_spec, hints, 

366 ANTI_AFFINITY_KEY, 

367 ANTI_AFFINITY_HINT) 

368 filter_properties['scheduler_hints'] = hints 

369 

370 def populate_filter_properties_share(self, context, request_spec, 

371 filter_properties): 

372 """Stuff things into filter_properties. 

373 

374 Can be overridden in a subclass to add more data. 

375 """ 

376 shr = request_spec['share_properties'] 

377 inst = request_spec['share_instance_properties'] 

378 filter_properties['size'] = shr['size'] 

379 filter_properties['availability_zone_id'] = ( 

380 inst.get('availability_zone_id') 

381 ) 

382 filter_properties['user_id'] = shr.get('user_id') 

383 filter_properties['metadata'] = shr.get('metadata') 

384 filter_properties['snapshot_id'] = shr.get('snapshot_id') 

385 filter_properties['is_share_extend'] = ( 

386 request_spec.get('is_share_extend') 

387 ) 

388 self.populate_filter_properties_scheduler_hints(context, request_spec, 

389 filter_properties) 

390 

391 def schedule_create_share_group(self, context, share_group_id, 

392 request_spec, filter_properties): 

393 

394 LOG.info("Scheduling share group %s.", share_group_id) 

395 host = self._get_best_host_for_share_group(context, request_spec) 

396 

397 if not host: 

398 msg = _("No hosts available for share group %s.") % share_group_id 

399 raise exception.NoValidHost(reason=msg) 

400 

401 msg = "Chose host %(host)s for create_share_group %(group)s." 

402 LOG.info(msg, {'host': host, 'group': share_group_id}) 

403 

404 updated_share_group = base.share_group_update_db( 

405 context, share_group_id, host) 

406 

407 self.share_rpcapi.create_share_group( 

408 context, updated_share_group, host) 

409 

410 def _get_weighted_hosts_for_share_type(self, context, request_spec, 

411 share_type): 

412 config_options = self._get_configuration_options() 

413 # NOTE(ameade): Find our local list of acceptable hosts by 

414 # filtering and weighing our options. We virtually consume 

415 # resources on it so subsequent selections can adjust accordingly. 

416 

417 # NOTE(ameade): Remember, we are using an iterator here. So only 

418 # traverse this list once. 

419 all_hosts = self.host_manager.get_all_host_states_share(context) 

420 

421 if not all_hosts: 

422 return [] 

423 

424 share_type['extra_specs'] = share_type.get('extra_specs', {}) 

425 

426 if share_type['extra_specs']: 426 ↛ 434line 426 didn't jump to line 434 because the condition on line 426 was always true

427 for spec_name in share_types.get_required_extra_specs(): 

428 extra_spec = share_type['extra_specs'].get(spec_name) 

429 

430 if extra_spec is not None: 430 ↛ 431line 430 didn't jump to line 431 because the condition on line 430 was never true

431 share_type['extra_specs'][spec_name] = ( 

432 "<is> %s" % extra_spec) 

433 

434 filter_properties = { 

435 'context': context, 

436 'request_spec': request_spec, 

437 'config_options': config_options, 

438 'share_type': share_type, 

439 'resource_type': share_type, 

440 'size': 0, 

441 } 

442 # Filter local hosts based on requirements ... 

443 hosts, last_filter = self.host_manager.get_filtered_hosts( 

444 all_hosts, filter_properties) 

445 

446 if not hosts: 

447 return [] 

448 

449 LOG.debug("Filtered %s", hosts) 

450 

451 # weighted_host = WeightedHost() ... the best host for the job. 

452 weighed_hosts = self.host_manager.get_weighed_hosts( 

453 hosts, 

454 filter_properties) 

455 if not weighed_hosts: 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true

456 return [] 

457 

458 return weighed_hosts 

459 

460 def _get_weighted_hosts_for_share_group_type(self, context, request_spec, 

461 share_group_type): 

462 config_options = self._get_configuration_options() 

463 all_hosts = self.host_manager.get_all_host_states_share(context) 

464 

465 if not all_hosts: 

466 return [] 

467 

468 filter_properties = { 

469 'context': context, 

470 'request_spec': request_spec, 

471 'config_options': config_options, 

472 'share_group_type': share_group_type, 

473 'resource_type': share_group_type, 

474 } 

475 

476 hosts, last_filter = self.host_manager.get_filtered_hosts( 

477 all_hosts, 

478 filter_properties, 

479 CONF.scheduler_default_share_group_filters) 

480 

481 if not hosts: 481 ↛ 482line 481 didn't jump to line 482 because the condition on line 481 was never true

482 return [] 

483 

484 LOG.debug("Filtered %s", hosts) 

485 

486 weighed_hosts = self.host_manager.get_weighed_hosts( 

487 hosts, 

488 filter_properties) 

489 if not weighed_hosts: 489 ↛ 490line 489 didn't jump to line 490 because the condition on line 489 was never true

490 return [] 

491 

492 return weighed_hosts 

493 

494 def _get_weighted_candidates_share_group(self, context, request_spec): 

495 """Finds hosts that support the share group. 

496 

497 Returns a list of hosts that meet the required specs, 

498 ordered by their fitness. 

499 """ 

500 elevated = context.elevated() 

501 

502 shr_types = request_spec.get("share_types") 

503 

504 weighed_hosts = [] 

505 

506 for iteration_count, share_type in enumerate(shr_types): 

507 temp_weighed_hosts = self._get_weighted_hosts_for_share_type( 

508 elevated, request_spec, share_type) 

509 

510 # NOTE(ameade): Take the intersection of hosts so we have one that 

511 # can support all share types of the share group 

512 if iteration_count == 0: 512 ↛ 515line 512 didn't jump to line 515 because the condition on line 512 was always true

513 weighed_hosts = temp_weighed_hosts 

514 else: 

515 new_weighed_hosts = [] 

516 for host1 in weighed_hosts: 

517 for host2 in temp_weighed_hosts: 

518 if host1.obj.host == host2.obj.host: 

519 new_weighed_hosts.append(host1) 

520 weighed_hosts = new_weighed_hosts 

521 if not weighed_hosts: 

522 return [] 

523 

524 # NOTE(ameade): Ensure the hosts support the share group type 

525 share_group_type = request_spec.get("resource_type", {}) 

526 temp_weighed_group_hosts = ( 

527 self._get_weighted_hosts_for_share_group_type( 

528 elevated, request_spec, share_group_type)) 

529 new_weighed_hosts = [] 

530 for host1 in weighed_hosts: 

531 for host2 in temp_weighed_group_hosts: 

532 if host1.obj.host == host2.obj.host: 

533 new_weighed_hosts.append(host1) 

534 weighed_hosts = new_weighed_hosts 

535 

536 return weighed_hosts 

537 

538 def _get_best_host_for_share_group(self, context, request_spec): 

539 weighed_hosts = self._get_weighted_candidates_share_group( 

540 context, 

541 request_spec) 

542 

543 if not weighed_hosts: 543 ↛ 545line 543 didn't jump to line 545 because the condition on line 543 was always true

544 return None 

545 return weighed_hosts[0].obj.host 

546 

547 def host_passes_filters(self, context, host, request_spec, 

548 filter_properties): 

549 

550 elevated = context.elevated() 

551 

552 filter_properties, share_properties = self._format_filter_properties( 

553 context, filter_properties, request_spec) 

554 

555 hosts = self.host_manager.get_all_host_states_share(elevated) 

556 filter_class_names = None 

557 if request_spec.get('is_share_extend', None): 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true

558 filter_class_names = CONF.scheduler_default_extend_filters 

559 hosts, last_filter = self.host_manager.get_filtered_hosts( 

560 hosts, filter_properties, 

561 filter_class_names=filter_class_names) 

562 hosts = self.host_manager.get_weighed_hosts(hosts, filter_properties) 

563 

564 for tgt_host in hosts: 

565 if tgt_host.obj.host == host: 

566 return tgt_host.obj 

567 

568 msg = (_('Cannot place share %(id)s on %(host)s, the last executed' 

569 ' filter was %(last_filter)s.') 

570 % {'id': request_spec['share_id'], 'host': host, 

571 'last_filter': last_filter}) 

572 raise exception.NoValidHost(reason=msg)