Coverage for manila/api/urlmap.py: 56%

161 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2011 OpenStack LLC. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import re 

17from urllib.request import parse_http_list 

18 

19from oslo_utils import netutils 

20import paste.urlmap 

21 

22from manila.api.openstack import wsgi 

23 

24 

25_quoted_string_re = r'"[^"\\]*(?:\\.[^"\\]*)*"' 

26_option_header_piece_re = re.compile( 

27 r';\s*([^\s;=]+|%s)\s*' 

28 r'(?:=\s*([^;]+|%s))?\s*' % 

29 (_quoted_string_re, _quoted_string_re)) 

30 

31 

def unquote_header_value(value):
    """Strip one pair of surrounding double quotes from a header value.

    Backslash escapes inside the value are deliberately left untouched:
    this mirrors what browsers send rather than what the RFC prescribes.
    Internet Explorer, for example, uploads files with a filename such as
    "C:\\foo\\bar.txt", which RFC-compliant unquoting would corrupt.

    :param value: the header value to unquote.
    :return: the value without its enclosing quotes, or the value
             unchanged when it is empty or not enclosed in quotes.
    """
    if not value:
        return value
    starts_quoted = value[0] == '"'
    ends_quoted = value[-1] == '"'
    return value[1:-1] if starts_quoted and ends_quoted else value

47 

48 

def parse_list_header(value):
    """Split a comma-separated list header into its elements.

    The header is split with :func:`urllib.request.parse_http_list`, so
    an element may be a quoted-string containing a comma, and a
    non-quoted element may have quotes in the middle. Elements that are
    enclosed in double quotes have those quotes removed.

    >>> parse_list_header('token, "quoted value"')
    ['token', 'quoted value']

    :param value: a string with a list header.
    :return: :class:`list`
    """
    def _strip_quotes(element):
        # Only elements that both start and end with a quote are unwrapped.
        if element.startswith('"') and element.endswith('"'):
            return unquote_header_value(element[1:-1])
        return element

    return [_strip_quotes(element) for element in parse_http_list(value)]

71 

72 

def parse_options_header(value):
    """Parse header into content type and options.

    Parse a ``Content-Type`` like header into a tuple with the content
    type and the options:

    >>> parse_options_header('Content-Type: text/html; mimetype=text/html')
    ('Content-Type:', {'mimetype': 'text/html'})

    An empty header, or a header in which no piece can be recognised
    (for example ``';'`` or ``'='``), yields ``('', {})``.

    :param value: the header to parse.
    :return: (str, options)
    """
    def _tokenize(string):
        # Yield (key, value) for every ";key[=value]" piece; value stays
        # None when the piece carries no "=value" part.
        for match in _option_header_piece_re.finditer(string):
            key, value = match.groups()
            key = unquote_header_value(key)
            if value is not None:
                value = unquote_header_value(value)
            yield key, value

    if not value:
        return '', {}

    # Prefixing ';' lets the leading value be tokenized like any option.
    parts = _tokenize(';' + value)

    # The header comes from the client: when nothing matches, fall back to
    # the "empty header" result instead of leaking StopIteration.
    first = next(parts, None)
    if first is None:
        return '', {}

    name = first[0]
    extra = dict(parts)
    return name, extra

100 

101 

