Coverage for manila/scheduler/weighers/netapp_aiq.py: 85%
156 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2023 NetApp, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16from oslo_config import cfg
17from oslo_log import log as logging
18from oslo_serialization import jsonutils
19from oslo_utils import netutils
20import requests
21from requests.adapters import HTTPAdapter
22from requests import auth
23from urllib3.util import retry
25from manila import exception
26from manila.scheduler.weighers import base_host
# Name of the configuration group holding every Active IQ weigher option.
ACTIVE_IQ_WEIGHER_GROUP = 'netapp_active_iq'

# Options read by NetAppAIQWeigher: connection settings (host, port,
# transport, TLS verification, credentials) and the evaluation parameters
# forwarded to the Active IQ balance API.
active_iq_weight_opts = [
    cfg.HostAddressOpt('aiq_hostname',
                       help='The hostname (or IP address) for the Active IQ.'),
    cfg.PortOpt('aiq_port',
                help=('The TCP port to use for communication with the Active '
                      'IQ. If not specified, the weigher driver will use 80 '
                      'for HTTP and 443 for HTTPS.')),
    cfg.StrOpt('aiq_transport_type',
               default='https',
               choices=['http', 'https'],
               help=('The transport protocol used when communicating with '
                     'the Active IQ. Valid values are '
                     'http or https.')),
    cfg.BoolOpt('aiq_ssl_verify',
                default=False,
                help='Verifying the SSL certificate. Default is False.'),
    cfg.StrOpt('aiq_ssl_cert_path',
               help=("The path to a CA_BUNDLE file or directory with "
                     "certificates of trusted CA. If set to a directory, it "
                     "must have been processed using the c_rehash utility "
                     "supplied with OpenSSL. If not informed, it will use the "
                     "Mozilla's carefully curated collection of Root "
                     "Certificates for validating the trustworthiness of SSL "
                     "certificates.")),
    cfg.StrOpt('aiq_username',
               help=('Administrative user account name used to access the '
                     'Active IQ.')),
    cfg.StrOpt('aiq_password',
               help=('Password for the administrative user account '
                     'specified in the aiq_username option.'),
               secret=True),
    cfg.IntOpt('aiq_eval_method',
               default=0,
               help='Integer indicator of which evaluation method, defaults '
                    'to 0 (0 - by index, 1 - normalized value, 2 - by '
                    'literal value).'),
    # NOTE: the help text below must stay in sync with ``default``; it
    # previously advertised a different default order than the one used.
    cfg.ListOpt('aiq_priority_order',
                default=[
                    'ops',
                    'latency',
                    'volume_count',
                    'size'
                ],
                help='Permutation of the list ["volume_count", "size", '
                     '"latency", "ops"]. Note that for volume_count and '
                     'latency, the higher the values, the less optimal the '
                     'resources. For size and ops, the higher the value '
                     'the more desirable the resources. If metrics are to be '
                     'considered with equal weights, concatenate the strings, '
                     'separated by ":". '
                     'An example is ["volume_count", "size", "latency:ops"] '
                     'if latency and ops want to have equal but minimum '
                     'weights, or ["volume_count:size", "latency", "ops"] '
                     'if volume_count and size have equal maximum weights. '
                     'If not provided, the default order is '
                     '["ops", "latency", "volume_count", "size"].'),
]

CONF = cfg.CONF
CONF.register_opts(active_iq_weight_opts, ACTIVE_IQ_WEIGHER_GROUP)

