Coverage for manila/tests/api/openstack/test_wsgi.py: 93%

697 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Licensed under the Apache License, Version 2.0 (the "License"); you may 

2# not use this file except in compliance with the License. You may obtain 

3# a copy of the License at 

4# 

5# http://www.apache.org/licenses/LICENSE-2.0 

6# 

7# Unless required by applicable law or agreed to in writing, software 

8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

10# License for the specific language governing permissions and limitations 

11# under the License. 

12 

13import inspect 

14from unittest import mock 

15 

16import ddt 

17import webob 

18 

19from manila.api.openstack import api_version_request as api_version 

20from manila.api.openstack import wsgi 

21from manila import context 

22from manila import exception 

23from manila import policy 

24from manila import test 

25from manila.tests.api import fakes 

26 

27 

@ddt.ddt
class RequestTest(test.TestCase):
    """Exercise ``wsgi.Request``: content types, caching and API versions."""

    def _mock_version_bounds(self, minimum, maximum):
        """Make ``api_version`` report ``minimum``..``maximum`` as its range.

        :param minimum: version string returned by ``min_api_version()``.
        :param maximum: version string returned by ``max_api_version()``.
        """
        # Build both version objects before patching anything so that their
        # construction never observes the mocked bounds.
        lower = api_version.APIVersionRequest(minimum)
        upper = api_version.APIVersionRequest(maximum)
        self.mock_object(api_version, 'max_api_version',
                         mock.Mock(return_value=upper))
        self.mock_object(api_version, 'min_api_version',
                         mock.Mock(return_value=lower))

    def test_content_type_missing(self):
        request = wsgi.Request.blank('/tests/123', method='POST')
        request.body = "<body />".encode("utf-8")
        self.assertIsNone(request.get_content_type())

    def test_content_type_unsupported(self):
        request = wsgi.Request.blank('/tests/123', method='POST')
        request.headers["Content-Type"] = "text/html"
        request.body = "asdf<br />".encode("utf-8")
        self.assertRaises(exception.InvalidContentType,
                          request.get_content_type)

    def test_content_type_with_charset(self):
        request = wsgi.Request.blank('/tests/123')
        request.headers["Content-Type"] = "application/json; charset=UTF-8"
        result = request.get_content_type()
        self.assertEqual("application/json", result)

    def test_content_type_from_accept(self):
        content_type = 'application/json'
        request = wsgi.Request.blank('/tests/123')
        request.headers["Accept"] = content_type

        result = request.best_match_content_type()

        self.assertEqual(content_type, result)

    def test_content_type_from_accept_best(self):
        request = wsgi.Request.blank('/tests/123')
        request.headers["Accept"] = "application/xml, application/json"
        result = request.best_match_content_type()
        self.assertEqual("application/json", result)

        # JSON is still expected even when XML carries the higher q-value.
        request = wsgi.Request.blank('/tests/123')
        request.headers["Accept"] = ("application/json; q=0.3, "
                                     "application/xml; q=0.9")
        result = request.best_match_content_type()
        self.assertEqual("application/json", result)

    def test_content_type_from_query_extension(self):
        request = wsgi.Request.blank('/tests/123.json')
        result = request.best_match_content_type()
        self.assertEqual("application/json", result)

        # An unknown extension yields JSON as well.
        request = wsgi.Request.blank('/tests/123.invalid')
        result = request.best_match_content_type()
        self.assertEqual("application/json", result)

    def test_content_type_accept_default(self):
        request = wsgi.Request.blank('/tests/123.unsupported')
        request.headers["Accept"] = "application/unsupported1"
        result = request.best_match_content_type()
        self.assertEqual("application/json", result)

    def test_cache_and_retrieve_resources(self):
        request = wsgi.Request.blank('/foo')
        # Test that trying to retrieve a cached object on
        # an empty cache fails gracefully
        self.assertIsNone(request.cached_resource())
        self.assertIsNone(request.cached_resource_by_id('r-0'))

        resources = [{'id': 'r-%s' % x} for x in range(3)]

        # Cache an empty list of resources using the default name
        request.cache_resource([])
        self.assertEqual({}, request.cached_resource())
        # NOTE(review): the positional argument here is presumably treated
        # as a cache name rather than a resource id -- confirm against
        # wsgi.Request.cached_resource.
        self.assertIsNone(request.cached_resource('r-0'))
        # Cache some resources
        request.cache_resource(resources[:2])
        # Cache one resource
        request.cache_resource(resources[2])
        # Cache a different resource name
        other_resource = {'id': 'o-0'}
        request.cache_resource(other_resource, name='other-resource')

        self.assertEqual(resources[0], request.cached_resource_by_id('r-0'))
        self.assertEqual(resources[1], request.cached_resource_by_id('r-1'))
        self.assertEqual(resources[2], request.cached_resource_by_id('r-2'))
        self.assertIsNone(request.cached_resource_by_id('r-3'))
        self.assertEqual(
            {'r-0': resources[0], 'r-1': resources[1], 'r-2': resources[2]},
            request.cached_resource())
        self.assertEqual(
            other_resource,
            request.cached_resource_by_id('o-0', name='other-resource'))

    @ddt.data(
        'share_type',
    )
    def test_cache_and_retrieve_resources_by_resource(self, resource_name):
        cache_all_func = 'cache_db_%ss' % resource_name
        cache_one_func = 'cache_db_%s' % resource_name
        get_db_all_func = 'get_db_%ss' % resource_name
        get_db_one_func = 'get_db_%s' % resource_name

        r = wsgi.Request.blank('/foo')
        amount = 5
        res_range = range(amount)
        resources = [{'id': 'id%s' % x} for x in res_range]

        # Store every resource but the last one in a single bulk call
        getattr(r, cache_all_func)(resources[:amount - 1])
        # Store the remaining resource on its own
        getattr(r, cache_one_func)(resources[amount - 1])

        for i in res_range:
            self.assertEqual(
                resources[i],
                getattr(r, get_db_one_func)('id%s' % i),
            )
        # An id one past the stored range must not be found.
        self.assertIsNone(getattr(r, get_db_one_func)('id%s' % amount))
        self.assertEqual(
            {'id%s' % i: resources[i] for i in res_range},
            getattr(r, get_db_all_func)())

    def test_set_api_version_request_exception(self):
        self._mock_version_bounds('2.0', '2.45')
        # 2.51 lies above the mocked maximum of 2.45.
        headers = {'X-OpenStack-Manila-API-Version': '2.51'}
        request = wsgi.Request.blank(
            'https://openstack.acme.com/v2/shares', method='GET',
            headers=headers, script_name='/v2/shares')

        self.assertRaises(exception.InvalidGlobalAPIVersion,
                          request.set_api_version_request)
        self.assertEqual(api_version.APIVersionRequest('2.51'),
                         request.api_version_request)

    @ddt.data('', '/share', '/v1', '/v2/shares', '/v1.1/', '/share/v1',
              '/shared-file-sytems/v2', '/share/v3.5/share-replicas',
              '/shared-file-sytems/v2/shares/xyzzy/action')
    def test_set_api_version_request(self, resource):
        self._mock_version_bounds('2.0', '3.0')
        request = wsgi.Request.blank(
            'https://openstack.acme.com%s' % resource, method='GET',
            headers={'X-OpenStack-Manila-API-Version': '2.117'},
            script_name=resource)

        self.assertIsNone(request.set_api_version_request())

        if not resource or not ('/v1' in resource or '/v2' in resource):
            # Neither a v1 nor a v2 path: a null version is expected.
            self.assertEqual(api_version.APIVersionRequest(),
                             request.api_version_request)
        elif 'v1' in resource:
            self.assertEqual(api_version.APIVersionRequest('1.0'),
                             request.api_version_request)
        else:
            self.assertEqual(api_version.APIVersionRequest('2.117'),
                             request.api_version_request)

    def test_set_api_version_request_no_version_header(self):
        self._mock_version_bounds('2.0', '2.45')
        headers = {}
        request = wsgi.Request.blank(
            'https://openstack.acme.com/v2/shares', method='GET',
            headers=headers, script_name='/v2/shares')

        self.assertIsNone(request.set_api_version_request())

        self.assertEqual(api_version.APIVersionRequest('2.0'),
                         request.api_version_request)

    @ddt.data(None, 'true', 'false')
    def test_set_api_version_request_experimental_header(self, experimental):
        self._mock_version_bounds('2.0', '2.45')
        headers = {'X-OpenStack-Manila-API-Version': '2.38'}
        if experimental:
            headers['X-OpenStack-Manila-API-Experimental'] = experimental
        request = wsgi.Request.blank(
            'https://openstack.acme.com/v2/shares', method='GET',
            headers=headers, script_name='/v2/shares')

        self.assertIsNone(request.set_api_version_request())

        self.assertEqual(api_version.APIVersionRequest('2.38'),
                         request.api_version_request)

        # Only the literal header value 'true' turns the flag on.
        expected_experimental = experimental == 'true'
        self.assertEqual(expected_experimental,
                         request.api_version_request.experimental)