class Accept(object):
    """Parsed form of an HTTP ``Accept`` header value.

    The header is split into its list elements and every element is
    parsed into a ``(content_mask, params)`` pair, kept in header order
    in ``self._content_types``.
    """

    def __init__(self, value):
        """Parse the raw header string.

        :param value: the ``Accept`` header value.
        """
        self._content_types = [parse_options_header(v) for v in
                               parse_list_header(value)]

    def best_match(self, supported_content_types):
        """Pick the supported content type preferred by the header.

        Entries whose ``q`` parameter cannot be converted to a float
        (non-numeric, or present without a value) are ignored. Among
        matching entries the highest quality wins; on equal quality an
        entry only replaces the current best when its mask contains
        fewer ``*`` wildcards.

        :param supported_content_types: iterable of content type strings.
        :return: ``(content_type, params)`` for the winning entry, or
                 ``(None, {})`` when nothing matched.
        """
        # FIXME: Should we have a more sophisticated matching algorithm that
        #        takes into account the version as well?
        best_quality = -1
        best_content_type = None
        best_params = {}
        best_match = '*/*'

        for content_type in supported_content_types:
            for content_mask, params in self._content_types:
                try:
                    quality = float(params.get('q', 1))
                except (TypeError, ValueError):
                    # ValueError: q is not a number. TypeError: the header
                    # had a bare "q" so the parsed value is None.
                    continue

                if quality < best_quality:
                    continue
                elif best_quality == quality:
                    # Same quality: keep the current best unless this
                    # mask is strictly more specific.
                    if best_match.count('*') <= content_mask.count('*'):
                        continue

                if self._match_mask(content_mask, content_type):
                    best_quality = quality
                    best_content_type = content_type
                    best_params = params
                    best_match = content_mask

        return best_content_type, best_params

    def content_type_params(self, best_content_type):
        """Find parameters in Accept header for given content type.

        :param best_content_type: content type to look up (exact match
                                  against the header's masks).
        :return: the params dict of the first matching entry, else ``{}``.
        """
        for content_type, params in self._content_types:
            if best_content_type == content_type:
                return params

        return {}

    def _match_mask(self, mask, content_type):
        """Tell whether ``content_type`` is covered by ``mask``.

        :param mask: a mask from the header, e.g. ``application/json``,
                     ``application/*`` or ``*/*``.
        :param content_type: concrete content type to test.
        :return: bool
        """
        if '*' not in mask:
            return content_type == mask
        if mask == '*/*':
            return True
        # Drop the last two characters (the "/*" of a "major/*" mask) and
        # compare against the major part of the content type.
        mask_major = mask[:-2]
        content_type_major = content_type.split('/', 1)[0]
        return content_type_major == mask_major

152 

153 

def urlmap_factory(loader, global_conf, **local_conf):
    """Build a :class:`URLMap` from paste-style configuration.

    The ``not_found_app`` setting is taken from ``local_conf`` when
    present (and removed from it), otherwise from ``global_conf``. Every
    remaining ``local_conf`` item maps a path expression to the name of
    an application, which is loaded through ``loader``.

    :param loader: object providing ``get_app(name, global_conf=...)``.
    :param global_conf: global configuration mapping.
    :param local_conf: path expression -> application name, plus an
                       optional ``not_found_app`` entry.
    :return: the populated :class:`URLMap`.
    """
    try:
        not_found_app = local_conf.pop('not_found_app')
    except KeyError:
        not_found_app = global_conf.get('not_found_app')

    # Only resolve the name when one was actually configured.
    if not_found_app:
        not_found_app = loader.get_app(not_found_app, global_conf=global_conf)

    mapping = URLMap(not_found_app=not_found_app)
    for expression, app_name in local_conf.items():
        mount_point = paste.urlmap.parse_path_expression(expression)
        mapping[mount_point] = loader.get_app(app_name,
                                              global_conf=global_conf)
    return mapping

167 

168 

