Coverage for manila/network/linux/ip_lib.py: 93%

294 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2014 Mirantis Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import netaddr 

17 

18from manila.i18n import _ 

19from manila import utils 

20 

21 

22LOOPBACK_DEVNAME = 'lo' 

23 

24 

25class SubProcessBase(object): 

26 def __init__(self, namespace=None): 

27 self.namespace = namespace 

28 

29 def _run(self, options, command, args): 

30 if self.namespace: 

31 return self._as_root(options, command, args) 

32 else: 

33 return self._execute(options, command, args) 

34 

35 def _as_root(self, options, command, args, use_root_namespace=False): 

36 namespace = self.namespace if not use_root_namespace else None 

37 

38 return self._execute(options, command, args, namespace, as_root=True) 

39 

40 @classmethod 

41 def _execute(cls, options, command, args, namespace=None, as_root=False): 

42 opt_list = ['-%s' % o for o in options] 

43 if namespace: 

44 ip_cmd = ['ip', 'netns', 'exec', namespace, 'ip'] 

45 else: 

46 ip_cmd = ['ip'] 

47 total_cmd = ip_cmd + opt_list + [command] + list(args) 

48 return utils.execute(*total_cmd, run_as_root=as_root)[0] 

49 

50 

51class IPWrapper(SubProcessBase): 

52 def __init__(self, namespace=None): 

53 super(IPWrapper, self).__init__(namespace=namespace) 

54 self.netns = IpNetnsCommand(self) 

55 

56 def device(self, name): 

57 return IPDevice(name, self.namespace) 

58 

59 def get_devices(self, exclude_loopback=False): 

60 retval = [] 

61 output = self._execute('o', 'link', ('list',), self.namespace) 

62 for line in output.split('\n'): 

63 if '<' not in line: 

64 continue 

65 tokens = line.split(':', 2) 

66 if len(tokens) >= 3: 66 ↛ 62line 66 didn't jump to line 62 because the condition on line 66 was always true

67 name = tokens[1].split('@', 1)[0].strip() 

68 

69 if exclude_loopback and name == LOOPBACK_DEVNAME: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true

70 continue 

71 

72 retval.append(IPDevice(name, self.namespace)) 

73 return retval 

74 

75 def add_tuntap(self, name, mode='tap'): 

76 self._as_root('', 'tuntap', ('add', name, 'mode', mode)) 

77 return IPDevice(name, self.namespace) 

78 

79 def add_veth(self, name1, name2, namespace2=None): 

80 args = ['add', name1, 'type', 'veth', 'peer', 'name', name2] 

81 

82 if namespace2 is None: 

83 namespace2 = self.namespace 

84 else: 

85 self.ensure_namespace(namespace2) 

86 args += ['netns', namespace2] 

87 

88 self._as_root('', 'link', tuple(args)) 

89 

90 return (IPDevice(name1, self.namespace), IPDevice(name2, namespace2)) 

91 

92 def ensure_namespace(self, name): 

93 if not self.netns.exists(name): 

94 ip = self.netns.add(name) 

95 lo = ip.device(LOOPBACK_DEVNAME) 

96 lo.link.set_up() 

97 else: 

98 ip = IPWrapper(name) 

99 return ip 

100 

101 def namespace_is_empty(self): 

102 return not self.get_devices(exclude_loopback=True) 

103 

104 def garbage_collect_namespace(self): 

105 """Conditionally destroy the namespace if it is empty.""" 

106 if self.namespace and self.netns.exists(self.namespace): 

107 if self.namespace_is_empty(): 

108 self.netns.delete(self.namespace) 

109 return True 

110 return False 

111 

112 def add_device_to_namespace(self, device): 

113 if self.namespace: 

114 device.link.set_netns(self.namespace) 

115 

116 @classmethod 

117 def get_namespaces(cls): 

118 output = cls._execute('', 'netns', ('list',)) 

119 return [ns.strip() for ns in output.split('\n')] 

120 

121 

122class IPDevice(SubProcessBase): 

