Coverage for manila/network/linux/ip_lib.py: 93%
294 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2014 Mirantis Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import netaddr
18from manila.i18n import _
19from manila import utils
22LOOPBACK_DEVNAME = 'lo'
25class SubProcessBase(object):
26 def __init__(self, namespace=None):
27 self.namespace = namespace
29 def _run(self, options, command, args):
30 if self.namespace:
31 return self._as_root(options, command, args)
32 else:
33 return self._execute(options, command, args)
35 def _as_root(self, options, command, args, use_root_namespace=False):
36 namespace = self.namespace if not use_root_namespace else None
38 return self._execute(options, command, args, namespace, as_root=True)
40 @classmethod
41 def _execute(cls, options, command, args, namespace=None, as_root=False):
42 opt_list = ['-%s' % o for o in options]
43 if namespace:
44 ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip']
45 else:
46 ip_cmd = ['ip']
47 total_cmd = ip_cmd + opt_list + [command] + list(args)
48 return utils.execute(*total_cmd, run_as_root=as_root)[0]
51class IPWrapper(SubProcessBase):
52 def __init__(self, namespace=None):
53 super(IPWrapper, self).__init__(namespace=namespace)
54 self.netns = IpNetnsCommand(self)
56 def device(self, name):
57 return IPDevice(name, self.namespace)
59 def get_devices(self, exclude_loopback=False):
60 retval = []
61 output = self._execute('o', 'link', ('list',), self.namespace)
62 for line in output.split('\n'):
63 if '<' not in line:
64 continue
65 tokens = line.split(':', 2)
66 if len(tokens) >= 3: 66 ↛ 62line 66 didn't jump to line 62 because the condition on line 66 was always true
67 name = tokens[1].split('@', 1)[0].strip()
69 if exclude_loopback and name == LOOPBACK_DEVNAME: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true
70 continue
72 retval.append(IPDevice(name, self.namespace))
73 return retval
75 def add_tuntap(self, name, mode='tap'):
76 self._as_root('', 'tuntap', ('add', name, 'mode', mode))
77 return IPDevice(name, self.namespace)
79 def add_veth(self, name1, name2, namespace2=None):
80 args = ['add', name1, 'type', 'veth', 'peer', 'name', name2]
82 if namespace2 is None:
83 namespace2 = self.namespace
84 else:
85 self.ensure_namespace(namespace2)
86 args += ['netns', namespace2]
88 self._as_root('', 'link', tuple(args))
90 return (IPDevice(name1, self.namespace), IPDevice(name2, namespace2))
92 def ensure_namespace(self, name):
93 if not self.netns.exists(name):
94 ip = self.netns.add(name)
95 lo = ip.device(LOOPBACK_DEVNAME)
96 lo.link.set_up()
97 else:
98 ip = IPWrapper(name)
99 return ip
101 def namespace_is_empty(self):
102 return not self.get_devices(exclude_loopback=True)
104 def garbage_collect_namespace(self):
105 """Conditionally destroy the namespace if it is empty."""
106 if self.namespace and self.netns.exists(self.namespace):
107 if self.namespace_is_empty():
108 self.netns.delete(self.namespace)
109 return True
110 return False
112 def add_device_to_namespace(self, device):
113 if self.namespace:
114 device.link.set_netns(self.namespace)
116 @classmethod
117 def get_namespaces(cls):
118 output = cls._execute('', 'netns', ('list',))
119 return [ns.strip() for ns in output.split('\n')]
122class IPDevice(SubProcessBase):
123 def __init__(self, name, namespace=None):
124 super(IPDevice, self).__init__(namespace=namespace)
125 self.name = name
126 self.link = IpLinkCommand(self)
127 self.addr = IpAddrCommand(self)
128 self.route = IpRouteCommand(self)
130 def __eq__(self, other):
131 return (other is not None and self.name == other.name
132 and self.namespace == other.namespace)
134 def __str__(self):
135 return self.name
138class IpCommandBase(object):
139 COMMAND = ''
141 def __init__(self, parent):
142 self._parent = parent
144 def _run(self, *args, **kwargs):
145 return self._parent._run(kwargs.get('options', []), self.COMMAND, args)
147 def _as_root(self, *args, **kwargs):
148 return self._parent._as_root(kwargs.get('options', []),
149 self.COMMAND,
150 args,
151 kwargs.get('use_root_namespace', False))
154class IpDeviceCommandBase(IpCommandBase):
155 @property
156 def name(self):
157 return self._parent.name
160class IpLinkCommand(IpDeviceCommandBase):
161 COMMAND = 'link'
163 def set_address(self, mac_address):
164 self._as_root('set', self.name, 'address', mac_address)
166 def set_mtu(self, mtu_size):
167 self._as_root('set', self.name, 'mtu', mtu_size)
169 def set_up(self):
170 self._as_root('set', self.name, 'up')
172 def set_down(self):
173 self._as_root('set', self.name, 'down')
175 def set_netns(self, namespace):
176 self._as_root('set', self.name, 'netns', namespace)
177 self._parent.namespace = namespace
179 def set_name(self, name):
180 self._as_root('set', self.name, 'name', name)
181 self._parent.name = name
183 def set_alias(self, alias_name):
184 self._as_root('set', self.name, 'alias', alias_name)
186 def delete(self):
187 self._as_root('delete', self.name)
189 @property
190 def address(self):
191 return self.attributes.get('link/ether')
193 @property
194 def state(self):
195 return self.attributes.get('state')
197 @property
198 def mtu(self):
199 return self.attributes.get('mtu')
201 @property
202 def qdisc(self):
203 return self.attributes.get('qdisc')
205 @property
206 def qlen(self):
207 return self.attributes.get('qlen')
209 @property
210 def alias(self):
211 return self.attributes.get('alias')
213 @property
214 def attributes(self):
215 return self._parse_line(self._run('show', self.name, options='o'))
217 def _parse_line(self, value):
218 if not value: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true
219 return {}
221 device_name, settings = value.replace("\\", '').split('>', 1)
222 tokens = settings.split()
223 keys = tokens[::2]
224 values = [int(v) if v.isdigit() else v for v in tokens[1::2]]
226 retval = dict(zip(keys, values))
227 return retval
230class IpAddrCommand(IpDeviceCommandBase):
231 COMMAND = 'addr'
233 def add(self, ip_version, cidr, broadcast, scope='global'):
234 self._as_root('add',
235 cidr,
236 'brd',
237 broadcast,
238 'scope',
239 scope,
240 'dev',
241 self.name,
242 options=[ip_version])
244 def delete(self, ip_version, cidr):
245 self._as_root('del',
246 cidr,
247 'dev',
248 self.name,
249 options=[ip_version])
251 def flush(self):
252 self._as_root('flush', self.name)
254 def list(self, scope=None, to=None, filters=None):
255 if filters is None:
256 filters = []
258 retval = []
260 if scope:
261 filters += ['scope', scope]
262 if to: 262 ↛ 263line 262 didn't jump to line 263 because the condition on line 262 was never true
263 filters += ['to', to]
265 for line in self._run('show', self.name, *filters).split('\n'):
266 line = line.strip()
267 if not line.startswith('inet'):
268 continue
269 parts = line.split()
270 if parts[0] == 'inet6':
271 version = 6
272 scope = parts[3]
273 broadcast = '::'
274 else:
275 version = 4
276 if parts[2] == 'brd':
277 broadcast = parts[3]
278 scope = parts[5]
279 else:
280 # sometimes output of 'ip a' might look like:
281 # inet 192.168.100.100/24 scope global eth0
282 # and broadcast needs to be calculated from CIDR
283 broadcast = str(netaddr.IPNetwork(parts[1]).broadcast)
284 scope = parts[3]
286 retval.append(dict(cidr=parts[1],
287 broadcast=broadcast,
288 scope=scope,
289 ip_version=version,
290 dynamic=('dynamic' == parts[-1])))
291 return retval
294class IpRouteCommand(IpDeviceCommandBase):
295 COMMAND = 'route'
297 def add_gateway(self, gateway, metric=None):
298 args = ['replace', 'default', 'via', gateway]
299 if metric: 299 ↛ 301line 299 didn't jump to line 301 because the condition on line 299 was always true
300 args += ['metric', metric]
301 args += ['dev', self.name]
302 self._as_root(*args)
304 def delete_gateway(self, gateway):
305 self._as_root('del',
306 'default',
307 'via',
308 gateway,
309 'dev',
310 self.name)
312 def get_gateway(self, scope=None, filters=None):
313 if filters is None: 313 ↛ 316line 313 didn't jump to line 316 because the condition on line 313 was always true
314 filters = []
316 retval = None
318 if scope: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true
319 filters += ['scope', scope]
321 route_list_lines = self._run('list', 'dev', self.name,
322 *filters).split('\n')
323 default_route_line = next((x.strip() for x in
324 route_list_lines if
325 x.strip().startswith('default')), None)
326 if default_route_line:
327 gateway_index = 2
328 parts = default_route_line.split()
329 retval = dict(gateway=parts[gateway_index])
330 metric_index = 4
331 parts_has_metric = (len(parts) > metric_index)
332 if parts_has_metric:
333 retval.update(metric=int(parts[metric_index]))
335 return retval
337 def pullup_route(self, interface_name):
338 """Pullup route entry.
340 Ensures that the route entry for the interface is before all
341 others on the same subnet.
342 """
343 device_list = []
344 device_route_list_lines = self._run('list', 'proto', 'kernel',
345 'dev', interface_name).split('\n')
346 for device_route_line in device_route_list_lines:
347 try:
348 subnet = device_route_line.split()[0]
349 except Exception:
350 continue
351 subnet_route_list_lines = self._run(
352 'list', 'proto', 'kernel', 'exact', subnet).split('\n')
353 for subnet_route_line in subnet_route_list_lines: 353 ↛ 369line 353 didn't jump to line 369 because the loop on line 353 didn't complete
354 i = iter(subnet_route_line.split())
355 while next(i) != 'dev':
356 pass
357 device = next(i)
358 try:
359 while next(i) != 'src':
360 pass
361 src = next(i)
362 except Exception:
363 src = ''
364 if device != interface_name:
365 device_list.append((device, src))
366 else:
367 break
369 for (device, src) in device_list:
370 self._as_root('del', subnet, 'dev', device)
371 if (src != ''): 371 ↛ 375line 371 didn't jump to line 375 because the condition on line 371 was always true
372 self._as_root('append', subnet, 'proto', 'kernel',
373 'src', src, 'dev', device)
374 else:
375 self._as_root('append', subnet, 'proto', 'kernel',
376 'dev', device)
378 def clear_outdated_routes(self, cidr):
379 """Removes duplicated routes for a certain network CIDR.
381 Removes all routes related to supplied CIDR except for the one
382 related to this interface device.
384 :param cidr: The network CIDR to be cleared.
385 """
386 routes = self.list()
387 items = [x for x in routes
388 if x['Destination'] == cidr and x.get('Device') and
389 x['Device'] != self.name]
390 for item in items:
391 self.delete_net_route(item['Destination'], item['Device'])
393 def list(self):
394 """List all routes
396 :return: A dictionary with field 'Destination' and 'Device' for each
397 route entry. 'Gateway' field is included if route has a gateway.
398 """
399 routes = []
400 output = self._as_root('list')
401 lines = output.split('\n')
402 for line in lines:
403 items = line.split()
404 if len(items) > 0:
405 item = {'Destination': items[0]}
406 if len(items) > 1: 406 ↛ 413line 406 didn't jump to line 413 because the condition on line 406 was always true
407 if items[1] == 'via':
408 item['Gateway'] = items[2]
409 if len(items) > 3 and items[3] == 'dev':
410 item['Device'] = items[4]
411 if items[1] == 'dev':
412 item['Device'] = items[2]
413 routes.append(item)
414 return routes
416 def delete_net_route(self, cidr, device):
417 """Deletes a route according to supplied CIDR and interface device.
419 :param cidr: The network CIDR to be removed.
420 :param device: The network interface device to be removed.
421 """
422 self._as_root('delete', cidr, 'dev', device)
425class IpNetnsCommand(IpCommandBase):
426 COMMAND = 'netns'
428 def add(self, name):
429 self._as_root('add', name, use_root_namespace=True)
430 return IPWrapper(name)
432 def delete(self, name):
433 self._as_root('delete', name, use_root_namespace=True)
435 def execute(self, cmds, addl_env=None, check_exit_code=True):
436 if addl_env is None:
437 addl_env = dict()
439 if not self._parent.namespace: 439 ↛ 440line 439 didn't jump to line 440 because the condition on line 439 was never true
440 raise Exception(_('No namespace defined for parent'))
441 else:
442 env_params = []
443 if addl_env:
444 env_params = (['env'] + ['%s=%s' % pair
445 for pair in sorted(addl_env.items())])
446 total_cmd = (['ip', 'netns', 'exec', self._parent.namespace] +
447 env_params + list(cmds))
448 return utils.execute(*total_cmd, run_as_root=True,
449 check_exit_code=check_exit_code)
451 def exists(self, name):
452 output = self._as_root('list', options='o', use_root_namespace=True)
454 for line in output.split('\n'):
455 if name == line.strip():
456 return True
457 return False
460def device_exists(device_name, namespace=None):
461 try:
462 address = IPDevice(device_name, namespace).link.address
463 except Exception as e:
464 if 'does not exist' in str(e): 464 ↛ 466line 464 didn't jump to line 466 because the condition on line 464 was always true
465 return False
466 raise
467 return bool(address)
470def iproute_arg_supported(command, arg):
471 command += ['help']
472 stdout, stderr = utils.execute(command, check_exit_code=False,
473 return_stderr=True)
474 return any(arg in line for line in stderr.split('\n'))