class URLMap(paste.urlmap.URLMap):
    """``paste.urlmap.URLMap`` variant that also routes on API version.

    The target application is looked up from the URL path prefix first,
    then from a ``version`` parameter of the ``Content-Type`` header and
    finally from a ``version`` parameter of the ``Accept`` header. The
    MIME type selected along the way is stored in
    ``environ['manila.best_content_type']`` before the app is called.
    """

    def _match(self, host, port, path_info):
        """Find longest match for a given URL path.

        Returns the first entry of ``self.applications`` whose domain (if
        it has one) equals ``host`` or ``host:port`` and whose URL equals
        ``path_info`` or is a ``/``-delimited prefix of it.
        NOTE(review): "longest" presumably relies on the ordering of
        ``self.applications`` kept by the paste base class -- confirm.

        :param host: request host name, without port.
        :param port: request port (str or int).
        :param path_info: request path.
        :return: ``(app, app_url)``, or ``(None, None)`` when nothing
                 matches.
        """
        # The port may be an int (netutils.parse_host_port() converts it),
        # so format the string instead of concatenating.
        host_with_port = '%s:%s' % (host, port)
        for (domain, app_url), app in self.applications:
            if domain and domain not in (host, host_with_port):
                continue
            if path_info == app_url or path_info.startswith(app_url + '/'):
                return app, app_url

        return None, None

    def _set_script_name(self, app, app_url):
        """Wrap ``app`` so that ``app_url`` is appended to SCRIPT_NAME."""
        def wrap(environ, start_response):
            environ['SCRIPT_NAME'] += app_url
            return app(environ, start_response)

        return wrap

    def _munge_path(self, app, path_info, app_url):
        """Wrap ``app`` so the ``app_url`` prefix moves to SCRIPT_NAME.

        The wrapper appends ``app_url`` to SCRIPT_NAME and sets PATH_INFO
        to ``path_info`` with the first ``len(app_url)`` characters
        removed.
        """
        def wrap(environ, start_response):
            environ['SCRIPT_NAME'] += app_url
            environ['PATH_INFO'] = path_info[len(app_url):]
            return app(environ, start_response)

        return wrap

    def _app_for_version(self, host, port, params):
        """Return the wrapped app for a ``version`` header parameter.

        :param params: options dict parsed from a header.
        :return: the app matching ``/v<version>`` wrapped with
                 :meth:`_set_script_name`, or None when there is no
                 usable ``version`` parameter or no app matches.
        """
        version = params.get('version')
        # A bare "version" parameter (no "=value") parses to None; treat
        # it like a missing parameter rather than concatenating None.
        if version is None:
            return None
        app, app_url = self._match(host, port, '/v' + version)
        if app:
            app = self._set_script_name(app, app_url)
        return app

    def _path_strategy(self, host, port, path_info):
        """Check path suffix for MIME type and path prefix for API version.

        :return: ``(mime_type, app, app_url)``; each element is None when
                 it could not be determined from the path.
        """
        mime_type = app = app_url = None

        # "/foo.json" -> candidate MIME type "application/json"
        parts = path_info.rsplit('.', 1)
        if len(parts) > 1:
            possible_type = 'application/' + parts[1]
            if possible_type in wsgi.SUPPORTED_CONTENT_TYPES:
                mime_type = possible_type

        parts = path_info.split('/')
        if len(parts) > 1:
            possible_app, possible_app_url = self._match(host, port, path_info)
            # Don't use prefix if it ends up matching default
            if possible_app and possible_app_url:
                app_url = possible_app_url
                app = self._munge_path(possible_app, path_info, app_url)

        return mime_type, app, app_url

    def _content_type_strategy(self, host, port, environ):
        """Check Content-Type header for API version.

        :return: the wrapped app for the header's ``version`` parameter,
                 or None.
        """
        params = parse_options_header(environ.get('CONTENT_TYPE', ''))[1]
        return self._app_for_version(host, port, params)

    def _accept_strategy(self, host, port, environ, supported_content_types):
        """Check Accept header for best matching MIME type and API version.

        :return: ``(mime_type, app)``; either element may be None.
        """
        accept = Accept(environ.get('HTTP_ACCEPT', ''))

        # Find the best match in the Accept header
        mime_type, params = accept.best_match(supported_content_types)
        app = self._app_for_version(host, port, params)

        return mime_type, app

    def __call__(self, environ, start_response):
        """WSGI entry point: choose the application and delegate to it.

        Falls back to ``self.not_found_application`` when no application
        matches the request.
        """
        host = environ.get('HTTP_HOST', environ.get('SERVER_NAME')).lower()
        if environ['wsgi.url_scheme'] == 'http':
            default_port = '80'
        else:
            default_port = '443'
        host, port = netutils.parse_host_port(host, default_port)

        path_info = environ['PATH_INFO']
        path_info = self.normalize_url(path_info, False)[1]

        # The API version is determined in one of three ways:
        # 1) URL path prefix (eg /v1.1/tenant/servers/detail)
        # 2) Content-Type header (eg application/json;version=1.1)
        # 3) Accept header (eg application/json;q=0.8;version=1.1)

        # Manila supports only application/json as MIME type for the responses.
        supported_content_types = list(wsgi.SUPPORTED_CONTENT_TYPES)

        mime_type, app, app_url = self._path_strategy(host, port, path_info)

        if not app:
            app = self._content_type_strategy(host, port, environ)

        if not mime_type or not app:
            possible_mime_type, possible_app = self._accept_strategy(
                host, port, environ, supported_content_types)
            if possible_mime_type and not mime_type:
                mime_type = possible_mime_type
            if possible_app and not app:
                app = possible_app

        if not mime_type:
            mime_type = 'application/json'

        if not app:
            # Didn't match a particular version, probably matches default
            app, app_url = self._match(host, port, path_info)
            if app:
                app = self._munge_path(app, path_info, app_url)

        if app:
            environ['manila.best_content_type'] = mime_type
            return app(environ, start_response)

        environ['paste.urlmap_object'] = self
        return self.not_found_application(environ, start_response)