LOG = logging.getLogger(__name__)
94class NetAppAIQWeigher(base_host.BaseHostWeigher):
95 """AIQ Weigher. Assign weights based on NetApp Active IQ tool."""
def __init__(self, *args, **kwargs):
    """Load the Active IQ settings from the weigher configuration group.

    Stores the connection parameters (host, credentials, protocol, port
    and TLS verification) and the evaluation parameters on the instance.

    :raises NetappActiveIQWeigherRequiredParameter: when ``aiq_hostname``,
        ``aiq_username`` or ``aiq_password`` is not set.
    """
    super(NetAppAIQWeigher, self).__init__(*args, **kwargs)

    conf = CONF[ACTIVE_IQ_WEIGHER_GROUP]
    self.configuration = conf

    # Mandatory options as (instance attribute, option name), validated in
    # this order so the first missing one is the one reported.
    mandatory = (
        ('host', 'aiq_hostname'),
        ('username', 'aiq_username'),
        ('password', 'aiq_password'),
    )
    for attribute, option in mandatory:
        value = getattr(conf, option)
        setattr(self, attribute, value)
        if not value:
            raise exception.NetappActiveIQWeigherRequiredParameter(
                config=option)

    self.protocol = conf.aiq_transport_type
    # Without an explicit port, use the well-known port of the protocol.
    self.port = conf.aiq_port or (
        "80" if self.protocol == "http" else "443")

    self.ssl_verify = conf.aiq_ssl_verify
    if self.ssl_verify:
        # When verification is on, a configured CA bundle path takes the
        # place of the boolean flag.
        self.ssl_verify = conf.aiq_ssl_cert_path or self.ssl_verify

    self.eval_method = conf.aiq_eval_method
    self.priority_order = conf.aiq_priority_order
129 def _weigh_object(self, host_state, weight_properties):
130 """Weight for a specific object from parent abstract class"""
131 # NOTE(felipe_rodrigues): this abstract class method is not called for
132 # the AIQ weigher, since it does not weigh one single object.
133 raise NotImplementedError()
135 def _weigh_active_iq(self, netapp_aggregates_location, weight_properties):
136 """Determine host's rating based on a Active IQ."""
137 size = weight_properties.get('size')
138 share_type = weight_properties.get('share_type', {})
139 performance_level_name = share_type.get('extra_specs', {}).get(
140 'netapp:performance_service_level_name')
142 # retrieves the performance service level key if a PSL name is given.
143 performance_level_id = None
144 if performance_level_name: 144 ↛ 151line 144 didn't jump to line 151 because the condition on line 144 was always true
145 performance_level_id = self._get_performance_level_id(
146 performance_level_name)
147 if not performance_level_id:
148 return []
150 # retrieves the equivalent active IQ keys of the pools.
151 resource_keys = self._get_resource_keys(netapp_aggregates_location)
152 if len(resource_keys) == 0:
153 return []
155 result = self._balance_aggregates(resource_keys, size,
156 performance_level_id)
158 return result
def _get_url(self):
    """Build the root URL that every REST request is appended to.

    :returns: ``<protocol>://<host>:<port>/api/`` where the host has
        been passed through ``netutils.escape_ipv6``.
    """
    escaped_host = netutils.escape_ipv6(self.host)
    return '{}://{}:{}/api/'.format(self.protocol, escaped_host, self.port)
165 def _get_request_method(self, method, session):
166 """Returns the request method to be used in the REST call."""
168 request_methods = {
169 'post': session.post,
170 'get': session.get,
171 'put': session.put,
172 'delete': session.delete,
173 'patch': session.patch,
174 }
175 return request_methods[method]
def _get_session_method(self, method):
    """Create a configured HTTP session and return its verb callable.

    The session carries basic-auth credentials, the TLS verification
    setting and a retrying adapter mounted for the configured protocol.

    :param method: HTTP verb accepted by _get_request_method().
    :returns: the session callable for ``method``.
    """
    # NOTE(felipe_rodrigues): request resilient of temporary network
    # failures (like name resolution failure), retrying until 5 times.
    retry_policy = retry.Retry(total=5, connect=5, read=2, backoff_factor=1)

    http_session = requests.Session()
    http_session.mount(
        f'{self.protocol}://', HTTPAdapter(max_retries=retry_policy))

    http_session.auth = auth.HTTPBasicAuth(self.username, self.password)
    http_session.verify = self.ssl_verify
    # replaces the session's default headers with an empty mapping.
    http_session.headers = {}

    return self._get_request_method(method, http_session)
