Coverage for manila/share/drivers/vastdata/driver_util.py: 98%
106 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2024 VAST Data Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15import ipaddress
16import types
18from oslo_config import cfg
19from oslo_log import log
20from oslo_utils import timeutils
23CONF = cfg.CONF
24LOG = log.getLogger(__name__)
class Bunch(dict):
    """A dict whose entries can also be read, set and deleted as attributes.

    Adapted from https://github.com/real-easypy/easypy

    Lookup conveniences on top of a plain dict:

    * ``bunch.name`` reads ``bunch["name"]``;
    * ``bunch._7`` falls back to ``bunch["7"]``;
    * ``bunch[7]`` falls back to ``bunch["7"]`` when the integer key
      itself is not present.
    """

    # The only real instance attribute: a marker that is set while
    # ``__repr__`` is rendering, so a self-referencing bunch does not
    # recurse forever.
    __slots__ = ("__stop_recursing__",)

    def __getattr__(self, name):
        """Resolve a missing attribute from the mapping.

        Always raises AttributeError (never KeyError or IndexError) when
        the name cannot be resolved, so ``hasattr()`` and ``getattr()``
        with a default behave as they do for ordinary objects.
        """
        try:
            return self[name]
        except KeyError:
            pass
        # ``_<digits>`` is an attribute-friendly spelling of a numeric key.
        # Slicing (instead of ``name[0]``) keeps an empty name from raising
        # IndexError.
        if name[:1] == "_" and name[1:].isdigit():
            try:
                return self[name[1:]]
            except KeyError:
                pass
        raise AttributeError(
            "%s has no attribute %r" % (self.__class__, name)
        )

    def __getitem__(self, key):
        """Return the item for `key`, retrying integral keys as strings."""
        try:
            return super(Bunch, self).__getitem__(key)
        except KeyError:
            from numbers import Integral

            if isinstance(key, Integral):
                return self[str(key)]
            raise

    def __setattr__(self, name, value):
        """Store attribute assignments as mapping items."""
        self[name] = value

    def __delattr__(self, name):
        """Delete the mapping item `name`; AttributeError if it is absent."""
        try:
            del self[name]
        except KeyError:
            raise AttributeError(
                "%s has no attribute %r" % (self.__class__, name)
            )

    def __getstate__(self):
        """Return the bunch itself as its pickled state."""
        return self

    def __setstate__(self, dict):
        """Merge the given state mapping into this bunch."""
        # The parameter name shadows the builtin; it is kept unchanged so
        # the method signature stays the same.
        self.update(dict)

    def __repr__(self):
        """Return ``ClassName(key=value, ...)`` with sorted string keys.

        While a bunch is already being rendered (nested reference to
        itself), only its key names are listed.
        """
        if getattr(self, "__stop_recursing__", False):
            items = sorted(
                "%s" % k for k in self
                if isinstance(k, str) and not k.startswith("__")
            )
            attrs = ", ".join(items)
        else:
            # Bypass our own __setattr__/__delattr__ so the marker lands in
            # the slot rather than in the mapping.
            dict.__setattr__(self, "__stop_recursing__", True)
            try:
                attrs = self.render()
            finally:
                dict.__delattr__(self, "__stop_recursing__")
        return "%s(%s)" % (self.__class__.__name__, attrs)

    def render(self):
        """Return ``key=repr(value)`` pairs, sorted and comma separated.

        Non-string keys and keys starting with ``__`` are left out.
        """
        items = sorted(
            "%s=%r" % (k, v)
            for k, v in self.items()
            if isinstance(k, str) and not k.startswith("__")
        )
        return ", ".join(items)

    def to_dict(self):
        """Return the result of ``unbunchify`` applied to this bunch."""
        return unbunchify(self)

    def to_json(self):
        """Return ``to_dict()`` serialized as a JSON string."""
        import json

        return json.dumps(self.to_dict())

    def copy(self, deep=False):
        """Return a copy of the bunch.

        With ``deep=True`` the copy is built with ``_convert``; otherwise
        a new instance of the same class is built from this bunch.
        """
        if deep:
            return _convert(self, self.__class__)
        else:
            return self.__class__(self)

    @classmethod
    def from_dict(cls, d):
        """Build an instance of `cls` from `d` using ``_convert``."""
        return _convert(d, cls)

    @classmethod
    def from_json(cls, d):
        """Parse the JSON string `d` and pass the result to ``from_dict``."""
        import json

        return cls.from_dict(json.loads(d))

    def __dir__(self):
        """List regular attributes plus the keys usable as attribute names."""
        members = set(
            k
            for k in self
            if isinstance(k, str)
            # startswith() instead of k[0]: an empty-string key must be
            # skipped, not raise IndexError.
            and (k.startswith("_") or k.replace("_", "").isalnum())
        )
        members.update(dict.__dir__(self))
        return sorted(members)

    def without(self, *keys):
        """Return a shallow copy of the bunch without the specified keys."""
        return Bunch((k, v) for k, v in self.items() if k not in keys)

    def but_with(self, **kw):
        """Return a shallow copy of the bunch with the specified keys."""
        return Bunch(self, **kw)