229 

230 

class ActionDispatcherTest(test.TestCase):
    """Checks how ``wsgi.ActionDispatcher.dispatch`` picks a handler."""

    def test_dispatch(self):
        dispatcher = wsgi.ActionDispatcher()
        dispatcher.create = lambda x: 'pants'
        outcome = dispatcher.dispatch({}, action='create')
        self.assertEqual('pants', outcome)

    def test_dispatch_action_None(self):
        dispatcher = wsgi.ActionDispatcher()
        dispatcher.create = lambda x: 'pants'
        dispatcher.default = lambda x: 'trousers'
        outcome = dispatcher.dispatch({}, action=None)
        self.assertEqual('trousers', outcome)

    def test_dispatch_default(self):
        dispatcher = wsgi.ActionDispatcher()
        dispatcher.create = lambda x: 'pants'
        dispatcher.default = lambda x: 'trousers'
        # 'update' has no dedicated handler on this dispatcher.
        outcome = dispatcher.dispatch({}, action='update')
        self.assertEqual('trousers', outcome)

248 

249 

class DictSerializerTest(test.TestCase):
    """Checks the fallback output of ``wsgi.DictSerializer``."""

    def test_dispatch_default(self):
        rendered = wsgi.DictSerializer().serialize({}, 'update')
        self.assertEqual('', rendered)