def _call_active_iq(self, action_path, method, body=None, timeout=60):
    """Call the Active IQ REST API.

    :param action_path: path appended to the base URL from _get_url().
    :param method: HTTP verb accepted by _get_session_method().
    :param body: optional payload, sent as the JSON body of the request.
    :param timeout: seconds to wait for the server before the request
        is aborted with an exception. Without it, an unresponsive
        Active IQ would block the caller indefinitely.
    :returns: tuple ``(status_code, raw_response_content)``.
    """
    rest_method = self._get_session_method(method)
    url = self._get_url() + action_path

    LOG.debug("REQUEST: %(method)s %(url)s Body=%(body)s",
              {"method": method.upper(), "url": url, "body": body})

    response = rest_method(url, json=body, timeout=timeout)

    code = response.status_code
    response_body = response.content
    LOG.debug("RESPONSE: %(code)s Body=%(body)s",
              {"code": code, "body": response_body})

    return code, response_body
def _get_performance_level_id(self, performance_level_name):
    """Get the Active IQ key of a performance service level (PSL) name.

    :param performance_level_name: name of the PSL to look up.
    :returns: the ``key`` of the first matching record, or None when
        the request fails, the status code is not 200, the response
        cannot be parsed, or no record matches the name.
    """
    # Local import keeps this block self-contained.
    from urllib import parse

    # The name goes into the query string, so it must be URL-encoded:
    # spaces or characters such as '&' and '#' would otherwise corrupt
    # the request.
    psl_endpoint = ('storage-provider/performance-service-levels?'
                    'name=%s' % parse.quote(str(performance_level_name),
                                            safe=''))
    try:
        code, res = self._call_active_iq(psl_endpoint, "get")
    except Exception as e:
        # Best effort: any failure talking to Active IQ only disables
        # the weigher, it must not break the scheduling.
        LOG.error("Could not retrieve the key of the performance service "
                  "level named as '%(psl)s'. Skipping the weigher. "
                  "Error: %(error)s",
                  {'psl': performance_level_name, 'error': e})
        return None

    if code != 200:
        LOG.error("Could not retrieve the key of the performance service "
                  "level named as '%(psl)s'. Skipping the weigher.",
                  {'psl': performance_level_name})
        return None

    try:
        payload = jsonutils.loads(res) if res else {}
        psl_list = payload.get('records') or []
        psl_key = psl_list[0].get("key") if psl_list else None
    except (ValueError, TypeError, KeyError, AttributeError) as e:
        # Malformed payload: treat it like any other lookup failure.
        LOG.error("Could not parse the performance service level "
                  "response for '%(psl)s'. Skipping the weigher. "
                  "Error: %(error)s",
                  {'psl': performance_level_name, 'error': e})
        return None

    if not psl_list:
        LOG.error("Could not find any performance service level named "
                  "as '%s'. Skipping the weigher.", performance_level_name)
        return None

    return psl_key