123 def __init__(self, name, namespace=None): 

124 super(IPDevice, self).__init__(namespace=namespace) 

125 self.name = name 

126 self.link = IpLinkCommand(self) 

127 self.addr = IpAddrCommand(self) 

128 self.route = IpRouteCommand(self) 

129 

130 def __eq__(self, other): 

131 return (other is not None and self.name == other.name 

132 and self.namespace == other.namespace) 

133 

134 def __str__(self): 

135 return self.name 

136 

137 

138class IpCommandBase(object): 

139 COMMAND = '' 

140 

141 def __init__(self, parent): 

142 self._parent = parent 

143 

144 def _run(self, *args, **kwargs): 

145 return self._parent._run(kwargs.get('options', []), self.COMMAND, args) 

146 

147 def _as_root(self, *args, **kwargs): 

148 return self._parent._as_root(kwargs.get('options', []), 

149 self.COMMAND, 

150 args, 

151 kwargs.get('use_root_namespace', False)) 

152 

153 

154class IpDeviceCommandBase(IpCommandBase): 

155 @property 

156 def name(self): 

157 return self._parent.name 

158 

159 

160class IpLinkCommand(IpDeviceCommandBase): 

161 COMMAND = 'link' 

162 

163 def set_address(self, mac_address): 

164 self._as_root('set', self.name, 'address', mac_address) 

165 

166 def set_mtu(self, mtu_size): 

167 self._as_root('set', self.name, 'mtu', mtu_size) 

168 

169 def set_up(self): 

170 self._as_root('set', self.name, 'up') 

171 

172 def set_down(self): 

173 self._as_root('set', self.name, 'down') 

174 

175 def set_netns(self, namespace): 

176 self._as_root('set', self.name, 'netns', namespace) 

177 self._parent.namespace = namespace 

178 

179 def set_name(self, name): 

180 self._as_root('set', self.name, 'name', name) 

181 self._parent.name = name 

182 

183 def set_alias(self, alias_name): 

184 self._as_root('set', self.name, 'alias', alias_name) 

185 

186 def delete(self): 

187 self._as_root('delete', self.name) 

188 

189 @property 

190 def address(self): 

191 return self.attributes.get('link/ether') 

192 

193 @property 

194 def state(self): 

195 return self.attributes.get('state') 

196 

197 @property 

198 def mtu(self): 

199 return self.attributes.get('mtu') 

200 

201 @property 

202 def qdisc(self): 

203 return self.attributes.get('qdisc') 

204 

205 @property 

206 def qlen(self): 

207 return self.attributes.get('qlen') 

208 

209 @property 

210 def alias(self): 

211 return self.attributes.get('alias') 

212 

213 @property 

214 def attributes(self): 

215 return self._parse_line(self._run('show', self.name, options='o')) 

216 

217 def _parse_line(self, value): 

218 if not value: 218 ↛ 219line 218 didn't jump to line 219 because the condition on line 218 was never true

219 return {} 

220 

221 device_name, settings = value.replace("\\", '').split('>', 1) 

222 tokens = settings.split() 

223 keys = tokens[::2] 

224 values = [int(v) if v.isdigit() else v for v in tokens[1::2]] 

225 

226 retval = dict(zip(keys, values)) 

227 return retval 

228 

229 

230class IpAddrCommand(IpDeviceCommandBase): 

231 COMMAND = 'addr' 

232 

233 def add(self, ip_version, cidr, broadcast, scope='global'): 

234 self._as_root('add', 

235 cidr, 

236 'brd', 

237 broadcast, 

238 'scope', 

239 scope, 

240 'dev', 

241 self.name, 

242 options=[ip_version]) 

243 

244 def delete(self, ip_version, cidr): 

245 self._as_root('del', 

246 cidr, 

247 'dev', 

248 self.name, 

249 options=[ip_version]) 

250 

251 def flush(self): 

252 self._as_root('flush', self.name) 

253 

254 def list(self, scope=None, to=None, filters=None): 

255 if filters is None: 

