Coverage for manila/network/neutron/api.py: 72%
267 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2013 OpenStack Foundation
2# Copyright 2014 Mirantis Inc.
3# All Rights Reserved
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
17from keystoneauth1 import exceptions as ks_exec
18from keystoneauth1 import loading as ks_loading
19from neutronclient.common import exceptions as neutron_client_exc
20from neutronclient.v2_0 import client as clientv20
21from oslo_config import cfg
22from oslo_log import log
24from manila.common import client_auth
25from manila import context
26from manila import exception
27from manila.network.neutron import constants as neutron_constants
28from manila import utils
31NEUTRON_GROUP = 'neutron'
34neutron_opts = [
35 cfg.StrOpt(
36 'url',
37 help='URL for connecting to neutron.'),
38 cfg.IntOpt(
39 'url_timeout',
40 deprecated_for_removal=True,
41 deprecated_reason='This parameter has had no effect since 2.0.0. '
42 'The timeout parameter should be used instead.',
43 deprecated_since='Yoga',
44 default=30,
45 help='Timeout value for connecting to neutron in seconds.'),
46 cfg.StrOpt(
47 'auth_strategy',
48 deprecated_for_removal=True,
49 deprecated_reason='This parameter has had no effect since 2.0.0. '
50 'Use the auth_type parameter to select '
51 'authentication type',
52 deprecated_since='Yoga',
53 default='keystone',
54 help='Auth strategy for connecting to neutron in admin context.'),
55 cfg.StrOpt(
56 'endpoint_type',
57 default='publicURL',
58 choices=['publicURL', 'internalURL', 'adminURL',
59 'public', 'internal', 'admin'],
60 help='Endpoint type to be used with neutron client calls.'),
61 cfg.StrOpt(
62 'region_name',
63 help='Region name for connecting to neutron in admin context.'),
64]
67CONF = cfg.CONF
68LOG = log.getLogger(__name__)
71class PortBindingAlreadyExistsClient(neutron_client_exc.Conflict):
72 pass
75# We need to monkey-patch neutronclient.common.exceptions module, to make
76# neutron client to raise error specific exceptions. E.g. exception
77# PortBindingAlreadyExistsClient is raised for Neutron API error
78# PortBindingAlreadyExists. If not defined, a general exception of type
79# Conflict will be raised.
80neutron_client_exc.PortBindingAlreadyExistsClient = \
81 PortBindingAlreadyExistsClient
84def list_opts():
85 return client_auth.AuthClientLoader.list_opts(NEUTRON_GROUP)
88class API(object):
89 """API for interacting with the neutron 2.x API.
91 :param configuration: instance of config or config group.
92 """
94 def __init__(self, config_group_name=None):
95 self.config_group_name = config_group_name or 'DEFAULT'
97 ks_loading.register_session_conf_options(
98 CONF, NEUTRON_GROUP)
99 ks_loading.register_auth_conf_options(CONF, NEUTRON_GROUP)
100 CONF.register_opts(neutron_opts, NEUTRON_GROUP)
102 self.configuration = getattr(CONF, self.config_group_name, CONF)
103 self.last_neutron_extension_sync = None
104 self.extensions = {}
105 self.auth_obj = None
107 @property
108 def client(self):
109 return self.get_client(context.get_admin_context())
111 def get_client(self, context):
112 if not self.auth_obj:
113 self.auth_obj = client_auth.AuthClientLoader(
114 client_class=clientv20.Client, cfg_group=NEUTRON_GROUP)
116 return self.auth_obj.get_client(
117 self,
118 context,
119 endpoint_type=CONF[NEUTRON_GROUP].endpoint_type,
120 region_name=CONF[NEUTRON_GROUP].region_name,
121 endpoint_override=CONF[NEUTRON_GROUP].url,
122 )
124 @property
125 def admin_project_id(self):
126 if self.client.httpclient.auth_token is None:
127 try:
128 self.client.httpclient.authenticate()
129 except neutron_client_exc.NeutronClientException as e:
130 raise exception.NetworkException(code=e.status_code,
131 message=e.message)
132 return self.client.httpclient.get_project_id()
134 def get_all_admin_project_networks(self):
135 search_opts = {'tenant_id': self.admin_project_id, 'shared': False}
136 nets = self.client.list_networks(**search_opts).get('networks', [])
137 return nets
139 def create_port(self, tenant_id, network_id, host_id=None, subnet_id=None,
140 fixed_ip=None, device_owner=None, device_id=None,
141 mac_address=None, port_security_enabled=True,
142 security_group_ids=None, dhcp_opts=None, **kwargs):
143 return self._create_port(tenant_id, network_id, host_id=host_id,
144 subnet_id=subnet_id, fixed_ip=fixed_ip,
145 device_owner=device_owner,
146 device_id=device_id, mac_address=mac_address,
147 port_security_enabled=port_security_enabled,
148 security_group_ids=security_group_ids,
149 dhcp_opts=dhcp_opts, **kwargs)
151 @utils.retry(retry_param=ks_exec.ConnectFailure, retries=5)
152 def _create_port(self, tenant_id, network_id, host_id=None, subnet_id=None,
153 fixed_ip=None, device_owner=None, device_id=None,
154 mac_address=None, port_security_enabled=True,
155 security_group_ids=None, dhcp_opts=None, name=None,
156 **kwargs):
157 try:
158 port_req_body = {'port': {}}
159 port_req_body['port']['network_id'] = network_id
160 port_req_body['port']['admin_state_up'] = True
161 port_req_body['port']['tenant_id'] = tenant_id
162 if not port_security_enabled: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true
163 port_req_body['port']['port_security_enabled'] = (
164 port_security_enabled)
165 elif security_group_ids:
166 port_req_body['port']['security_groups'] = security_group_ids
167 if mac_address:
168 port_req_body['port']['mac_address'] = mac_address
169 if host_id:
170 if not self._has_port_binding_extension():
171 msg = ("host_id (%(host_id)s) specified but neutron "
172 "doesn't support port binding. Please activate the "
173 "extension accordingly." % {"host_id": host_id})
174 raise exception.NetworkException(message=msg)
175 port_req_body['port']['binding:host_id'] = host_id
176 if dhcp_opts is not None:
177 port_req_body['port']['extra_dhcp_opts'] = dhcp_opts
178 if subnet_id:
179 fixed_ip_dict = {'subnet_id': subnet_id}
180 if fixed_ip: 180 ↛ 182line 180 didn't jump to line 182 because the condition on line 180 was always true
181 fixed_ip_dict.update({'ip_address': fixed_ip})
182 port_req_body['port']['fixed_ips'] = [fixed_ip_dict]
183 if device_owner:
184 port_req_body['port']['device_owner'] = device_owner
185 if device_id:
186 port_req_body['port']['device_id'] = device_id
187 if name: 187 ↛ 188line 187 didn't jump to line 188 because the condition on line 187 was never true
188 port_req_body['port']['name'] = name
189 if kwargs:
190 port_req_body['port'].update(kwargs)
191 port = self.client.create_port(port_req_body).get('port', {})
192 return port
193 except neutron_client_exc.NeutronClientException as e:
194 LOG.exception('Neutron error creating port on network %s',
195 network_id)
196 if e.status_code == 409:
197 raise exception.PortLimitExceeded()
198 raise exception.NetworkException(code=e.status_code,
199 message=e.message)
200 except neutron_client_exc.IpAddressGenerationFailureClient:
201 LOG.warning('No free IP addresses in neutron subnet %s', subnet_id)
202 raise exception.IpAddressGenerationFailureClient()
203 except ks_exec.ConnectFailure:
204 LOG.warning('Create Port: Neutron connection failure')
205 # check if port is created in neutron else re-raise connectFailure
206 search_opts = {
207 'device_id': device_id,
208 'network_id': network_id,
209 'name': name
210 }
211 try:
212 ports = self.list_ports(**search_opts)
213 return ports[0]
214 except ks_exec.ConnectFailure as kse:
215 raise kse
217 @utils.retry(retry_param=ks_exec.ConnectFailure, retries=5)
218 def delete_port(self, port_id):
219 try:
220 self.client.delete_port(port_id)
221 except neutron_client_exc.PortNotFoundClient:
222 LOG.warning('Neutron port not found: %s', port_id)
223 pass
224 except neutron_client_exc.NeutronClientException as e:
225 raise exception.NetworkException(code=e.status_code,
226 message=e.message)
227 except ks_exec.ConnectFailure as e:
228 raise e
230 def delete_subnet(self, subnet_id):
231 try:
232 self.client.delete_subnet(subnet_id)
233 except neutron_client_exc.NeutronClientException as e:
234 raise exception.NetworkException(code=e.status_code,
235 message=e.message)
237 def list_ports(self, **search_opts):
238 """List ports for the client based on search options."""
239 return self.client.list_ports(**search_opts).get('ports')
241 @utils.retry(retry_param=ks_exec.ConnectFailure, retries=5)
242 def show_port(self, port_id):
243 """Return the port for the client given the port id."""
244 try:
245 return self.client.show_port(port_id).get('port')
246 except neutron_client_exc.NeutronClientException as e:
247 raise exception.NetworkException(code=e.status_code,
248 message=e.message)
249 except ks_exec.ConnectFailure as e:
250 raise e
252 def get_all_networks(self):
253 """Get all networks for client."""
254 return self.client.list_networks().get('networks')
256 @utils.retry(retry_param=ks_exec.ConnectFailure, retries=5)
257 def get_network(self, network_uuid):
258 """Get specific network for client."""
259 try:
260 network = self.client.show_network(network_uuid).get('network', {})
261 return network
262 except neutron_client_exc.NeutronClientException as e:
263 raise exception.NetworkException(code=e.status_code,
264 message=e.message)
265 except ks_exec.ConnectFailure as e:
266 raise e
268 def get_subnet(self, subnet_uuid):
269 """Get specific subnet for client."""
270 try:
271 return self.client.show_subnet(subnet_uuid).get('subnet', {})
272 except neutron_client_exc.NeutronClientException as e:
273 raise exception.NetworkException(code=e.status_code,
274 message=e.message)
276 def list_extensions(self):
277 extensions_list = self.client.list_extensions().get('extensions')
278 return {ext['name']: ext for ext in extensions_list}
280 def _has_port_binding_extension(self):
281 if not self.extensions:
282 self.extensions = self.list_extensions()
283 return neutron_constants.PORTBINDING_EXT in self.extensions
285 def router_create(self, tenant_id, name):
286 router_req_body = {'router': {}}
287 router_req_body['router']['tenant_id'] = tenant_id
288 router_req_body['router']['name'] = name
289 try:
290 return self.client.create_router(router_req_body).get('router', {})
291 except neutron_client_exc.NeutronClientException as e:
292 raise exception.NetworkException(code=e.status_code,
293 message=e.message)
295 def network_create(self, tenant_id, name):
296 network_req_body = {'network': {}}
297 network_req_body['network']['tenant_id'] = tenant_id
298 network_req_body['network']['name'] = name
299 try:
300 return self.client.create_network(
301 network_req_body).get('network', {})
302 except neutron_client_exc.NeutronClientException as e:
303 raise exception.NetworkException(code=e.status_code,
304 message=e.message)
306 def subnet_create(self, tenant_id, net_id, name, cidr, no_gateway=False):
307 subnet_req_body = {'subnet': {}}
308 subnet_req_body['subnet']['tenant_id'] = tenant_id
309 subnet_req_body['subnet']['name'] = name
310 subnet_req_body['subnet']['network_id'] = net_id
311 subnet_req_body['subnet']['cidr'] = cidr
312 subnet_req_body['subnet']['ip_version'] = 4
313 if no_gateway: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true
314 subnet_req_body['subnet']['gateway_ip'] = None
315 try:
316 return self.client.create_subnet(
317 subnet_req_body).get('subnet', {})
318 except neutron_client_exc.NeutronClientException as e:
319 raise exception.NetworkException(code=e.status_code,
320 message=e.message)
322 def router_add_interface(self, router_id, subnet_id, port_id=None):
323 body = {}
324 if subnet_id: 324 ↛ 326line 324 didn't jump to line 326 because the condition on line 324 was always true
325 body['subnet_id'] = subnet_id
326 if port_id: 326 ↛ 328line 326 didn't jump to line 328 because the condition on line 326 was always true
327 body['port_id'] = port_id
328 try:
329 self.client.add_interface_router(router_id, body)
330 except neutron_client_exc.NeutronClientException as e:
331 raise exception.NetworkException(code=e.status_code,
332 message=e.message)
334 def router_remove_interface(self, router_id, subnet_id, port_id=None):
335 body = {}
336 if subnet_id:
337 body['subnet_id'] = subnet_id
338 if port_id:
339 body['port_id'] = port_id
340 try:
341 self.client.remove_interface_router(router_id, body)
342 except neutron_client_exc.NeutronClientException as e:
343 raise exception.NetworkException(code=e.status_code,
344 message=e.message)
346 def router_list(self):
347 try:
348 return self.client.list_routers().get('routers', {})
349 except neutron_client_exc.NeutronClientException as e:
350 raise exception.NetworkException(code=e.status_code,
351 message=e.message)
353 def update_port_fixed_ips(self, port_id, fixed_ips):
354 try:
355 port_req_body = {'port': fixed_ips}
356 port = self.client.update_port(
357 port_id, port_req_body).get('port', {})
358 return port
359 except neutron_client_exc.NeutronClientException as e:
360 raise exception.NetworkException(code=e.status_code,
361 message=e.message)
363 def bind_port_to_host(self, port_id, host, vnic_type):
364 """Add an inactive binding to existing port."""
366 try:
367 data = {"binding": {"host": host, "vnic_type": vnic_type}}
368 return self.client.create_port_binding(port_id, data)['binding']
369 except neutron_client_exc.PortBindingAlreadyExistsClient as e:
370 LOG.warning('Port binding already exists: %s', e)
371 except neutron_client_exc.NeutronClientException as e:
372 raise exception.NetworkException(
373 code=e.status_code, message=e.message)
375 def delete_port_binding(self, port_id, host):
376 try:
377 return self.client.delete_port_binding(port_id, host)
378 except neutron_client_exc.NeutronClientException as e:
379 raise exception.NetworkException(
380 code=e.status_code, message=e.message)
382 def activate_port_binding(self, port_id, host):
383 try:
384 return self.client.activate_port_binding(port_id, host)
385 except neutron_client_exc.NeutronClientException as e:
386 raise exception.NetworkException(
387 code=e.status_code, message=e.message)
389 def show_router(self, router_id):
390 try:
391 return self.client.show_router(router_id).get('router', {})
392 except neutron_client_exc.NeutronClientException as e:
393 raise exception.NetworkException(code=e.status_code,
394 message=e.message)
396 def router_update_routes(self, router_id, routes):
397 try:
398 router_req_body = {'router': routes}
399 port = self.client.update_router(
400 router_id, router_req_body).get('router', {})
401 return port
402 except neutron_client_exc.NeutronClientException as e:
403 raise exception.NetworkException(code=e.status_code,
404 message=e.message)
406 def update_subnet(self, subnet_uuid, name):
407 """Update specific subnet for client."""
408 subnet_req_body = {'subnet': {'name': name}}
409 try:
410 return self.client.update_subnet(
411 subnet_uuid, subnet_req_body).get('subnet', {})
412 except neutron_client_exc.NeutronClientException as e:
413 raise exception.NetworkException(code=e.status_code,
414 message=e.message)
416 def security_group_list(self, search_opts=None):
417 try:
418 return self.client.list_security_groups(**search_opts)
419 except neutron_client_exc.NeutronClientException as e:
420 raise exception.NetworkException(
421 code=e.status_code, message=e.message)
423 def security_group_create(self, name, description=""):
424 try:
425 return self.client.create_security_group(
426 {'security_group': {"name": name, "description": description}})
427 except neutron_client_exc.NeutronClientException as e:
428 raise exception.NetworkException(
429 code=e.status_code, message=e.message)
431 def security_group_rule_create(self, parent_group_id,
432 ip_protocol=None, from_port=None,
433 to_port=None, cidr=None, group_id=None,
434 direction="ingress"):
435 request = {"security_group_id": parent_group_id,
436 "protocol": ip_protocol, "remote_ip_prefix": cidr,
437 "remote_group_id": group_id, "direction": direction}
438 if ip_protocol != "icmp":
439 request["port_range_min"] = from_port
440 request["port_range_max"] = to_port
442 try:
443 return self.client.create_security_group_rule(
444 {"security_group_rule": request})
445 except neutron_client_exc.NeutronClientException as e:
446 raise exception.NetworkException(
447 code=e.status_code, message=e.message)