Coverage for manila/scheduler/weighers/netapp_aiq.py: 85%

156 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2023 NetApp, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

from urllib import parse

from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import netutils
import requests
from requests.adapters import HTTPAdapter
from requests import auth
from urllib3.util import retry

from manila import exception
from manila.scheduler.weighers import base_host

27 

# Name of the oslo.config group that holds every Active IQ weigher option.
ACTIVE_IQ_WEIGHER_GROUP = 'netapp_active_iq'


# Options read by NetAppAIQWeigher. They cover how to reach the Active IQ
# REST API (host, port, transport, TLS verification, credentials) and how the
# balance request is evaluated (evaluation method and metric priority order).
active_iq_weight_opts = [
    cfg.HostAddressOpt('aiq_hostname',
                       help='The hostname (or IP address) for the Active IQ.'),
    cfg.PortOpt('aiq_port',
                help=('The TCP port to use for communication with the Active '
                      'IQ. If not specified, the weigher driver will use 80 '
                      'for HTTP and 443 for HTTPS.')),
    cfg.StrOpt('aiq_transport_type',
               default='https',
               choices=['http', 'https'],
               help=('The transport protocol used when communicating with '
                     'the Active IQ. Valid values are '
                     'http or https.')),
    cfg.BoolOpt('aiq_ssl_verify',
                default=False,
                help='Verifying the SSL certificate. Default is False.'),
    cfg.StrOpt('aiq_ssl_cert_path',
               help=("The path to a CA_BUNDLE file or directory with "
                     "certificates of trusted CA. If set to a directory, it "
                     "must have been processed using the c_rehash utility "
                     "supplied with OpenSSL. If not informed, it will use the "
                     "Mozilla's carefully curated collection of Root "
                     "Certificates for validating the trustworthiness of SSL "
                     "certificates.")),
    cfg.StrOpt('aiq_username',
               help=('Administrative user account name used to access the '
                     'Active IQ.')),
    cfg.StrOpt('aiq_password',
               help=('Password for the administrative user account '
                     'specified in the aiq_username option.'),
               secret=True),
    cfg.IntOpt('aiq_eval_method',
               default=0,
               help='Integer indicator of which evaluation method, defaults '
                    'to 0 (0 - by index, 1 - normalized value, 2 - by '
                    'literal value).'),
    cfg.ListOpt('aiq_priority_order',
                default=[
                    'ops',
                    'latency',
                    'volume_count',
                    'size'
                ],
                # NOTE: the help text must describe the default declared just
                # above; keep both in sync when changing either one.
                help='Permutation of the list ["volume_count", "size", '
                     '"latency", "ops"]. Note that for volume_count and '
                     'latency, the higher the values, the less optimal the '
                     'resources. For size and ops, the higher the value '
                     'the more desirable the resources. If metrics are to be '
                     'considered with equal weights, concatenate the strings, '
                     'separated by ":". '
                     'An example is ["volume_count", "size", "latency:ops"] '
                     'if latency and ops want to have equal but minimum '
                     'weights, or ["volume_count:size", "latency", "ops"] '
                     'if volume_count and size have equal maximum weights. '
                     'If not provided, the default order is '
                     '["ops", "latency", "volume_count", "size"].'),
]
CONF = cfg.CONF
CONF.register_opts(active_iq_weight_opts, ACTIVE_IQ_WEIGHER_GROUP)

LOG = logging.getLogger(__name__)

92 

93 

