Coverage for manila/api/urlmap.py: 56%
161 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2011 OpenStack LLC.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import re
17from urllib.request import parse_http_list
19from oslo_utils import netutils
20import paste.urlmap
22from manila.api.openstack import wsgi
25_quoted_string_re = r'"[^"\\]*(?:\\.[^"\\]*)*"'
26_option_header_piece_re = re.compile(
27 r';\s*([^\s;=]+|%s)\s*'
28 r'(?:=\s*([^;]+|%s))?\s*' %
29 (_quoted_string_re, _quoted_string_re))
32def unquote_header_value(value):
33 """Unquotes a header value.
35 This does not use the real unquoting but what browsers are actually
36 using for quoting.
38 :param value: the header value to unquote.
39 """
40 if value and value[0] == value[-1] == '"': 40 ↛ 45line 40 didn't jump to line 45 because the condition on line 40 was never true
41 # this is not the real unquoting, but fixing this so that the
42 # RFC is met will result in bugs with internet explorer and
43 # probably some other browsers as well. IE for example is
44 # uploading files with "C:\foo\bar.txt" as filename
45 value = value[1:-1]
46 return value
49def parse_list_header(value):
50 """Parse lists as described by RFC 2068 Section 2.
52 In particular, parse comma-separated lists where the elements of
53 the list may include quoted-strings. A quoted-string could
54 contain a comma. A non-quoted string could have quotes in the
55 middle. Quotes are removed automatically after parsing.
57 The return value is a standard :class:`list`:
59 >>> parse_list_header('token, "quoted value"')
60 ['token', 'quoted value']
62 :param value: a string with a list header.
63 :return: :class:`list`
64 """
65 result = []
66 for item in parse_http_list(value): 66 ↛ 67line 66 didn't jump to line 67 because the loop on line 66 never started
67 if item[:1] == item[-1:] == '"':
68 item = unquote_header_value(item[1:-1])
69 result.append(item)
70 return result
73def parse_options_header(value):
74 """Parse header into content type and options.
76 Parse a ``Content-Type`` like header into a tuple with the content
77 type and the options:
79 >>> parse_options_header('Content-Type: text/html; mimetype=text/html')
80 ('Content-Type:', {'mimetype': 'text/html'})
82 :param value: the header to parse.
83 :return: (str, options)
84 """
85 def _tokenize(string):
86 for match in _option_header_piece_re.finditer(string):
87 key, value = match.groups()
88 key = unquote_header_value(key)
89 if value is not None: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true
90 value = unquote_header_value(value)
91 yield key, value
93 if not value: 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true
94 return '', {}
96 parts = _tokenize(';' + value)
97 name = next(parts)[0]
98 extra = dict(parts)
99 return name, extra
102class Accept(object):
103 def __init__(self, value):
104 self._content_types = [parse_options_header(v) for v in
105 parse_list_header(value)]
107 def best_match(self, supported_content_types):
108 # FIXME: Should we have a more sophisticated matching algorithm that
109 # takes into account the version as well?
110 best_quality = -1
111 best_content_type = None
112 best_params = {}
113 best_match = '*/*'
115 for content_type in supported_content_types:
116 for content_mask, params in self._content_types: 116 ↛ 117line 116 didn't jump to line 117 because the loop on line 116 never started
117 try:
118 quality = float(params.get('q', 1))
119 except ValueError:
120 continue
122 if quality < best_quality:
123 continue
124 elif best_quality == quality:
125 if best_match.count('*') <= content_mask.count('*'):
126 continue
128 if self._match_mask(content_mask, content_type):
129 best_quality = quality
130 best_content_type = content_type
131 best_params = params
132 best_match = content_mask
134 return best_content_type, best_params
136 def content_type_params(self, best_content_type):
137 """Find parameters in Accept header for given content type."""
138 for content_type, params in self._content_types:
139 if best_content_type == content_type:
140 return params
142 return {}
144 def _match_mask(self, mask, content_type):
145 if '*' not in mask:
146 return content_type == mask
147 if mask == '*/*':
148 return True
149 mask_major = mask[:-2]
150 content_type_major = content_type.split('/', 1)[0]
151 return content_type_major == mask_major
154def urlmap_factory(loader, global_conf, **local_conf):
155 if 'not_found_app' in local_conf:
156 not_found_app = local_conf.pop('not_found_app')
157 else:
158 not_found_app = global_conf.get('not_found_app')
159 if not_found_app:
160 not_found_app = loader.get_app(not_found_app, global_conf=global_conf)
161 urlmap = URLMap(not_found_app=not_found_app)
162 for path, app_name in local_conf.items():
163 path = paste.urlmap.parse_path_expression(path)
164 app = loader.get_app(app_name, global_conf=global_conf)
165 urlmap[path] = app
166 return urlmap
169class URLMap(paste.urlmap.URLMap):
170 def _match(self, host, port, path_info):
171 """Find longest match for a given URL path."""
172 for (domain, app_url), app in self.applications:
173 if domain and domain != host and domain != host + ':' + port: 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true
174 continue
175 if (path_info == app_url or path_info.startswith(app_url + '/')):
176 return app, app_url
178 return None, None
180 def _set_script_name(self, app, app_url):
181 def wrap(environ, start_response):
182 environ['SCRIPT_NAME'] += app_url
183 return app(environ, start_response)
185 return wrap
187 def _munge_path(self, app, path_info, app_url):
188 def wrap(environ, start_response):
189 environ['SCRIPT_NAME'] += app_url
190 environ['PATH_INFO'] = path_info[len(app_url):]
191 return app(environ, start_response)
193 return wrap
195 def _path_strategy(self, host, port, path_info):
196 """Check path suffix for MIME type and path prefix for API version."""
197 mime_type = app = app_url = None
199 parts = path_info.rsplit('.', 1)
200 if len(parts) > 1: 200 ↛ 201line 200 didn't jump to line 201 because the condition on line 200 was never true
201 possible_type = 'application/' + parts[1]
202 if possible_type in wsgi.SUPPORTED_CONTENT_TYPES:
203 mime_type = possible_type
205 parts = path_info.split('/')
206 if len(parts) > 1: 206 ↛ 213line 206 didn't jump to line 213 because the condition on line 206 was always true
207 possible_app, possible_app_url = self._match(host, port, path_info)
208 # Don't use prefix if it ends up matching default
209 if possible_app and possible_app_url:
210 app_url = possible_app_url
211 app = self._munge_path(possible_app, path_info, app_url)
213 return mime_type, app, app_url
215 def _content_type_strategy(self, host, port, environ):
216 """Check Content-Type header for API version."""
217 app = None
218 params = parse_options_header(environ.get('CONTENT_TYPE', ''))[1]
219 if 'version' in params: 219 ↛ 220line 219 didn't jump to line 220 because the condition on line 219 was never true
220 app, app_url = self._match(host, port, '/v' + params['version'])
221 if app:
222 app = self._set_script_name(app, app_url)
224 return app
226 def _accept_strategy(self, host, port, environ, supported_content_types):
227 """Check Accept header for best matching MIME type and API version."""
228 accept = Accept(environ.get('HTTP_ACCEPT', ''))
230 app = None
232 # Find the best match in the Accept header
233 mime_type, params = accept.best_match(supported_content_types)
234 if 'version' in params: 234 ↛ 235line 234 didn't jump to line 235 because the condition on line 234 was never true
235 app, app_url = self._match(host, port, '/v' + params['version'])
236 if app:
237 app = self._set_script_name(app, app_url)
239 return mime_type, app
241 def __call__(self, environ, start_response):
242 host = environ.get('HTTP_HOST', environ.get('SERVER_NAME')).lower()
243 if environ['wsgi.url_scheme'] == 'http': 243 ↛ 246line 243 didn't jump to line 246 because the condition on line 243 was always true
244 default_port = '80'
245 else:
246 default_port = '443'
247 host, port = netutils.parse_host_port(host, default_port)
249 path_info = environ['PATH_INFO']
250 path_info = self.normalize_url(path_info, False)[1]
252 # The API version is determined in one of three ways:
253 # 1) URL path prefix (eg /v1.1/tenant/servers/detail)
254 # 2) Content-Type header (eg application/json;version=1.1)
255 # 3) Accept header (eg application/json;q=0.8;version=1.1)
257 # Manila supports only application/json as MIME type for the responses.
258 supported_content_types = list(wsgi.SUPPORTED_CONTENT_TYPES)
260 mime_type, app, app_url = self._path_strategy(host, port, path_info)
262 if not app:
263 app = self._content_type_strategy(host, port, environ)
265 if not mime_type or not app: 265 ↛ 273line 265 didn't jump to line 273 because the condition on line 265 was always true
266 possible_mime_type, possible_app = self._accept_strategy(
267 host, port, environ, supported_content_types)
268 if possible_mime_type and not mime_type: 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true
269 mime_type = possible_mime_type
270 if possible_app and not app: 270 ↛ 271line 270 didn't jump to line 271 because the condition on line 270 was never true
271 app = possible_app
273 if not mime_type: 273 ↛ 276line 273 didn't jump to line 276 because the condition on line 273 was always true
274 mime_type = 'application/json'
276 if not app:
277 # Didn't match a particular version, probably matches default
278 app, app_url = self._match(host, port, path_info)
279 if app: 279 ↛ 280line 279 didn't jump to line 280 because the condition on line 279 was never true
280 app = self._munge_path(app, path_info, app_url)
282 if app:
283 environ['manila.best_content_type'] = mime_type
284 return app(environ, start_response)
286 environ['paste.urlmap_object'] = self
287 return self.not_found_application(environ, start_response)