Coverage for manila/share/drivers/vastdata/driver_util.py: 98%

106 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2024 VAST Data Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15import ipaddress 

16import types 

17 

18from oslo_config import cfg 

19from oslo_log import log 

20from oslo_utils import timeutils 

21 

22 

23CONF = cfg.CONF 

24LOG = log.getLogger(__name__) 

25 

26 

class Bunch(dict):
    """A ``dict`` whose keys can also be read, set and deleted as attributes.

    Adapted from https://github.com/real-easypy/easypy

    Lookup conveniences on top of plain ``dict`` behaviour:

    * ``bunch[1]`` falls back to ``bunch["1"]`` when the integral key
      itself is absent.
    * ``bunch._1`` falls back to ``bunch["1"]``, because a digits-only key
      is not a valid Python identifier.
    """

    # The only real instance attribute. ``__repr__`` sets it while rendering
    # and removes it afterwards; a nested ``repr`` of the same object sees
    # the flag and prints key names only instead of recursing.
    __slots__ = ("__stop_recursing__",)

    def __getattr__(self, name):
        """Resolve an attribute that normal lookup did not find as a key.

        :raises AttributeError: if neither ``name`` nor its ``_<digits>``
            alias is a key. ``KeyError``/``IndexError`` never escape, so
            ``hasattr()`` and ``getattr(obj, name, default)`` behave.
        """
        try:
            return self[name]
        except KeyError:
            pass
        # Slice instead of indexing so an empty name cannot raise IndexError.
        if name[:1] == "_" and name[1:].isdigit():
            try:
                return self[name[1:]]
            except KeyError:
                pass
        raise AttributeError(
            "%s has no attribute %r" % (self.__class__, name)
        )

    def __getitem__(self, key):
        """Return ``self[key]``; integral keys fall back to ``str(key)``.

        :raises KeyError: if no matching key exists.
        """
        try:
            return super().__getitem__(key)
        except KeyError:
            from numbers import Integral

            if isinstance(key, Integral):
                return self[str(key)]
            raise

    def __setattr__(self, name, value):
        """Store ``value`` under the key ``name``."""
        self[name] = value

    def __delattr__(self, name):
        """Delete the key ``name``.

        :raises AttributeError: if the key does not exist.
        """
        try:
            del self[name]
        except KeyError:
            raise AttributeError(
                "%s has no attribute %r" % (self.__class__, name)
            )

    def __getstate__(self):
        """Return the object itself as its pickle state."""
        return self

    def __setstate__(self, dict):
        """Restore state by merging the given mapping into this bunch."""
        # The parameter name shadows the builtin; it is kept unchanged so
        # the method signature stays exactly as callers know it.
        self.update(dict)

    def __repr__(self):
        """Return ``ClassName(key=value, ...)`` with keys sorted.

        While a ``repr`` of this object is already in progress, only the
        key names are printed.
        """
        if getattr(self, "__stop_recursing__", False):
            items = sorted(
                "%s" % k for k in self
                if isinstance(k, str) and not k.startswith("__")
            )
            attrs = ", ".join(items)
        else:
            # Bypass our own __setattr__/__delattr__ so the flag lands in
            # the slot instead of becoming a dictionary key.
            dict.__setattr__(self, "__stop_recursing__", True)
            try:
                attrs = self.render()
            finally:
                dict.__delattr__(self, "__stop_recursing__")
        return "%s(%s)" % (self.__class__.__name__, attrs)

    def render(self):
        """Return ``"k=v, ..."`` for string keys not starting with ``__``."""
        items = sorted(
            "%s=%r" % (k, v)
            for k, v in self.items()
            if isinstance(k, str) and not k.startswith("__")
        )
        return ", ".join(items)

    def to_dict(self):
        """Return a copy with nested bunches converted to plain dicts."""
        return unbunchify(self)

    def to_json(self):
        """Return the JSON serialization of ``to_dict()``."""
        import json

        return json.dumps(self.to_dict())

    def copy(self, deep=False):
        """Return a copy of this bunch.

        :param deep: when true, nested dicts and the tuples, lists and
            sets holding them are converted recursively; otherwise the
            copy is shallow.
        """
        if deep:
            return _convert(self, self.__class__)
        else:
            return self.__class__(self)

    @classmethod
    def from_dict(cls, d):
        """Build an instance of ``cls`` from ``d``, converting nested dicts."""
        return _convert(d, cls)

    @classmethod
    def from_json(cls, d):
        """Build an instance of ``cls`` from the JSON document ``d``."""
        import json

        return cls.from_dict(json.loads(d))

    def __dir__(self):
        """Return the regular ``dict`` members plus attribute-like keys."""
        members = {
            k
            for k in self
            if isinstance(k, str)
            # startswith() rather than k[0]: an empty-string key must not
            # raise IndexError.
            and (k.startswith("_") or k.replace("_", "").isalnum())
        }
        members.update(dict.__dir__(self))
        return sorted(members)

    def without(self, *keys):
        """Return a shallow copy of the bunch without the specified keys."""
        return Bunch((k, v) for k, v in self.items() if k not in keys)

    def but_with(self, **kw):
        """Return a shallow copy of the bunch with the specified keys."""
        return Bunch(self, **kw)