254 

255 

class JSONDictSerializerTest(test.TestCase):
    """Checks the JSON bytes produced by ``wsgi.JSONDictSerializer``."""

    def test_json(self):
        payload = {'servers': {'a': (2, 3)}}
        expected_json = b'{"servers":{"a":[2,3]}}'

        raw = wsgi.JSONDictSerializer().serialize(payload)
        # Drop newlines and spaces so formatting does not affect the check.
        compact = raw.replace(b'\n', b'')
        compact = compact.replace(b' ', b'')

        self.assertEqual(expected_json, compact)

266 

267 

class TextDeserializerTest(test.TestCase):
    """Checks the fallback output of ``wsgi.TextDeserializer``."""

    def test_dispatch_default(self):
        parsed = wsgi.TextDeserializer().deserialize({}, 'update')
        self.assertEqual({}, parsed)

272 

273 

class JSONDeserializerTest(test.TestCase):
    """Checks that ``wsgi.JSONDeserializer`` wraps parsed JSON in 'body'."""

    def test_json(self):
        data = """{"a": {
                "a1": "1",
                "a2": "2",
                "bs": ["1", "2", "3", {"c": {"c1": "1"}}],
                "d": {"e": "1"},
                "f": "1"}}"""
        inner = {
            'a1': '1',
            'a2': '2',
            'bs': ['1', '2', '3', {'c': {'c1': '1'}}],
            'd': {'e': '1'},
            'f': '1',
        }
        as_dict = {'body': {'a': inner}}

        parsed = wsgi.JSONDeserializer().deserialize(data)

        self.assertEqual(as_dict, parsed)

295 

296 

