Coverage for manila/share/drivers/vastdata/rest.py: 98%

173 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2024 VAST Data Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15from abc import ABC 

16import json 

17import pprint 

18import textwrap 

19 

20import cachetools 

21from oslo_log import log as logging 

22from oslo_utils import versionutils 

23from packaging import version as packaging_version 

24import requests 

25 

26from manila import exception 

27from manila.share.drivers.vastdata import driver_util 

28import manila.utils as manila_utils 

29 

# Module-level logger used by the session and resource classes below.
LOG = logging.getLogger(__name__)

31 

32 

class Session(requests.Session):
    """HTTP session for the VMS REST API rooted at ``https://<host>/api``.

    Two authentication modes are supported:

    * API token: sent unchanged in an ``Api-Token`` authorization header.
    * Username/password: a bearer token is requested from the ``token/``
      endpoint and refreshed when a request is answered with HTTP 403.
    """

    def __init__(
        self,
        host,
        username,
        password,
        api_token,
        ssl_verify,
        plugin_version,
    ):
        """Store credentials and set the default request headers.

        :param host: VMS host; leading/trailing slashes are stripped.
        :param username: user name for bearer-token authentication.
        :param password: password for bearer-token authentication.
        :param api_token: API token; when truthy it is used instead of
            username/password.
        :param ssl_verify: passed as ``verify`` to every request.
        :param plugin_version: version string put into the User-Agent.
        """
        super().__init__()
        self.base_url = f"https://{host.strip('/')}/api"
        self.ssl_verify = ssl_verify
        self.username = username
        self.password = password
        self.token = api_token
        self.headers["Accept"] = "application/json"
        self.headers["Content-Type"] = "application/json"
        self.headers["User-Agent"] = (
            f"manila/v{plugin_version}"
            f" ({requests.utils.default_user_agent()})"
        )
        if self.token:
            LOG.info("VMS session is using API token authentication.")
            self.headers["authorization"] = f"Api-Token {self.token}"
        else:
            # Will be updated on the first request
            LOG.info(
                "VMS session is using username/password authentication"
                " (Bearer token will be acquired)."
            )
            self.headers["authorization"] = "Bearer"

        if not ssl_verify:
            import urllib3

            urllib3.disable_warnings()

    def refresh_auth_token(self):
        """Request a bearer token and store it in the session headers.

        Posts the stored username/password to ``token/`` and replaces the
        ``authorization`` header with ``Bearer <access token>``.

        :raises VastApiException: when the endpoint cannot be reached.
        """
        try:
            resp = super().request(
                "POST",
                f"{self.base_url}/token/",
                verify=self.ssl_verify,
                timeout=5,
                json={"username": self.username, "password": self.password},
            )
            resp.raise_for_status()
            token = resp.json()["access"]
            self.headers["authorization"] = f"Bearer {token}"
        # requests raises its own ConnectionError, which does not derive
        # from the builtin one; list both so an unreachable host is
        # actually translated into the driver exception below.
        except (ConnectionError, requests.exceptions.ConnectionError) as e:
            raise exception.VastApiException(
                reason=f"The vms on the designated host {self.base_url} "
                f"cannot be accessed. Please verify the specified endpoint. "
                f"origin error: {e}"
            ) from e

    @manila_utils.retry(retry_param=exception.VastApiRetry, retries=3)
    def request(
        self, verb, api_method, params=None, log_result=True, **kwargs
    ):
        """Send a request to ``<base_url>/<api_method>/`` and decode it.

        :param verb: HTTP method name; upper-cased before use.
        :param api_method: endpoint path relative to the base URL.
        :param params: query parameters.
        :param log_result: when true, request and response payloads are
            written to the debug log.
        :param kwargs: forwarded to ``requests.Session.request``; a
            ``data`` entry is serialized to JSON first.
        :returns: the decoded JSON body (``{}`` when the body is empty)
            converted with ``driver_util.Bunch.from_dict``.
        :raises VastApiRetry: on HTTP 403 in username/password mode,
            after the bearer token has been refreshed.
        :raises VastApiException: on HTTP 400/503 with a response body,
            or on any other error status.
        """
        verb = verb.upper()
        api_method = api_method.strip("/")
        url = f"{self.base_url}/{api_method}/"
        log_pref = f"\n[{verb}] {url}"

        if "data" in kwargs:
            kwargs["data"] = json.dumps(kwargs["data"])

        if log_result and (params or kwargs):
            payload = dict(kwargs, params=params)
            formatted_request = textwrap.indent(
                pprint.pformat(payload), prefix="|  "
            )
            LOG.debug(f"{log_pref} >>>:\n{formatted_request}")
        else:
            LOG.debug(f"{log_pref} >>> (request)")

        ret = super().request(
            verb, url, verify=self.ssl_verify, params=params, **kwargs
        )
        # No refresh for token based auth. Token should be long-lived.
        if ret.status_code == 403 and not self.token:
            self.refresh_auth_token()
            raise exception.VastApiRetry(reason="Token is invalid or expired.")

        # For these statuses the response body is used as the error reason.
        if ret.status_code in (400, 503) and ret.text:
            raise exception.VastApiException(reason=ret.text)

        try:
            ret.raise_for_status()
        except Exception as exc:
            raise exception.VastApiException(reason=str(exc))

        ret = ret.json() if ret.content else {}
        if ret and log_result:
            formatted_response = textwrap.indent(
                pprint.pformat(ret), prefix="|  "
            )
            LOG.debug(f"{log_pref} <<<:\n{formatted_response}")
        else:
            LOG.debug(f"{log_pref} <<< (response)")
        return driver_util.Bunch.from_dict(ret)

    def __getattr__(self, attr):
        """Build a GET helper for an endpoint named after ``attr``.

        Only called for attributes that are not found normally. Names
        starting with an underscore raise ``AttributeError``; any other
        name yields ``func(**params)`` which issues
        ``request("get", attr, params=params)``. The helper is stored on
        the instance so later lookups bypass this method.
        """
        if attr.startswith("_"):
            raise AttributeError(attr)

        def func(**params):
            return self.request("get", attr, params=params)

        func.__name__ = attr
        setattr(self, attr, func)
        return func