133 

134 

135def _convert(d, typ): 

136 if isinstance(d, dict): 

137 return typ({str(k): _convert(v, typ) for k, v in d.items()}) 

138 elif isinstance(d, (tuple, list, set)): 

139 return type(d)(_convert(e, typ) for e in d) 

140 else: 

141 return d 

142 

143 

def unbunchify(d):
    """Recursively convert every Bunch found in `d` into a regular dict."""
    plain = _convert(d, dict)
    return plain

147 

148 

def bunchify(d=None, **kw):
    """Recursively convert the dicts in `d` to Bunches.

    When `d` is None an empty Bunch is created. When keyword arguments
    are given, they are converted the same way and merged into the result
    with ``update``.
    """
    if d is None:
        result = Bunch()
    else:
        result = _convert(d, Bunch)

    if kw:
        extras = bunchify(kw)
        result.update(extras)
    return result

161 

162 

def generate_ip_range(ip_ranges):
    """Expand inclusive IP ranges into a flat list of address strings.

    `ip_ranges` is an iterable of two-item ranges in which the first item
    is the start IP and the second item is the end IP, e.g.:
    [["15.0.0.1", "15.0.0.4"], ["10.0.0.27", "10.0.0.30"]]

    Addresses are returned in their compressed form, in the order the
    ranges were given.
    """
    addresses = []
    for start_ip, end_ip in ip_ranges:
        first = ipaddress.ip_address(start_ip)
        last = ipaddress.ip_address(end_ip)
        # The range is summarized into networks; walking each network
        # yields every address it contains.
        for network in ipaddress.summarize_address_range(first, last):
            addresses.extend(address.compressed for address in network)
    return addresses

179 

180 

def decorate_methods_with(dec):
    """Return a class decorator that applies `dec` to public functions.

    When ``CONF.debug`` is falsy the returned decorator hands the class
    back untouched. Otherwise every plain function found in the class's
    own ``__dict__`` whose name does not start with an underscore is
    replaced by ``dec(function)``.

    NOTE(review): ``CONF.debug`` is read when this function is called,
    not when the decorated class is used - confirm the configuration is
    already loaded at that point.
    """
    def passthrough(cls):
        return cls

    def decorate_public_functions(cls):
        # Snapshot the members so attributes are not rebound while the
        # class namespace is being walked.
        for name, member in tuple(vars(cls).items()):
            if not isinstance(member, types.FunctionType):
                continue
            if name.startswith("_"):
                continue
            setattr(cls, name, dec(member))
        return cls

    return decorate_public_functions if CONF.debug else passthrough

193 

194 

def verbose_driver_trace(fn):
    """Wrap a method so its entry, duration and result are debug-logged.

    When ``CONF.debug`` is falsy, `fn` is returned unchanged. Otherwise
    the returned wrapper logs a ``>>>`` line before the call and, once
    the call returns, the elapsed seconds and the return value. If `fn`
    raises, the exception propagates and no closing line is logged.

    :param fn: function taking ``self`` as its first argument.
    :returns: `fn` itself or a wrapper with the same metadata.
    """
    if not CONF.debug:
        return fn

    # Imported locally so the block is self-sufficient.
    import functools

    # Keep __name__, __doc__ and friends of the traced function so that
    # introspection and tracebacks do not show every method as "inner".
    @functools.wraps(fn)
    def inner(self, *args, **kwargs):
        start = timeutils.utcnow()
        LOG.debug("[%s] >>>", fn.__name__)
        res = fn(self, *args, **kwargs)
        end = timeutils.utcnow()
        # Lazy %-style arguments: interpolation is left to the logger.
        LOG.debug(
            "Spent %s sec. Return %s.\n<<< [%s]",
            timeutils.delta_seconds(start, end),
            res,
            fn.__name__,
        )
        return res

    return inner