246 def _get_aggregate_identifier(self, aggr_name, cluster_name):
247 """Returns the string identifier of an aggregate on a cluster."""
248 return f'{aggr_name}:{cluster_name}'
def _get_resource_keys(self, netapp_aggregates_location):
    """Map the aggregates names to the AIQ resource keys.

    :param netapp_aggregates_location: aggregate identifiers as built by
        _get_aggregate_identifier().
    :returns: list aligned with ``netapp_aggregates_location`` holding
        the Active IQ key of each aggregate, with 0 for aggregates
        Active IQ does not know. An empty list is returned when the
        request fails, the status code is not 200 or the response
        cannot be parsed.
    """
    aggregate_endpoint = 'datacenter/storage/aggregates'

    try:
        code, res = self._call_active_iq(aggregate_endpoint, "get")
    except Exception as e:
        # Best effort: a failure only disables the weigher.
        LOG.error("Could not retrieve the aggregates resource keys. "
                  "Skipping the weigher. Error: %s", e)
        return []

    if code != 200:
        LOG.error("Could not retrieve the aggregates resource keys. "
                  "Skipping the weigher.")
        return []

    try:
        payload = jsonutils.loads(res) if res else {}
        aggr_map = {
            self._get_aggregate_identifier(
                aggr["name"], aggr["cluster"]["name"]): aggr["key"]
            for aggr in payload.get('records') or []
        }
    except (ValueError, TypeError, KeyError, AttributeError) as e:
        # Invalid JSON or a record without name/cluster/key: skip the
        # weigher instead of raising out of it.
        LOG.error("Could not parse the aggregates resource keys. "
                  "Skipping the weigher. Error: %s", e)
        return []

    # we must keep the lists with the same order.
    found_pool_keys = [identifier
                       for identifier in netapp_aggregates_location
                       if identifier in aggr_map]
    # If a pool could not be found, it is marked as resource key 0.
    resource_keys = [aggr_map.get(identifier, 0)
                     for identifier in netapp_aggregates_location]

    LOG.debug("The following pools will be evaluated by Active IQ: %s",
              found_pool_keys)

    return resource_keys
288 def _balance_aggregates(self, resource_keys, size, performance_level_uuid):
289 """Call AIQ to generate the weights of each aggregate."""
290 balance_endpoint = 'storage-provider/data-placement/balance'
291 body = {
292 "capacity": f'{size}GB',
293 "eval_method": self.eval_method,
294 # NOTE(felipe_rodrigues): from Active IQ documentation, the
295 # opt_method only works as 0.
296 "opt_method": 0,
297 "priority_order": self.priority_order,
298 "separate_flag": False,
299 # NOTE(felipe_rodrigues): remove the keys marked with 0, since they
300 # are not found the pool keys.
301 "resource_keys": [key for key in resource_keys if key != 0],
302 }
303 if performance_level_uuid: 303 ↛ 306line 303 didn't jump to line 306 because the condition on line 303 was always true
304 body["ssl_key"] = performance_level_uuid
306 try:
307 code, res = self._call_active_iq(
308 balance_endpoint, "post", body=body)
309 except Exception as e:
310 LOG.error("Could not balance the aggregates. Skipping the "
311 "weigher. Error: %s", e)
312 LOG.error(e)
313 return []
315 if code != 200:
316 LOG.error("Could not balance the aggregates. Skipping the "
317 "weigher.")
318 return []
320 res = jsonutils.loads(res) if res else []
321 weight_map = {}
322 for aggr in res:
323 weight_map[aggr["key"]] = aggr["scores"]["total_weighted_score"]
325 # it must keep the lists with the same order.
326 weights = []
327 for key in resource_keys:
328 weights.append(weight_map.get(key, 0.0))
330 return weights
def weigh_objects(self, weighed_obj_list, weight_properties):
    """Weigh all the given hosts at once through Active IQ.

    :param weighed_obj_list: weighed objects whose ``obj`` exposes
        ``vendor_name``, ``capabilities`` and ``pool_name``.
    :param weight_properties: request properties handed over to
        _weigh_active_iq().
    :returns: the result of _weigh_active_iq(), or an empty list as
        soon as one host is not a NetApp backend.
    """
    aggregate_ids = []
    for weighed in weighed_obj_list:
        backend = weighed.obj

        # a single non-NetApp host is enough to skip the entire weigher.
        if backend.vendor_name != "NetApp":
            LOG.debug(
                "Skipping Active IQ weigher given that some backends "
                "are not from NetApp.")
            return []

        cluster = backend.capabilities.get("netapp_cluster_name")
        aggregate_ids.append(
            self._get_aggregate_identifier(backend.pool_name, cluster))

    weights = self._weigh_active_iq(aggregate_ids, weight_properties)

    LOG.debug("Active IQ weight result: %s", weights)
    return weights