def _convert(d, typ):
    """Recursively rebuild `d`, turning every dict into an instance of `typ`.

    * dicts (including subclasses) become ``typ`` instances whose keys are
      converted with ``str()`` and whose values are converted recursively;
    * tuples, lists and sets are rebuilt as the same type with converted
      elements (namedtuples keep their type and fields);
    * anything else is returned unchanged.
    """
    if isinstance(d, dict):
        return typ({str(k): _convert(v, typ) for k, v in d.items()})
    if isinstance(d, tuple) and hasattr(d, "_fields"):
        # namedtuple constructors take one positional argument per field,
        # so the converted elements must be unpacked rather than passed as
        # a single iterable.
        return type(d)(*(_convert(e, typ) for e in d))
    if isinstance(d, (tuple, list, set)):
        return type(d)(_convert(e, typ) for e in d)
    return d
def unbunchify(d):
    """Recursively convert the Bunches in `d` to regular dicts."""
    as_plain_dicts = _convert(d, dict)
    return as_plain_dicts
def bunchify(d=None, **kw):
    """Recursively convert the dicts in `d` to Bunches.

    Keyword arguments, when given, are converted the same way and merged
    into the result with ``update``. Passing ``d=None`` starts from an
    empty Bunch.
    """
    if d is None:
        result = Bunch()
    else:
        result = _convert(d, Bunch)

    if kw:
        # Same outcome as bunchify(kw): kw is never None here and there
        # are no further keywords to merge.
        result.update(_convert(kw, Bunch))
    return result
def generate_ip_range(ip_ranges):
    """Expand inclusive IP ranges into a flat list of address strings.

    `ip_ranges` is an iterable of ``(start_ip, end_ip)`` pairs, where the
    first item of each pair is the start ip and the second is the end ip,
    eg: [["15.0.0.1", "15.0.0.4"], ["10.0.0.27", "10.0.0.30"]]

    Addresses are returned in their compressed form, in the order the
    ranges were given.
    """
    addresses = []
    for start_ip, end_ip in ip_ranges:
        first = ipaddress.ip_address(start_ip)
        last = ipaddress.ip_address(end_ip)
        # The range is covered by a sequence of networks; every address of
        # every such network belongs to the range.
        for network in ipaddress.summarize_address_range(first, last):
            for address in network:
                addresses.append(address.compressed)
    return addresses
def decorate_methods_with(dec):
    """Build a class decorator that applies `dec` to public methods.

    ``CONF.debug`` is read once, when this factory is called. If it is
    false, the returned class decorator hands the class back untouched.
    Otherwise every plain function found directly in the class ``__dict__``
    whose name does not start with an underscore is replaced by
    ``dec(function)``.
    """
    def _leave_untouched(cls):
        return cls

    def _wrap_public_functions(cls):
        for member_name, member in cls.__dict__.items():
            if not isinstance(member, types.FunctionType):
                continue
            if member_name.startswith("_"):
                continue
            setattr(cls, member_name, dec(member))
        return cls

    return _wrap_public_functions if CONF.debug else _leave_untouched
def verbose_driver_trace(fn):
    """Wrap a method with entry/exit debug logging and call timing.

    ``CONF.debug`` is read when the decorator is applied: if it is false,
    `fn` is returned unchanged. Otherwise the returned wrapper logs a
    ``>>>`` line before the call and, after a successful call, the elapsed
    seconds and the return value. The wrapper returns whatever `fn`
    returns; an exception raised by `fn` propagates and no exit line is
    logged.
    """
    if not CONF.debug:
        return fn

    # Local import keeps this block self-contained.
    import functools

    # functools.wraps keeps fn's __name__/__doc__ on the wrapper, so traced
    # methods still identify themselves correctly.
    @functools.wraps(fn)
    def inner(self, *args, **kwargs):
        start = timeutils.utcnow()
        # Lazy %-style arguments: the message is only formatted if the
        # record is actually emitted.
        LOG.debug("[%s] >>>", fn.__name__)
        res = fn(self, *args, **kwargs)
        end = timeutils.utcnow()
        LOG.debug(
            "Spent %s sec. Return %s.\n<<< [%s]",
            timeutils.delta_seconds(start, end),
            res,
            fn.__name__,
        )
        return res

    return inner