256 filters = [] 

257 

258 retval = [] 

259 

260 if scope: 

261 filters += ['scope', scope] 

262 if to: 262 ↛ 263line 262 didn't jump to line 263 because the condition on line 262 was never true

263 filters += ['to', to] 

264 

265 for line in self._run('show', self.name, *filters).split('\n'): 

266 line = line.strip() 

267 if not line.startswith('inet'): 

268 continue 

269 parts = line.split() 

270 if parts[0] == 'inet6': 

271 version = 6 

272 scope = parts[3] 

273 broadcast = '::' 

274 else: 

275 version = 4 

276 if parts[2] == 'brd': 

277 broadcast = parts[3] 

278 scope = parts[5] 

279 else: 

280 # sometimes output of 'ip a' might look like: 

281 # inet 192.168.100.100/24 scope global eth0 

282 # and broadcast needs to be calculated from CIDR 

283 broadcast = str(netaddr.IPNetwork(parts[1]).broadcast) 

284 scope = parts[3] 

285 

286 retval.append(dict(cidr=parts[1], 

287 broadcast=broadcast, 

288 scope=scope, 

289 ip_version=version, 

290 dynamic=('dynamic' == parts[-1]))) 

291 return retval 

292 

293 

294class IpRouteCommand(IpDeviceCommandBase): 

295 COMMAND = 'route' 

296 

297 def add_gateway(self, gateway, metric=None): 

298 args = ['replace', 'default', 'via', gateway] 

299 if metric: 299 ↛ 301line 299 didn't jump to line 301 because the condition on line 299 was always true

300 args += ['metric', metric] 

301 args += ['dev', self.name] 

302 self._as_root(*args) 

303 

304 def delete_gateway(self, gateway): 

305 self._as_root('del', 

306 'default', 

307 'via', 

308 gateway, 

309 'dev', 

310 self.name) 

311 

312 def get_gateway(self, scope=None, filters=None): 

313 if filters is None: 313 ↛ 316line 313 didn't jump to line 316 because the condition on line 313 was always true

314 filters = [] 

315 

316 retval = None 

317 

318 if scope: 318 ↛ 319line 318 didn't jump to line 319 because the condition on line 318 was never true

319 filters += ['scope', scope] 

320 

321 route_list_lines = self._run('list', 'dev', self.name, 

322 *filters).split('\n') 

323 default_route_line = next((x.strip() for x in 

324 route_list_lines if 

325 x.strip().startswith('default')), None) 

326 if default_route_line: 

327 gateway_index = 2 

328 parts = default_route_line.split() 

329 retval = dict(gateway=parts[gateway_index]) 

330 metric_index = 4 

331 parts_has_metric = (len(parts) > metric_index) 

332 if parts_has_metric: 

333 retval.update(metric=int(parts[metric_index])) 

334 

335 return retval 

336 

337 def pullup_route(self, interface_name): 

338 """Pullup route entry. 

339 

340 Ensures that the route entry for the interface is before all 

341 others on the same subnet. 

342 """ 

343 device_list = [] 

344 device_route_list_lines = self._run('list', 'proto', 'kernel', 

345 'dev', interface_name).split('\n') 

346 for device_route_line in device_route_list_lines: 

347 try: 

348 subnet = device_route_line.split()[0] 

349 except Exception: 

350 continue 

351 subnet_route_list_lines = self._run( 

352 'list', 'proto', 'kernel', 'exact', subnet).split('\n') 

353 for subnet_route_line in subnet_route_list_lines: 353 ↛ 369line 353 didn't jump to line 369 because the loop on line 353 didn't complete

354 i = iter(subnet_route_line.split()) 

355 while next(i) != 'dev': 

356 pass 

357 device = next(i) 

358 try: 

359 while next(i) != 'src': 

360 pass 

361 src = next(i) 

362 except Exception: 

363 src = '' 

364 if device != interface_name: 

365 device_list.append((device, src)) 

366 else: 

367 break 

368 

369 for (device, src) in device_list: 