148 

149 

def requisite(semver: str, operation: str = None):
    """Guard a method behind a minimum cluster software version.

    The decorated method must live on an object exposing
    ``self.rest.get_sw_version()``. On every call the reported version
    (with ``-`` replaced by ``.``) is parsed and reduced to
    ``major.minor.micro``, then checked against ``semver`` with
    ``versionutils.is_compatible(..., same_major=False)``. When the check
    fails the decorated method is not executed.

    :param semver: minimum required cluster version, e.g. ``"4.7.0"``.
    :param operation: name used in the error message; defaults to the
        decorated function's ``__name__``.
    :raises VastDriverException: when the cluster version does not
        satisfy ``semver``.
    """
    import functools

    def dec(fn):

        # Keep the wrapped method's name and docstring on the wrapper.
        @functools.wraps(fn)
        def _args_wrapper(self, *args, **kwargs):

            version = packaging_version.parse(
                self.rest.get_sw_version().replace("-", ".")
            )
            sw_version = f"{version.major}.{version.minor}.{version.micro}"

            if not versionutils.is_compatible(
                semver, sw_version, same_major=False
            ):
                op = operation or fn.__name__
                raise exception.VastDriverException(
                    f"Operation {op} is not supported"
                    f" on VAST version {sw_version}."
                    f" Required version is {semver}"
                )
            return fn(self, *args, **kwargs)

        return _args_wrapper

    return dec

184 

185 

class VastResource(ABC):
    """Common CRUD helpers for one REST collection.

    Subclasses set ``resource_name`` to the collection's endpoint path.
    """

    resource_name = None

    def __init__(self, rest):
        self.rest = rest  # For intercommunication between resources.
        self.session = rest.session

    def list(self, **params):
        """Return the collection's entries, filtered by ``params``."""
        return self.session.get(self.resource_name, params=params)

    def create(self, **params):
        """Post ``params`` to the collection to create an entry."""
        return self.session.post(self.resource_name, data=params)

    def update(self, entry_id, **params):
        """Patch the entry identified by ``entry_id`` with ``params``."""
        endpoint = f"{self.resource_name}/{entry_id}"
        return self.session.patch(endpoint, data=params)

    def delete(self, name):
        """Delete the entry called ``name``; warn and skip if absent."""
        found = self.one(name)
        if found:
            return self.session.delete(f"{self.resource_name}/{found.id}")
        kind = self.__class__.__name__.lower()
        LOG.warning(f"{kind} {name} not found on VAST, skipping delete")
        return None

    def one(self, name):
        """Look up a single entry by name.

        Returns ``None`` when nothing matches and raises
        ``VastDriverException`` when more than one entry matches.
        """
        matches = self.list(name=name)
        if not matches:
            return None
        if len(matches) > 1:
            plural = f"{self.__class__.__name__.lower()}s"
            raise exception.VastDriverException(
                reason=f"Too many {plural} found with name {name}"
            )
        return matches[0]

    def ensure(self, name, **params):
        """Return the entry called ``name``, creating it when missing."""
        return self.one(name) or self.create(name=name, **params)

238 

239 