class ResourceTest(test.TestCase):
    """Exercise ``wsgi.Resource``: dispatch, bodies, actions, extensions."""

    class _PantsController(object):
        """Plain controller whose ``index`` returns its ``pants`` argument."""

        def index(self, req, pants=None):
            return pants

    class _FooActionController(wsgi.Controller):
        """``wsgi.Controller`` exposing a single ``fooAction`` action."""

        @wsgi.action('fooAction')
        def _action_foo(self, req, id, body):
            return body

    def _make_resource(self, **deserializers):
        """Return a fresh ``(controller, resource)`` pair.

        :param deserializers: keyword arguments forwarded unchanged to
            ``wsgi.Resource`` after the controller.
        """
        controller = self._PantsController()
        return controller, wsgi.Resource(controller, **deserializers)

    def _get_body(self, body, content_type=None):
        """Run ``Resource.get_body`` on a POST request and return its result.

        :param body: bytes assigned to the request body.
        :param content_type: value for the Content-Type header; the header
            is left unset when this is ``None``.
        """
        _controller, resource = self._make_resource()
        request = wsgi.Request.blank('/', method='POST')
        if content_type is not None:
            request.headers['Content-Type'] = content_type
        request.body = body
        return resource.get_body(request)

    def test_resource_call(self):
        class Controller(object):
            def index(self, req):
                return 'off'

        req = webob.Request.blank('/tests')
        app = fakes.TestRouter(Controller())
        response = req.get_response(app)
        self.assertEqual('off'.encode("utf-8"), response.body)
        self.assertEqual(200, response.status_int)

    def test_resource_not_authorized(self):
        class Controller(object):
            def index(self, req):
                raise exception.NotAuthorized()

        req = webob.Request.blank('/tests')
        app = fakes.TestRouter(Controller())
        response = req.get_response(app)
        self.assertEqual(403, response.status_int)

    def test_dispatch(self):
        _controller, resource = self._make_resource()
        method, extensions = resource.get_method(None, 'index', None, '')
        actual = resource.dispatch(method, None, {'pants': 'off'})
        expected = 'off'
        self.assertEqual(expected, actual)

    def test_get_method_undefined_controller_action(self):
        _controller, resource = self._make_resource()
        self.assertRaises(AttributeError, resource.get_method,
                          None, 'create', None, '')

    def test_get_method_action_json(self):
        controller = self._FooActionController()
        resource = wsgi.Resource(controller)
        method, extensions = resource.get_method(None, 'action',
                                                 'application/json',
                                                 '{"fooAction": true}')
        self.assertEqual(controller._action_foo, method)

    def test_get_method_action_bad_body(self):
        controller = self._FooActionController()
        resource = wsgi.Resource(controller)
        self.assertRaises(exception.MalformedRequestBody, resource.get_method,
                          None, 'action', 'application/json', '{}')

    def test_get_method_unknown_controller_action(self):
        controller = self._FooActionController()
        resource = wsgi.Resource(controller)
        self.assertRaises(KeyError, resource.get_method,
                          None, 'action', 'application/json',
                          '{"barAction": true}')

    def test_get_method_action_method(self):
        class Controller(object):
            def action(self, req, pants=None):
                return pants

        controller = Controller()
        resource = wsgi.Resource(controller)
        method, extensions = resource.get_method(None, 'action',
                                                 'application/xml',
                                                 '<fooAction>true</fooAction')
        self.assertEqual(controller.action, method)

    def test_get_action_args(self):
        _controller, resource = self._make_resource()

        env = {
            'wsgiorg.routing_args': [None, {
                'controller': None,
                'format': None,
                'action': 'update',
                'id': 12,
            }],
        }

        # 'controller' and 'format' are expected to be dropped.
        expected = {'action': 'update', 'id': 12}

        self.assertEqual(expected, resource.get_action_args(env))

    def test_get_body_bad_content(self):
        content_type, body = self._get_body(
            'foo'.encode("utf-8"), content_type='application/none')
        self.assertIsNone(content_type)
        self.assertEqual('', body)

    def test_get_body_no_content_type(self):
        content_type, body = self._get_body('foo'.encode("utf-8"))
        self.assertIsNone(content_type)
        self.assertEqual('', body)

    def test_get_body_no_content_body(self):
        content_type, body = self._get_body(
            ''.encode("utf-8"), content_type='application/json')
        self.assertIsNone(content_type)
        self.assertEqual('', body)

    def test_get_body(self):
        content_type, body = self._get_body(
            'foo'.encode("utf-8"), content_type='application/json')
        self.assertEqual('application/json', content_type)
        self.assertEqual('foo'.encode("utf-8"), body)

    def test_deserialize_badtype(self):
        controller, resource = self._make_resource()
        self.assertRaises(exception.InvalidContentType,
                          resource.deserialize,
                          controller.index, 'application/none', 'foo')

    def test_deserialize_default(self):
        class JSONDeserializer(object):
            def deserialize(self, body):
                return 'json'

        class XMLDeserializer(object):
            def deserialize(self, body):
                return 'xml'

        class Controller(object):
            @wsgi.deserializers(xml=XMLDeserializer)
            def index(self, req, pants=None):
                return pants

        controller = Controller()
        resource = wsgi.Resource(controller, json=JSONDeserializer)

        obj = resource.deserialize(controller.index, 'application/json', 'foo')
        self.assertEqual('json', obj)

    def test_deserialize_decorator(self):
        class JSONDeserializer(object):
            def deserialize(self, body):
                return 'json'

        controller, resource = self._make_resource(json=JSONDeserializer)

        obj = resource.deserialize(controller.index, 'application/json', 'foo')
        self.assertEqual('json', obj)

    def test_register_actions(self):
        class ControllerExtended(wsgi.Controller):
            @wsgi.action('fooAction')
            def _action_foo(self, req, id, body):
                return body

            @wsgi.action('barAction')
            def _action_bar(self, req, id, body):
                return body

        _controller, resource = self._make_resource()
        self.assertEqual({}, resource.wsgi_actions)
        extended = ControllerExtended()
        resource.register_actions(extended)
        self.assertEqual({'fooAction': extended._action_foo,
                          'barAction': extended._action_bar, },
                         resource.wsgi_actions)

    def test_register_extensions(self):
        class ControllerExtended(wsgi.Controller):
            @wsgi.extends
            def index(self, req, resp_obj, pants=None):
                return None

            @wsgi.extends(action='fooAction')
            def _action_foo(self, req, resp, id, body):
                return None

        _controller, resource = self._make_resource()
        self.assertEqual({}, resource.wsgi_extensions)
        self.assertEqual({}, resource.wsgi_action_extensions)

        extended = ControllerExtended()
        resource.register_extensions(extended)
        self.assertEqual({'index': [extended.index]}, resource.wsgi_extensions)
        self.assertEqual({'fooAction': [extended._action_foo]},
                         resource.wsgi_action_extensions)

    def test_get_method_extensions(self):
        class ControllerExtended(wsgi.Controller):
            @wsgi.extends
            def index(self, req, resp_obj, pants=None):
                return None

        controller = self._PantsController()
        extended = ControllerExtended()
        resource = wsgi.Resource(controller)
        resource.register_extensions(extended)
        method, extensions = resource.get_method(None, 'index', None, '')
        self.assertEqual(controller.index, method)
        self.assertEqual([extended.index], extensions)

    def test_get_method_action_extensions(self):
        class Controller(wsgi.Controller):
            def index(self, req, pants=None):
                return pants

            @wsgi.action('fooAction')
            def _action_foo(self, req, id, body):
                return body

        class ControllerExtended(wsgi.Controller):
            @wsgi.extends(action='fooAction')
            def _action_foo(self, req, resp_obj, id, body):
                return None

        controller = Controller()
        extended = ControllerExtended()
        resource = wsgi.Resource(controller)
        resource.register_extensions(extended)
        method, extensions = resource.get_method(None, 'action',
                                                 'application/json',
                                                 '{"fooAction": true}')
        self.assertEqual(controller._action_foo, method)
        self.assertEqual([extended._action_foo], extensions)

    def test_get_method_action_whitelist_extensions(self):
        class Controller(wsgi.Controller):
            def index(self, req, pants=None):
                return pants

        class ControllerExtended(wsgi.Controller):
            @wsgi.action('create')
            def _create(self, req, body):
                pass

            @wsgi.action('delete')
            def _delete(self, req, id):
                pass

        controller = Controller()
        extended = ControllerExtended()
        resource = wsgi.Resource(controller)
        resource.register_actions(extended)

        method, extensions = resource.get_method(None, 'create',
                                                 'application/json',
                                                 '{"create": true}')
        self.assertEqual(extended._create, method)
        self.assertEqual([], extensions)

        method, extensions = resource.get_method(None, 'delete', None, None)
        self.assertEqual(extended._delete, method)
        self.assertEqual([], extensions)

    def test_pre_process_extensions_regular(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req, resp_obj):
            called.append(1)
            return None

        def extension2(req, resp_obj):
            called.append(2)
            return None

        extensions = [extension1, extension2]
        response, post = resource.pre_process_extensions(extensions, None, {})
        # Regular (non-generator) extensions must not run during pre-process;
        # they are handed back in reverse order for post-processing.
        self.assertEqual([], called)
        self.assertIsNone(response)
        self.assertEqual([extension2, extension1], list(post))

    def test_pre_process_extensions_generator(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req):
            called.append('pre1')
            resp_obj = yield
            self.assertIsNone(resp_obj)
            called.append('post1')

        def extension2(req):
            called.append('pre2')
            resp_obj = yield
            self.assertIsNone(resp_obj)
            called.append('post2')

        extensions = [extension1, extension2]
        response, post = resource.pre_process_extensions(extensions, None, {})
        post = list(post)
        self.assertEqual(['pre1', 'pre2'], called)
        self.assertIsNone(response)
        self.assertEqual(2, len(post))
        self.assertTrue(inspect.isgenerator(post[0]))
        self.assertTrue(inspect.isgenerator(post[1]))

        # Resume each generator so its post-yield half runs; a generator
        # that finishes raises StopIteration, which is expected here.
        for gen in post:
            try:
                gen.send(None)
            except StopIteration:
                continue

        self.assertEqual(['pre1', 'pre2', 'post2', 'post1'], called)

    def test_pre_process_extensions_generator_response(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req):
            called.append('pre1')
            yield 'foo'

        def extension2(req):
            called.append('pre2')

        extensions = [extension1, extension2]
        response, post = resource.pre_process_extensions(extensions, None, {})
        # The value yielded by extension1 becomes the response, so
        # extension2 is never reached.
        self.assertEqual(['pre1'], called)
        self.assertEqual('foo', response)
        self.assertEqual([], post)

    def test_post_process_extensions_regular(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req, resp_obj):
            called.append(1)
            return None

        def extension2(req, resp_obj):
            called.append(2)
            return None

        response = resource.post_process_extensions([extension2, extension1],
                                                    None, None, {})
        self.assertEqual([2, 1], called)
        self.assertIsNone(response)

    def test_post_process_extensions_regular_response(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req, resp_obj):
            called.append(1)
            return None

        def extension2(req, resp_obj):
            called.append(2)
            return 'foo'

        response = resource.post_process_extensions([extension2, extension1],
                                                    None, None, {})
        # extension2 returns a response, so extension1 must not run.
        self.assertEqual([2], called)
        self.assertEqual('foo', response)

    def test_post_process_extensions_version_not_found(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req, resp_obj):
            called.append(1)
            return 'bar'

        def extension2(req, resp_obj):
            raise exception.VersionNotFoundForAPIMethod(version='fake_version')

        response = resource.post_process_extensions([extension2, extension1],
                                                    None, None, {})
        # The version error from extension2 is skipped over and
        # extension1 still supplies the response.
        self.assertEqual([1], called)
        self.assertEqual('bar', response)

    def test_post_process_extensions_generator(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req):
            resp_obj = yield
            self.assertIsNone(resp_obj)
            called.append(1)

        def extension2(req):
            resp_obj = yield
            self.assertIsNone(resp_obj)
            called.append(2)

        # Advance each generator to its first yield before handing it over.
        ext1 = extension1(None)
        next(ext1)
        ext2 = extension2(None)
        next(ext2)

        response = resource.post_process_extensions([ext2, ext1],
                                                    None, None, {})

        self.assertEqual([2, 1], called)
        self.assertIsNone(response)

    def test_post_process_extensions_generator_response(self):
        _controller, resource = self._make_resource()

        called = []

        def extension1(req):
            resp_obj = yield
            self.assertIsNone(resp_obj)
            called.append(1)

        def extension2(req):
            resp_obj = yield
            self.assertIsNone(resp_obj)
            called.append(2)
            yield 'foo'

        # Advance each generator to its first yield before handing it over.
        ext1 = extension1(None)
        next(ext1)
        ext2 = extension2(None)
        next(ext2)

        response = resource.post_process_extensions([ext2, ext1],
                                                    None, None, {})

        # ext2 yields a response, so ext1 is never resumed.
        self.assertEqual([2], called)
        self.assertEqual('foo', response)