370 self._as_root('del', subnet, 'dev', device) 

371 if (src != ''): 371 ↛ 375line 371 didn't jump to line 375 because the condition on line 371 was always true

372 self._as_root('append', subnet, 'proto', 'kernel', 

373 'src', src, 'dev', device) 

374 else: 

375 self._as_root('append', subnet, 'proto', 'kernel', 

376 'dev', device) 

377 

378 def clear_outdated_routes(self, cidr): 

379 """Removes duplicated routes for a certain network CIDR. 

380 

381 Removes all routes related to supplied CIDR except for the one 

382 related to this interface device. 

383 

384 :param cidr: The network CIDR to be cleared. 

385 """ 

386 routes = self.list() 

387 items = [x for x in routes 

388 if x['Destination'] == cidr and x.get('Device') and 

389 x['Device'] != self.name] 

390 for item in items: 

391 self.delete_net_route(item['Destination'], item['Device']) 

392 

393 def list(self): 

394 """List all routes 

395 

396 :return: A dictionary with field 'Destination' and 'Device' for each 

397 route entry. 'Gateway' field is included if route has a gateway. 

398 """ 

399 routes = [] 

400 output = self._as_root('list') 

401 lines = output.split('\n') 

402 for line in lines: 

403 items = line.split() 

404 if len(items) > 0: 

405 item = {'Destination': items[0]} 

406 if len(items) > 1: 406 ↛ 413line 406 didn't jump to line 413 because the condition on line 406 was always true

407 if items[1] == 'via': 

408 item['Gateway'] = items[2] 

409 if len(items) > 3 and items[3] == 'dev': 

410 item['Device'] = items[4] 

411 if items[1] == 'dev': 

412 item['Device'] = items[2] 

413 routes.append(item) 

414 return routes 

415 

416 def delete_net_route(self, cidr, device): 

417 """Deletes a route according to supplied CIDR and interface device. 

418 

419 :param cidr: The network CIDR to be removed. 

420 :param device: The network interface device to be removed. 

421 """ 

422 self._as_root('delete', cidr, 'dev', device) 

423 

424 

425class IpNetnsCommand(IpCommandBase): 

426 COMMAND = 'netns' 

427 

428 def add(self, name): 

429 self._as_root('add', name, use_root_namespace=True) 

430 return IPWrapper(name) 

431 

432 def delete(self, name): 

433 self._as_root('delete', name, use_root_namespace=True) 

434 

435 def execute(self, cmds, addl_env=None, check_exit_code=True): 

436 if addl_env is None: 

437 addl_env = dict() 

438 

439 if not self._parent.namespace: 439 ↛ 440line 439 didn't jump to line 440 because the condition on line 439 was never true

440 raise Exception(_('No namespace defined for parent')) 

441 else: 

442 env_params = [] 

443 if addl_env: 

444 env_params = (['env'] + ['%s=%s' % pair 

445 for pair in sorted(addl_env.items())]) 

446 total_cmd = (['ip', 'netns', 'exec', self._parent.namespace] + 

447 env_params + list(cmds)) 

448 return utils.execute(*total_cmd, run_as_root=True, 

449 check_exit_code=check_exit_code) 

450 

451 def exists(self, name): 

452 output = self._as_root('list', options='o', use_root_namespace=True) 

453 

454 for line in output.split('\n'): 

455 if name == line.strip(): 

456 return True 

457 return False 

458 

459 

460def device_exists(device_name, namespace=None): 

461 try: 

462 address = IPDevice(device_name, namespace).link.address 

463 except Exception as e: 

464 if 'does not exist' in str(e): 464 ↛ 466line 464 didn't jump to line 466 because the condition on line 464 was always true

465 return False 

466 raise 

467 return bool(address) 

468 

469 

470def iproute_arg_supported(command, arg): 

471 command += ['help'] 

472 stdout, stderr = utils.execute(command, check_exit_code=False, 

473 return_stderr=True) 

474 return any(arg in line for line in stderr.split('\n'))