class View(VastResource):
    """Resource wrapper for the ``views`` collection."""

    resource_name = "views"

    def create(self, name, path, policy_id):
        """Create an NFS view at ``path`` bound to ``policy_id``.

        The request always sets ``create_dir=True`` and
        ``protocols=["NFS"]``.
        """
        return super().create(
            name=name,
            path=path,
            policy_id=policy_id,
            create_dir=True,
            protocols=["NFS"],
        )

252 

253 

class ViewPolicy(VastResource):
    """Resource wrapper for the ``viewpolicies`` collection."""

    resource_name = "viewpolicies"

256 

257 

class CapacityMetrics(VastResource):
    """Reads metrics through the ``monitors/ad_hoc_query`` endpoint."""

    def get(self, metrics, object_type="cluster", time_frame="1m"):
        """Return the most recent sample of the requested metrics.

        Each returned key is the part of the reported property name that
        follows its first comma; values come from the last row of the
        response's ``data``.
        """
        query = {
            "prop_list": metrics,
            "object_type": object_type,
            "time_frame": time_frame,
        }
        ret = self.session.get("monitors/ad_hoc_query", params=query)
        latest = ret.data[-1]
        sample = {}
        for prop, value in zip(ret.prop_list, latest):
            sample[prop.partition(",")[-1]] = value
        return driver_util.Bunch(sample)

274 

275 

class Quota(VastResource):
    """Resource wrapper for the ``quotas`` collection."""

    resource_name = "quotas"

278 

279 

class VipPool(VastResource):
    """Resource wrapper for the ``vippools`` collection."""

    resource_name = "vippools"

    def vips(self, pool_name):
        """Return the IP addresses of the vip pool called ``pool_name``.

        Raises ``VastDriverException`` when the pool does not exist or
        when its ip ranges produce no addresses.
        """
        pool = self.one(name=pool_name)
        if not pool:
            raise exception.VastDriverException(
                reason=f"No vip pool found with name {pool_name}"
            )
        addresses = driver_util.generate_ip_range(pool.ip_ranges)
        if addresses:
            return addresses
        raise exception.VastDriverException(
            reason=f"Pool {pool_name} has no available vips"
        )

296 

297 

class Snapshots(VastResource):
    """Resource wrapper for the ``snapshots`` collection."""

    resource_name = "snapshots"

300 

301 

class Folders(VastResource):
    """Resource wrapper for the ``folders`` collection."""

    resource_name = "folders"

    @requisite(semver="4.7.0")
    def delete(self, path):
        """Delete the remote folder at ``path``.

        An API error mentioning "no such directory" is only logged; one
        mentioning "trash folder disabled" is converted into a
        ``VastDriverException``; any other API error propagates as-is.
        """
        endpoint = f"{self.resource_name}/delete_folder/"
        try:
            self.session.delete(endpoint, data={"path": path})
        except exception.VastApiException as err:
            details = str(err)
            if "no such directory" in details:
                LOG.debug(
                    f"remote directory might have been removed earlier."
                    f" ({err})"
                )
                return
            if "trash folder disabled" not in details:
                # unpredictable error
                raise
            raise exception.VastDriverException(
                reason="Trash Folder Access is disabled"
                " (see Settings/Cluster/Features in VMS)"
            )

324 

325 

class RestApi:
    """Entry point bundling the VMS session and its resource helpers."""

    def __init__(
        self,
        host,
        username,
        password,
        api_token,
        ssl_verify,
        plugin_version,
    ):
        """Create the session and one helper object per resource type.

        All arguments are forwarded unchanged to ``Session``.
        """
        self.session = Session(
            host=host,
            username=username,
            password=password,
            api_token=api_token,
            ssl_verify=ssl_verify,
            plugin_version=plugin_version,
        )
        self.views = View(self)
        self.view_policies = ViewPolicy(self)
        self.capacity_metrics = CapacityMetrics(self)
        self.quotas = Quota(self)
        self.vip_pools = VipPool(self)
        self.snapshots = Snapshots(self)
        self.folders = Folders(self)

        # Refresh auth token to avoid initial "forbidden" status error.
        # Skipped for API-token auth: the refresh posts username/password
        # and would replace the "Api-Token" header with a "Bearer" one.
        if not api_token:
            self.session.refresh_auth_token()

    # NOTE(review): the TTLCache is created once when the class body is
    # evaluated, so it is shared by every RestApi instance; with maxsize=1
    # it holds a single entry at a time for up to one hour.
    @cachetools.cached(cache=cachetools.TTLCache(ttl=60 * 60, maxsize=1))
    def get_sw_version(self):
        """Software version of cluster Rest API interacts with"""
        return self.session.versions(status="success")[0].sys_version