class NetAppAIQWeigher(base_host.BaseHostWeigher):
    """AIQ Weigher. Assign weights based on NetApp Active IQ tool.

    The weigher evaluates all candidate hosts at once: it maps every pool to
    its Active IQ aggregate key, asks the Active IQ "balance" endpoint to
    score those aggregates and returns the scores in the same order as the
    received hosts. Any failure while talking to Active IQ is logged and
    reported as an empty weight list, so the weigher is skipped.
    """

    # (connect, read) timeouts in seconds applied to every Active IQ request,
    # so an unresponsive server cannot block the caller indefinitely.
    REQUEST_TIMEOUT = (10, 60)

    # Errors raised while decoding or walking an unexpected response payload.
    _MALFORMED_RESPONSE_ERRORS = (
        AttributeError, IndexError, KeyError, TypeError, ValueError)

    def __init__(self, *args, **kwargs):
        """Load and validate the Active IQ weigher configuration.

        :raises NetappActiveIQWeigherRequiredParameter: if the hostname, the
            username or the password option is not set.
        """
        super().__init__(*args, **kwargs)

        self.configuration = CONF[ACTIVE_IQ_WEIGHER_GROUP]

        # These options have no usable default: fail early when one is unset.
        for option in ('aiq_hostname', 'aiq_username', 'aiq_password'):
            if not getattr(self.configuration, option):
                raise exception.NetappActiveIQWeigherRequiredParameter(
                    config=option)

        self.host = self.configuration.aiq_hostname
        self.username = self.configuration.aiq_username
        self.password = self.configuration.aiq_password

        self.protocol = self.configuration.aiq_transport_type
        self.port = self.configuration.aiq_port
        if not self.port:
            # Fall back to the well-known port of the chosen transport.
            self.port = "80" if self.protocol == "http" else "443"

        # `ssl_verify` ends up being either a boolean or, when verification
        # is enabled and a CA path is configured, that path.
        self.ssl_verify = self.configuration.aiq_ssl_verify
        if self.ssl_verify and self.configuration.aiq_ssl_cert_path:
            self.ssl_verify = self.configuration.aiq_ssl_cert_path

        self.eval_method = self.configuration.aiq_eval_method
        self.priority_order = self.configuration.aiq_priority_order

    def _weigh_object(self, host_state, weight_properties):
        """Weight for a specific object from parent abstract class"""
        # NOTE(felipe_rodrigues): this abstract class method is not called for
        # the AIQ weigher, since it does not weigh one single object.
        raise NotImplementedError()

    def _weigh_active_iq(self, netapp_aggregates_location, weight_properties):
        """Determine host's rating based on a Active IQ.

        :param netapp_aggregates_location: list of aggregate identifiers, as
            built by `_get_aggregate_identifier`, one per host.
        :param weight_properties: dict read for the `size` and `share_type`
            keys of the request.
        :returns: list of weights in the same order as
            `netapp_aggregates_location`, or an empty list when the weigher
            must be skipped.
        """
        size = weight_properties.get('size')
        # The keys may be present but set to None, so do not rely on the
        # `dict.get` default to obtain a dictionary.
        share_type = weight_properties.get('share_type') or {}
        extra_specs = share_type.get('extra_specs') or {}
        performance_level_name = extra_specs.get(
            'netapp:performance_service_level_name')

        # retrieves the performance service level key if a PSL name is given.
        performance_level_id = None
        if performance_level_name:
            performance_level_id = self._get_performance_level_id(
                performance_level_name)
            if not performance_level_id:
                return []

        # retrieves the equivalent active IQ keys of the pools.
        resource_keys = self._get_resource_keys(netapp_aggregates_location)
        if not resource_keys:
            return []

        return self._balance_aggregates(resource_keys, size,
                                        performance_level_id)

    def _get_url(self):
        """Get the base URL for REST requests."""
        host = netutils.escape_ipv6(self.host)
        return f'{self.protocol}://{host}:{self.port}/api/'

    def _get_request_method(self, method, session):
        """Returns the request method to be used in the REST call.

        :param method: lower-case HTTP verb; one of 'post', 'get', 'put',
            'delete' or 'patch'.
        :param session: session object exposing one method per verb.
        :raises KeyError: if `method` is not one of the supported verbs.
        """
        request_methods = {
            'post': session.post,
            'get': session.get,
            'put': session.put,
            'delete': session.delete,
            'patch': session.patch,
        }
        return request_methods[method]

    def _create_session(self):
        """Build a `requests.Session` configured for the Active IQ.

        The caller owns the returned session and is responsible for closing
        it.
        """
        # NOTE(felipe_rodrigues): request resilient of temporary network
        # failures (like name resolution failure), retrying until 5 times.
        session = requests.Session()
        max_retries = retry.Retry(total=5, connect=5, read=2, backoff_factor=1)
        adapter = HTTPAdapter(max_retries=max_retries)
        session.mount('%s://' % self.protocol, adapter)

        session.auth = auth.HTTPBasicAuth(self.username, self.password)
        session.verify = self.ssl_verify
        session.headers = {}

        return session

    def _get_session_method(self, method):
        """Get the REST method from the session.

        Kept for backward compatibility. The session behind the returned
        bound method is not closed by this class; `_call_active_iq` manages
        its own session instead.
        """
        return self._get_request_method(method, self._create_session())

    def _call_active_iq(self, action_path, method, body=None):
        """Call the Active IQ REST API.

        :param action_path: path appended to the base URL from `_get_url`.
        :param method: lower-case HTTP verb accepted by
            `_get_request_method`.
        :param body: optional object sent as the JSON payload.
        :returns: tuple with the HTTP status code and the raw response body.
        """
        url = self._get_url() + action_path

        msg_args = {
            "method": method.upper(),
            "url": url,
            "body": body,
        }
        LOG.debug("REQUEST: %(method)s %(url)s Body=%(body)s", msg_args)

        # The session is closed as soon as the response has been read, so its
        # pooled connections are released deterministically.
        with self._create_session() as session:
            rest_method = self._get_request_method(method, session)
            response = rest_method(url, json=body,
                                   timeout=self.REQUEST_TIMEOUT)
            code = response.status_code
            response_body = response.content

        msg_args = {
            "code": code,
            "body": response_body,
        }
        LOG.debug("RESPONSE: %(code)s Body=%(body)s", msg_args)

        return code, response_body

    def _get_performance_level_id(self, performance_level_name):
        """Gets the ID of a performance level name.

        :param performance_level_name: name of the performance service level.
        :returns: the `key` of the first matching record, or None when the
            request fails, the response is malformed or nothing matches.
        """
        # The name is free text; encode it so characters such as '&', '#' or
        # '+' cannot alter the query string.
        encoded_name = parse.quote(str(performance_level_name), safe='')
        psl_endpoint = ('storage-provider/performance-service-levels?'
                        f'name={encoded_name}')
        try:
            code, res = self._call_active_iq(psl_endpoint, "get")
        except Exception as e:
            LOG.error("Could not retrieve the key of the performance service "
                      "level named as '%(psl)s'. Skipping the weigher. "
                      "Error: %(error)s",
                      {'psl': performance_level_name, 'error': e})
            return None

        if code != 200:
            LOG.error("Could not retrieve the key of the performance service "
                      "level named as '%(psl)s'. Skipping the weigher.",
                      {'psl': performance_level_name})
            return None

        try:
            res = jsonutils.loads(res) if res else {}
            psl_list = res.get('records', [])
            if len(psl_list) == 0:
                LOG.error("Could not found any performance service level "
                          "named as '%s'. Skipping the weigher.",
                          performance_level_name)
                return None
            return psl_list[0].get("key", None)
        except self._MALFORMED_RESPONSE_ERRORS as e:
            LOG.error("Could not parse the Active IQ response for the "
                      "performance service level named as '%(psl)s'. "
                      "Skipping the weigher. Error: %(error)s",
                      {'psl': performance_level_name, 'error': e})
            return None

    def _get_aggregate_identifier(self, aggr_name, cluster_name):
        """Returns the string identifier of an aggregate on a cluster."""
        return f'{aggr_name}:{cluster_name}'

    def _get_resource_keys(self, netapp_aggregates_location):
        """Map the aggregates names to the AIQ resource keys.

        :param netapp_aggregates_location: list of aggregate identifiers, as
            built by `_get_aggregate_identifier`.
        :returns: list with one entry per received identifier, in the same
            order, holding the Active IQ key or 0 when the aggregate is not
            known by Active IQ. An empty list is returned when the request
            fails or the response is malformed.
        """
        aggregate_endpoint = 'datacenter/storage/aggregates'

        try:
            code, res = self._call_active_iq(aggregate_endpoint, "get")
        except Exception as e:
            LOG.error("Could not retrieve the aggregates resource keys. "
                      "Skipping the weigher. Error: %s", e)
            return []

        if code != 200:
            LOG.error("Could not retrieve the aggregates resource keys. "
                      "Skipping the weigher.")
            return []

        try:
            res = jsonutils.loads(res) if res else {}
            aggr_map = {
                self._get_aggregate_identifier(
                    aggr["name"], aggr["cluster"]["name"]): aggr["key"]
                for aggr in res.get('records', [])
            }
        except self._MALFORMED_RESPONSE_ERRORS as e:
            LOG.error("Could not parse the aggregates resource keys. "
                      "Skipping the weigher. Error: %s", e)
            return []

        # we must keep the lists with the same order.
        # If a pool could not be found, it is marked as resource key 0.
        resource_keys = [aggr_map.get(identifier, 0)
                         for identifier in netapp_aggregates_location]
        found_pool_keys = [identifier
                           for identifier in netapp_aggregates_location
                           if identifier in aggr_map]

        LOG.debug("The following pools will be evaluated by Active IQ: %s",
                  found_pool_keys)

        return resource_keys

    def _balance_aggregates(self, resource_keys, size, performance_level_uuid):
        """Call AIQ to generate the weights of each aggregate.

        :param resource_keys: list of Active IQ aggregate keys, where 0 marks
            a pool that has no Active IQ key.
        :param size: requested size, sent to Active IQ as '<size>GB'.
        :param performance_level_uuid: optional performance service level
            key, sent as `ssl_key` when set.
        :returns: list of weights in the same order as `resource_keys`, using
            0.0 for keys without a score, or an empty list when the request
            fails or the response is malformed.
        """
        balance_endpoint = 'storage-provider/data-placement/balance'
        body = {
            "capacity": f'{size}GB',
            "eval_method": self.eval_method,
            # NOTE(felipe_rodrigues): from Active IQ documentation, the
            # opt_method only works as 0.
            "opt_method": 0,
            "priority_order": self.priority_order,
            "separate_flag": False,
            # NOTE(felipe_rodrigues): remove the keys marked with 0, since they
            # are not found the pool keys.
            "resource_keys": [key for key in resource_keys if key != 0],
        }
        if performance_level_uuid:
            body["ssl_key"] = performance_level_uuid

        try:
            code, res = self._call_active_iq(
                balance_endpoint, "post", body=body)
        except Exception as e:
            LOG.error("Could not balance the aggregates. Skipping the "
                      "weigher. Error: %s", e)
            return []

        if code != 200:
            LOG.error("Could not balance the aggregates. Skipping the "
                      "weigher.")
            return []

        try:
            res = jsonutils.loads(res) if res else []
            weight_map = {
                aggr["key"]: aggr["scores"]["total_weighted_score"]
                for aggr in res
            }
        except self._MALFORMED_RESPONSE_ERRORS as e:
            LOG.error("Could not parse the balance of the aggregates. "
                      "Skipping the weigher. Error: %s", e)
            return []

        # it must keep the lists with the same order.
        return [weight_map.get(key, 0.0) for key in resource_keys]

    def weigh_objects(self, weighed_obj_list, weight_properties):
        """Weigh multiple objects using Active IQ.

        :param weighed_obj_list: list of weighed objects whose `obj`
            attribute exposes `vendor_name`, `capabilities` and `pool_name`.
        :param weight_properties: dict with the request properties, passed
            on to `_weigh_active_iq`.
        :returns: list of weights in the same order as `weighed_obj_list`,
            or an empty list when the weigher is skipped.
        """
        netapp_aggregates_location = []
        for obj in weighed_obj_list:
            host_state = obj.obj

            # if at least one host is not from NetApp, the entire weigher is
            # skipped.
            if host_state.vendor_name != "NetApp":
                LOG.debug(
                    "Skipping Active IQ weigher given that some backends "
                    "are not from NetApp.")
                return []

            cluster_name = host_state.capabilities.get("netapp_cluster_name")
            netapp_aggregates_location.append(
                self._get_aggregate_identifier(
                    host_state.pool_name, cluster_name))

        result = self._weigh_active_iq(
            netapp_aggregates_location, weight_properties)

        LOG.debug("Active IQ weight result: %s", result)
        return result