854 

855 

class ResponseObjectTest(test.TestCase):
    """Checks ``wsgi.ResponseObject`` codes, headers and serializers."""

    def test_default_code(self):
        response_obj = wsgi.ResponseObject({})
        self.assertEqual(200, response_obj.code)

    def test_modified_code(self):
        response_obj = wsgi.ResponseObject({})
        response_obj._default_code = 202
        self.assertEqual(202, response_obj.code)

    def test_override_default_code(self):
        response_obj = wsgi.ResponseObject({}, code=404)
        self.assertEqual(404, response_obj.code)

    def test_override_modified_code(self):
        response_obj = wsgi.ResponseObject({}, code=404)
        response_obj._default_code = 202
        # An explicit code wins over a changed default.
        self.assertEqual(404, response_obj.code)

    def test_set_header(self):
        response_obj = wsgi.ResponseObject({})
        response_obj['Header'] = 'foo'
        self.assertEqual({'header': 'foo'}, response_obj.headers)

    def test_get_header(self):
        response_obj = wsgi.ResponseObject({})
        response_obj['Header'] = 'foo'
        self.assertEqual('foo', response_obj['hEADER'])

    def test_del_header(self):
        response_obj = wsgi.ResponseObject({})
        response_obj['Header'] = 'foo'
        del response_obj['hEADER']
        self.assertNotIn('header', response_obj.headers)

    def test_header_isolation(self):
        response_obj = wsgi.ResponseObject({})
        response_obj['Header'] = 'foo'
        exported = response_obj.headers
        # Writing to the exported mapping must not leak back into the object.
        exported['hEADER'] = 'bar'
        self.assertEqual('foo', response_obj['hEADER'])

    def test_default_serializers(self):
        response_obj = wsgi.ResponseObject({})
        self.assertEqual({}, response_obj.serializers)

    def test_bind_serializers(self):
        response_obj = wsgi.ResponseObject({}, json='foo')
        response_obj._bind_method_serializers({'xml': 'bar', 'json': 'baz'})
        # The serializer given at construction ('foo') is kept for json.
        self.assertEqual({'xml': 'bar', 'json': 'foo'},
                         response_obj.serializers)

    def test_get_serializer(self):
        response_obj = wsgi.ResponseObject(
            {}, json='json', xml='xml', atom='atom')
        for content_type, mtype in wsgi._MEDIA_TYPE_MAP.items():
            _mtype, serializer = response_obj.get_serializer(content_type)
            self.assertEqual(mtype, serializer)

    def test_get_serializer_defaults(self):
        response_obj = wsgi.ResponseObject({})
        fallbacks = {'json': 'json', 'xml': 'xml', 'atom': 'atom'}
        for content_type, mtype in wsgi._MEDIA_TYPE_MAP.items():
            # Without fallbacks there is nothing to serve the type.
            self.assertRaises(exception.InvalidContentType,
                              response_obj.get_serializer, content_type)
            _mtype, serializer = response_obj.get_serializer(
                content_type, fallbacks)
            self.assertEqual(mtype, serializer)

    def test_serialize(self):
        class JSONSerializer(object):
            def serialize(self, obj):
                return b'json'

        class XMLSerializer(object):
            def serialize(self, obj):
                return b'xml'

        class AtomSerializer(object):
            def serialize(self, obj):
                return b'atom'

        response_obj = wsgi.ResponseObject(
            {}, code=202,
            json=JSONSerializer,
            xml=XMLSerializer,
            atom=AtomSerializer)
        response_obj['X-header1'] = 'header1'
        response_obj['X-header2'] = 'header2'

        for content_type, mtype in wsgi._MEDIA_TYPE_MAP.items():
            request = wsgi.Request.blank('/tests/123')
            response = response_obj.serialize(request, content_type)
            headers = response.headers

            self.assertEqual(content_type, headers['Content-Type'])
            self.assertEqual('header1', headers['X-header1'])
            self.assertEqual('header2', headers['X-header2'])
            self.assertEqual(202, response.status_int)
            self.assertEqual(mtype.encode("utf-8"), response.body)

952 

953 

class ValidBodyTest(test.TestCase):
    """Checks ``wsgi.Controller.is_valid_body`` for the 'foo' entity."""

    def setUp(self):
        super().setUp()
        self.controller = wsgi.Controller()

    def _is_valid(self, body):
        """Return ``is_valid_body(body, 'foo')`` from the controller."""
        return self.controller.is_valid_body(body, 'foo')

    def test_is_valid_body(self):
        self.assertTrue(self._is_valid({'foo': {}}))

    def test_is_valid_body_none(self):
        self.assertFalse(self._is_valid(None))

    def test_is_valid_body_empty(self):
        self.assertFalse(self._is_valid({}))

    def test_is_valid_body_no_entity(self):
        self.assertFalse(self._is_valid({'bar': {}}))

    def test_is_valid_body_malformed_entity(self):
        # The entity is present but is a string instead of a dict.
        self.assertFalse(self._is_valid({'foo': 'bar'}))

977 

978 

class AuthorizeDecoratorTest(test.TestCase):
    """Checks the policy calls made by ``wsgi.Controller.authorize``."""

    class FakeController(wsgi.Controller):
        resource_name = 'fake_resource_name'

        # Bare decorator: the test expects the method name as the action.
        @wsgi.Controller.authorize
        def fake_action_1(self, req):
            pass

        # Decorator called with an explicit action name.
        @wsgi.Controller.authorize('fake_action')
        def fake_action_2(self, req):
            pass

    def setUp(self):
        super().setUp()
        self.controller = self.FakeController()
        self.admin_context = context.get_admin_context()
        self.user_context = fakes.FakeRequestContext
        self.mock_policy_check = self.mock_object(policy, 'check_policy')

    def _request_as(self, ctxt):
        """Build a fake request whose 'manila.context' is ``ctxt``."""
        req = fakes.HTTPRequest.blank('/v2/fake/fake_id')
        req.environ['manila.context'] = ctxt
        return req

    def test_authorize_decorator_no_args(self):
        req = self._request_as(self.user_context)

        self.controller.fake_action_1(req)

        self.mock_policy_check.assert_called_once_with(
            self.user_context, self.controller.resource_name, 'fake_action_1')

    def test_authorize_decorator_action_name(self):
        req = self._request_as(self.admin_context)

        self.controller.fake_action_2(req)

        self.mock_policy_check.assert_called_once_with(
            self.admin_context, self.controller.resource_name, 'fake_action')

    def test_authorize_exception(self):
        req = self._request_as(self.admin_context)
        denied = exception.PolicyNotAuthorized(action='fake_action')
        failing_check = mock.Mock(side_effect=denied)

        # A policy failure must surface as an HTTP 403 error.
        with mock.patch.object(policy, 'check_policy', failing_check):
            self.assertRaises(webob.exc.HTTPForbidden,
                              self.controller.fake_action_2, req)